Source code for isopy.toolbox.doublespike

import numpy as np
import scipy.optimize as optimize
import isopy as isopy
import itertools as itertools
from collections import namedtuple as _namedtuple
from matplotlib import cm as mplcm
from matplotlib import colors as mplcolors
from isopy import core
from . import isotope
import warnings
from datetime import datetime as dt

__all__ = ['ds_correction', 'ds_Delta', 'ds_Delta_prime', 'ds_grid']

import isopy.checks

"""
Functions for doublespike data reduction
"""

def _inversion_rudge_function(x, P, n, T, m):
    x = x.reshape(3, -1)
    lambda_ = x[0]
    alpha = x[1] / (1 - lambda_)
    beta = x[2]

    return ((lambda_ * T) + ((1 - lambda_) * n * np.exp(-alpha * P)) - m * np.exp(
        -beta * P)).flatten()


# Currently no used as it doesnt work when the input has more than one dimension.
# Might just be a matter of rearranging the output
def _inversion_rudge_jacobian(x, P, n, T, m):
    x = x.reshape(3, -1)
    lambda_ = x[0]
    alpha = x[1] / (1 - lambda_)
    beta = x[2]
    output = np.transpose(
        [T - (n * np.exp(-alpha * P) * (1 + alpha * P)), -n * P * np.exp(-alpha * P),
         m * P * np.exp(-beta * P)])
    return output


# split if more than x so that it doesnt break
def ds_inversion_rudge(m, n, T, P, xtol=1e-10, **kwargs):
    """
    Double spike inversion using the method of Rudge et al. 2009, Chemical Geology 265 420-431.

    Parameters
    ----------
    self
    m : ndarray
        Measured isotope ratios. Must be an 1-dim array of size 3 or a 2-dim array with first dimension of 3.
    n : ndarray
        Reference isotope ratios. Must be a 1-dim array of size 3 or a 2-dim array with the same shape as `m`.
    T : ndarray
        Spike isotope ratios. Must be a 1-dim array of size 3 or a 2-dim array with the same shape as `m`.
    P : ndarray
        Log of mass isotope ratios. Must be a 1-dim array of size 3 or a 2-dim array with the same shape as `m`.
    xtol : float
        Required tolerance for inversion. Default value is 1E-10

    Returns
    -------
    DSResult
    """
    mndim = m.ndim
    if m.size == 3:
        m = m.reshape(3, 1)
    elif m.ndim == 1:
        raise ValueError()
    elif m.ndim == 2:
        if m.shape[0] != 3: raise ValueError(f'shape of m is incorrect ({m.shape})')
    else:
        raise ValueError('m has to many dimensions ({})'.format(m.ndim))

    if n.size == 3:
        n = np.ones(m.shape) * n.reshape(3, 1)
    elif n.shape != m.shape:
        raise ValueError('Shape of n {} does not match shape of m {}'.format(n.shape, m.shape))

    if T.size == 3:
        T = np.ones(m.shape) * T.reshape(3, 1)
    elif T.shape != m.shape:
        ValueError('Shape of T {} does not match shape of m {}'.format(T.shape, m.shape))

    if P.size == 3:
        P = np.ones(m.shape) * P.reshape(3, 1)
    elif P.shape != m.shape:
        ValueError('Shape of P {} does not match shape of m {}'.format(P.shape, m.shape))

    xtol = isopy.checks.check_type('xtol', xtol, np.float_, float, coerce=True)

    A = np.transpose([T - n, -n * P, m * P])
    b = np.transpose(m - n)
    x0 = np.transpose(np.linalg.solve(A, b))

    x, infodict, ier, mesg = optimize.fsolve(_inversion_rudge_function, x0, (P, n, T, m), None,
                                             True, xtol=xtol, **kwargs)
    if ier != 1:
        x = np.ones(m.shape) * np.nan

    if mndim == 2: x = x.reshape(3, -1)
    lambda_ = x[0]
    alpha = x[1] / (1 - lambda_)
    beta = x[2]

    Fnat = alpha * -1
    Fins = beta

    spike_fraction = (1 + ((1 - lambda_) / lambda_) * (
                (1 + np.sum((n / np.exp(alpha * P)), axis=0)) / (1 + np.sum(T, axis=0)))) ** (-1)
    sample_fraction = (1 - spike_fraction)
    Q = sample_fraction / spike_fraction

    static_items = dict(method='rudge')
    dynamic_items = dict(alpha=alpha,
                         beta=beta,
                         lambda_=lambda_,
                         fnat=Fnat,
                         fins=Fins,
                         spike_fraction=spike_fraction,
                         sample_fraction=sample_fraction,
                         Q=Q)

    return DSResult(static_items, dynamic_items)


def ds_inversion_siebert(MS, ST, SP, Mass, Fins_guess=2, Fnat_guess=-0.00001,
                         outer_loop_iterations=3, inner_loop_iterations=6):
    """
    Double spike inversion using the method of Siebert et al. 2001, Geochemistry, Geophysics, Geosystems 2,

    Parameters
    ----------
    self
    MS : ndarray
        Measured isotope ratios. Must be an 1-dim array of size 3 or a 2-dim array with first dimension of 3.
    ST : ndarray
        Reference isotope ratios. Must be a 1-dim array of size 3 or a 2-dim array with the same shape as `MS`.
    SP : ndarray
        Spike isotope ratios. Must be a 1-dim array of size 3 or a 2-dim array with the same shape as `MS`.
    Mass : ndarray
        Log of mass isotope ratios. Must be a 1-dim array of size 3 or a 2-dim array with the same shape as `MS`.
    Fins_guess : float
        Starting guess for for the instrumental fractionation factor. Default value is 2
    Fnat_guess : float
        Starting guess for the natural fractionation factor. Default value is -0.00001
    outer_loop_iterations : int
        Number of iterations of the outer loop. Default value is 3
    inner_loop_iterations
        Number of iterations of the inner loop. Default value is 6

    Returns
    -------
    DSResult
    """
    output_index = 0
    mndim = MS.ndim
    if MS.size == 3:
        MS = MS.reshape(3, 1)
    elif MS.ndim == 1:
        raise ValueError()
    elif MS.ndim == 2:
        if MS.shape[0] != 3: raise ValueError()
    else:
        raise ValueError('MS has to many dimensions ({})'.format(MS.ndim))

    if ST.size == 3:
        ST = ST.reshape(3)
    elif ST.shape != MS.shape:
        raise ValueError('Shape of ST {} does not match shape of MS {}'.format(ST.shape, MS.shape))

    if ST.size == 3:
        ST = ST.reshape(3)
    elif ST.shape != MS.shape:
        ValueError('Shape of ST {} does not match shape of MS {}'.format(ST.shape, MS.shape))

    if Mass.size == 3:
        Mass = Mass.reshape(3)
    elif Mass.shape != MS.shape:
        ValueError('Shape of Mass {} does not match shape of MS {}'.format(Mass.shape, MS.shape))

    Fins_guess = isopy.checks.check_type('Fins_guess', Fins_guess, float, np.float_,
                                         coerce=True, coerce_into=np.float_)
    Fnat_guess = isopy.checks.check_type('Fnat_guess', Fnat_guess, float, np.float_,
                                         coerce=True, coerce_into=np.float_)
    outer_loop_iterations = isopy.checks.check_type('outer_loop_iterations', outer_loop_iterations,
                                                    int)
    inner_loop_iterations = isopy.checks.check_type('inner_loop_iterations', inner_loop_iterations,
                                                    int)

    dlen = MS.shape[1]
    SA = np.zeros((outer_loop_iterations, 3, dlen), dtype=np.float64)
    Fins = np.zeros((outer_loop_iterations, 3, dlen), dtype=np.float64)
    Fnat = np.zeros((outer_loop_iterations, 3, dlen), dtype=np.float64)
    MT = np.zeros((3, dlen), dtype=np.float64)

    x = 0
    y = 1
    z = 2
    Ri = output_index

    Fnat_Ri = Fnat_guess
    Fins_Ri = Fins_guess

    with warnings.catch_warnings():
        #This supresses warning for unsolvable inversions
        #Mostly applicable for creating doublespike maps
        warnings.simplefilter("ignore")

        for i1 in range(outer_loop_iterations):
            SA[i1, x, :] = ST[x] * Mass[x] ** Fnat_Ri
            SA[i1, y, :] = ST[y] * Mass[y] ** Fnat_Ri
            SA[i1, z, :] = ST[z] * Mass[z] ** Fnat_Ri

            a = (ST[y] * (SA[i1, z] - SP[z]) + SA[i1, y] * (SP[z] - ST[z]) + SP[y] * (
                        ST[z] - SA[i1, z])) / (
                            ST[y] * (SA[i1, x] - SP[x]) + SA[i1, y] * (SP[x] - ST[x]) + SP[y] * (
                                ST[x] - SA[i1, x]))
            b = (ST[x] * (SA[i1, z] - SP[z]) + SA[i1, x] * (SP[z] - ST[z]) + SP[x] * (
                        ST[z] - SA[i1, z])) / (
                            ST[x] * (SA[i1, y] - SP[y]) + SA[i1, x] * (SP[y] - ST[y]) + SP[x] * (
                                ST[y] - SA[i1, y]))
            c = ST[z] - a * ST[x] - b * ST[y]

            for i2 in range(inner_loop_iterations):
                MT[x] = MS[x] * Mass[x] ** -Fins_Ri
                MT[y] = MS[y] * Mass[y] ** -Fins_Ri
                MT[z] = MS[z] * Mass[z] ** -Fins_Ri

                d = (MS[z] - MT[z]) / (MS[x] - MT[x])
                e = MS[z] - d * MS[x]
                f = (MS[z] - MT[z]) / (MS[y] - MT[y])
                g = MS[z] - f * MS[y]

                MT[x] = (b * g - b * e + e * f - c * f) / (a * f + b * d - d * f)
                MT[y] = (a * e - a * g + d * g - c * d) / (a * f + b * d - d * f)
                MT[z] = a * MT[x] + b * MT[y] + c

                # gives positive Fins
                Fins[i1, 0] = np.log(MS[x] / MT[x]) / np.log(Mass[x])
                Fins[i1, 1] = np.log(MS[y] / MT[y]) / np.log(Mass[y])
                Fins[i1, 2] = np.log(MS[z] / MT[z]) / np.log(Mass[z])
                Fins_Ri = Fins[i1, Ri]

            a = (MS[y] * (MT[z] - SP[z]) + MT[y] * (SP[z] - MS[z]) + SP[y] * (MS[z] - MT[z])) / (
                        MS[y] * (MT[x] - SP[x]) + MT[y] * (SP[x] - MS[x]) + SP[y] * (MS[x] - MT[x]))
            b = (MS[x] * (MT[z] - SP[z]) + MT[x] * (SP[z] - MS[z]) + SP[x] * (MS[z] - MT[z])) / (
                        MS[x] * (MT[y] - SP[y]) + MT[x] * (SP[y] - MS[y]) + SP[x] * (MS[y] - MT[y]))
            c = MS[z] - a * MS[x] - b * MS[y]

            d = (ST[z] - SA[i1, z]) / (ST[x] - SA[i1, x])
            e = ST[z] - d * ST[x]
            f = (ST[z] - SA[i1, z]) / (ST[y] - SA[i1, y])
            g = ST[z] - f * ST[y]

            SA[i1, x] = (b * g - b * e + e * f - c * f) / (a * f + b * d - d * f)
            SA[i1, y] = (a * e - a * g + d * g - c * d) / (a * f + b * d - d * f)
            SA[i1, z] = a * SA[i1, x] + b * SA[i1, y] + c

            Fnat[i1, x] = np.log(SA[i1, x] / ST[x]) / np.log(Mass[x])
            Fnat[i1, y] = np.log(SA[i1, y] / ST[y]) / np.log(Mass[y])
            Fnat[i1, z] = np.log(SA[i1, z] / ST[z]) / np.log(Mass[z])
            Fnat_Ri = Fnat[i1, Ri]

        sample_fraction = ((1 / (MT[x] + MT[y] + MT[z] + 1)) - (1 / (SP[x] + SP[y] + SP[z] + 1))) / ((
                             1 / (SA[outer_loop_iterations - 1, x] + SA[outer_loop_iterations - 1, y] +
                                 SA[outer_loop_iterations - 1, z] + 1)) - (1 / (SP[x] + SP[y] + SP[z] + 1)))
        Fnat_f = Fnat[-1, Ri]
        Fins_f = Fins[-1, Ri]

        lambda_ = ((1 - sample_fraction) ** (-1) - 1) / (
                    (1 + np.sum(SA[-1], axis=0)) / (1 + np.sum(SP, axis=0)))
        lambda_ = 1 - (lambda_ / (lambda_ + 1))

        if mndim == 1:
            sample_fraction = sample_fraction[0]
            Fnat_f = Fnat_f[0]
            Fins_f = Fins_f[0]
            lambda_ = lambda_[0]

        spike_fraction = (1 - sample_fraction)
        Q = sample_fraction / spike_fraction

        alpha = Fnat_f * -1
        beta = Fins_f

    static_items = dict(method='siebert')
    dynamic_items = dict(alpha=alpha,
                         beta=beta,
                         lambda_=lambda_,
                         fnat=Fnat_f,
                         fins=Fins_f,
                         spike_fraction=spike_fraction,
                         sample_fraction=sample_fraction,
                         Q=Q)

    return DSResult(static_items, dynamic_items)


def ds_inversion(measured, spike, standard=None, isotope_masses='isotope.mass', inversion_keys=None,
                 method='rudge', **method_kwargs):
    """
    Double spike inversion.

    Parameters
    ----------
    measured : IsopyArray
        Measured isotope ratios
    standard : IsopyArray, dict
        References isotope ratios or a dict or references values
    spike : IsopyArray, dict
        Spike isotope ratios or a dict or references values
    isotope_masses : IsopyArray, dict, Optional
        Mass isotope ratios or a dict or references values. If not given hte :attr:`isotope.mass` will be used.
    inversion_keys : RatioKeyList, Optional
        Keys used for the inversion. Can either be 3 ratio key strings or 4 isotope key strings.
        Does not have to be given if the inversion keys can be inferred from *spike*.
    method : str
        Inversion method to be used. Options are 'rudge' and 'siebert'.
    method_kwargs
        Keyword arguments for inversion method. See `inversion_rudge` and `inversion_siebert` for list of possible arguments.

    Returns
    -------
    inversion_result : DSResult
        The returned *DSResult* object contains the the following attributes:

        * ``method`` - Name of the method used to do the inversion.

        * ``alpha`` - The natural fractionation factor as defined by Rudge (:math:`n = N * mass^\\alpha`).
          **Note** this value has the opposite sign to *fnat*.

        * ``beta`` - The instrumental mass fractionation factor as defined by Rudge (:math:`m = M * mass^\\beta`).
          Same as *fins*.

        * ``lambda_`` - The lambda value defined by Rudge.

        * ``fnat`` - The natural fractionation factor as defined by Siebert (:math:`\\textrm{SA} = \\textrm{ST} * mass^{fnat}`).
          **Note** this value has the opposite sign to *alpha*.

        * ``fins`` - The instrumental fractionation factor as defined by Siebers (:math:`\\textrm{MS} = \\textrm{MT} * mass^{fins}`).
          Same as *beta*.

        * ``spike_fraction`` - The fraction of spike in the sample-spike mixture. Calculated on the
          basis of the four isotopes used in the inversion.

        * ``sample_fraction`` - The fraction of sample in the sample-spike mixture. Calculated on the
          basis of the four isotopes used in the inversion.

        * ``Q`` - The *sample_fraction* to *spike_fraction* ratio.

        Array functions, e.g. ``np.mean`` and ``isopy.sd`` can be used on this object. The
        function will be performed on each attribute and a new *DSResult* object returned.
    """
    #measured = isopy.checks.check_array('measured', measured, 'isotope', 'ratio')

    #spike = isopy.checks.check_array('spike', spike, 'isotope', 'ratio', allow_dict = True)

    inversion_keys = isopy.checks.check_type('inversion_keys', inversion_keys, isopy.core.IsopyKeyList, allow_none=True)
    if standard is None:
        standard = isopy.refval.isotope.fraction
    else:
        pass
        #standard = isopy.checks.check_array('standard', standard, 'isotope', 'ratio', allow_dict = True)

    isotope_masses = isopy.refval(isotope_masses, isopy.RefValDict)

    numerators, denominator, inversion_keys = _deduce_inversion_keys(spike, inversion_keys)

    if measured.flavour == 'isotope':
        m = np.array([measured.get(numer) / measured.get(denominator) for numer in numerators])
    else:
        m = np.array([measured.get(key) for key in inversion_keys])


    if spike.flavour == 'isotope':
        T = np.array([spike.get(numer) / spike.get(denominator) for numer in numerators])
    else:
        T = np.array([spike.get(key) for key in inversion_keys])

    if isopy.isarray(standard) and standard.flavour == 'isotope':
        n = np.array([standard.get(numer) / standard.get(denominator) for numer in numerators])
    else:
        n = np.array([standard.get(key) for key in inversion_keys])

    if isopy.isarray(isotope_masses) and isotope_masses.flavour == 'isotope':
        P = np.array([isotope_masses.get(numer) / isotope_masses.get(denominator) for numer in numerators])
    else:
        P = np.array([isotope_masses.get(key) for key in inversion_keys])

    if method == 'rudge':
        return ds_inversion_rudge(m, n, T, np.log(P), **method_kwargs)
    elif method == 'siebert':
        return ds_inversion_siebert(m, n, T, P, **method_kwargs)
    else:
        raise ValueError(f'method "{method}" not recognized')


[docs]def ds_correction(measured, spike, standard=None, inversion_keys=None, interference_correction = True, isotope_fractions='isotope.fraction', isotope_masses='isotope.mass', method='rudge', fins_tol = 0.000001, fins_guess = None, **method_kwargs): """ A double spike data reduction. If *interference_correction* is True a correction is applied for all isotopes in *measured* that have a different element symbol from the keys in *spike*. If more than one isotope exists for an an element the largest isotope is used for the interference correction. Parameters ---------- measured : IsopyArray Measured isotope ratios standard : IsopyArray, dict References isotope ratios or a dict or references values spike : IsopyArray, dict Spike isotope ratios or a dict or references values isotope_masses : IsopyArray, dict, Optional Mass isotope ratios or a dict or references values. If not given hte :attr:`isotope.mass` will be used. inversion_keys : RatioKeyList, Optional Keys used for the inversion. Can either be 3 ratio key strings or 4 isotope key strings. Does not have to be given if the inversion keys can be inferred from *spike*. method : str Inversion method to be used. Options are 'rudge' and 'siebert'. fins_tol : float The interference correction is considered a success once the difference between the current and the previous *fins* value is below this value. fins_guess The fins value used for the first iteration of the interference correction. If not given the *fins* value of an inversion without an interference correction applied is used. method_kwargs Keyword arguments for inversion method. For advanced users only. See `inversion_rudge` and `inversion_siebert` for list of possible arguments. Returns ------- inversion_result : DSResult The returned *DSResult* object contains the the following attributes: * ``method`` - Name of the method used to do the inversion. * ``alpha`` - The natural fractionation factor as defined by Rudge (:math:`n = N * mass^\\alpha`). **Note** this value has the opposite sign to *fnat*. * ``beta`` - The instrumental mass fractionation factor as defined by Rudge (:math:`m = M * mass^\\beta`). Same as *fins*. * ``lambda_`` - The lambda value defined by rudge. * ``fnat`` - The natural fractionation factor as defined by Siebert (:math:`\\textrm{SA} = \\textrm{ST} * mass^{fnat}`). **Note** this value has the opposite sign to *alpha*. * ``fins`` - The instrumental fractionation factor as defined by Siebers (:math:`\\textrm{MS} = \\textrm{MT} * mass^{fins}`). Same as *beta*. * ``spike_fraction`` - The fraction of spike in the sample-spike mixture. Calculated on the basis of the four isotopes used in the inversion. * ``sample_fraction`` - The fraction of sample in the sample-spike mixture. Calculated on the basis of the four isotopes used in the inversion. * ``Q`` - The *sample_fraction* to *spike_fraction* ratio. Array functions, e.g. ``np.mean`` and ``isopy.sd`` can be used on this object. The function will be performed on each attribute and a new *DSResult* object returned. """ #measured = isopy.checks.check_array('measured', measured, 'isotope', 'ratio') #spike = isopy.checks.check_array('spike', spike, 'isotope', 'ratio', allow_dict=True) inversion_keys = isopy.checks.check_type('inversion_keys', inversion_keys, isopy.core.IsopyKeyList, allow_none=True) isotope_fractions = isopy.refval(isotope_fractions, isopy.RefValDict) isotope_masses = isopy.refval(isotope_masses, isopy.RefValDict) if standard is None: standard = isotope_fractions else: pass #standard = isopy.checks.check_array('standard', standard, 'isotope', 'ratio', allow_dict = True) # If inversion keys are not given they are inferred from the spike numer, denom, inversion_keys = _deduce_inversion_keys(spike, inversion_keys) # Find isotopes of potential isobaric interferences # If more than one isotope of an element exits the correction is made with the largest one # Thus only the largest isotope is returned from this function if interference_correction is True: isobaric_interferences = isotope.find_isobaric_interferences(denom.element_symbol, measured) elif isinstance(interference_correction, dict): isobaric_interferences = interference_correction else: isobaric_interferences = [] # Calculate the starting value inversion = ds_inversion(measured, spike, standard, isotope_masses, inversion_keys, method, **method_kwargs) if fins_guess is not None: fins = fins_guess else: fins = inversion.fins # Iteratively perform the interference correction # Stop once the result converges if isobaric_interferences and not np.all(np.isnan(fins)): for i in range(10): measured2 = measured.copy() fins2 = fins measured2 = isotope.remove_isobaric_interferences(measured2, isobaric_interferences, fins, isotope_fractions, isotope_masses) #Recalculate the instrumental fractionation inversion = ds_inversion(measured2, spike, standard, isotope_masses, inversion_keys, method, **method_kwargs) fins = inversion.fins if np.all(np.isnan(fins)): #No solution break if np.all(np.abs(fins - fins2) < fins_tol): break # Beta value has converged so no need for more iterations. else: raise ValueError( 'inversion did not converge after 10 iterations of the interference correction') return inversion
# TODO cache for speedier grid def _deduce_inversion_keys(spike, inversion_keys): if inversion_keys is None: if not isinstance(spike, core.IsopyArray): raise ValueError( f'Can not deduce inversion keys from spike since it is not an isopy array') elif spike.flavour == 'isotope': if len(spike.keys) != 4: raise ValueError(f'inversion keys can not be deduced from *spike* as it' f'has {len(spike.keys)} isotope keys instead of the expected 4') else: denom = isopy.keymax(spike) numer = spike.keys - denom inversion_keys = numer / denom return numer, denom, inversion_keys elif spike.flavour == 'ratio': if len(spike.keys) != 3: raise ValueError(f'inversion keys can not be deduced from *spike* as it' f'has {len(spike.keys)} ratio keys instead of the expected 3') else: denom = spike.keys.common_denominator numer = spike.keys.numerators inversion_keys = numer / denom return numer, denom, inversion_keys else: inversion_keys = isopy.askeylist(inversion_keys) if inversion_keys.flavour == 'isotope': if len(inversion_keys) != 4: raise ValueError(f'got {len(inversion_keys)} inversion isotope keys instead of 4') elif spike.flavour == 'isotope': spike = spike.copy(key_eq=inversion_keys) denom = isopy.keymax(spike) numer = inversion_keys - denom inversion_keys = numer / denom return numer, denom, inversion_keys elif spike.flavour == 'ratio': denom = spike.keys.common_denominator numer = inversion_keys - denom inversion_keys = numer / denom return numer, denom, inversion_keys elif inversion_keys.flavour == 'ratio[isotope,isotope]': if len(inversion_keys) != 3: raise ValueError(f'got {len(inversion_keys)} inversion ratio keys instead of 3') elif inversion_keys.common_denominator is None: raise ValueError(f'inversion key ratios do not have a common denominator') else: denom = inversion_keys.common_denominator numer = inversion_keys.numerators return numer, denom, inversion_keys raise ValueError('Unable to deduce the inversion keys')
[docs]def ds_grid(standard, spike1, spike2=None, inversion_keys=None, n=19, *, fnat=0, fins=2, fixed_voltage=10, fixed_key = None, blank = None, blank_fixed_voltage = None, blank_fixed_key = None, integrations=100, integration_time=8.389, resistor=1E11, random_seed=46, method='siebert', fins_guess = None, isotope_masses='isotope.mass', isotope_fractions='isotope.fraction', correction_method=ds_correction, interferences = None, **kwargs): """ Compute the inversion result for a simulated measurement with varied sample/spike and spike1/spike2 ratios. Parameters ---------- standard Any object that can be passed to ``make_ms_array`` to returns valid array. Also accepts a tuple or a dict which will be unpacked appropriately. spike1 The composition of spike 1. If spike 2 is not given then this must contain both spikes. spike2 The composition of spike 2. Not necessary if *spike1* contains both spikes. In this case spike1/spike2 ratio will be fixed. inversion_keys Keys used for the inversion. Can either be 3 ratio key strings or 4 isotope key strings. Does not have to be given if the inversion keys can be inferred from *spike1*. n The number of intervals in the grid. The total number of data points will be :math:`n^2`. fnat If given, the natural fractionation fractionation factor is applied to the ms_array before *interferences* are added to the ms_array. fins If given, the instrumental mass fractionation factor is applied to the ms_array at the same time the *interferences* are added to the ms_array. fixed_voltage The voltage of the most abundant isotope in the array. The value for all other isotopes in the array are adjusted accordingly. fixed_key If not given then the sum of the inversion keys will be set to *fixed_voltage*. blank The blank sample to be added to the sample. Can be object that can be singularly passed to ``make_ms_array`` which returns valid array. Also accepts a tuple or a dict which will be unpacked appropriately. blank_fixed_voltage The voltage of the *blank_fixed_key* in returned sample that is *blank*. blank_fixed_key If not given then the sum of the inversion keys will be set to *blank_fixed_voltage*. integrations The number of simulated measurements. integration_time The integration time for each simulated measurement. resistor The resistor used for each measurement. A isotope array or a dictionary can be passed to to give different resistor values for different isotopes. random_seed Seed given for the random generator. The same seed will be used for all data points resulting in the same normal distribution for each set of integrations. If ``None`` then each point in the grid will have a different normal distribution. method Method used for the doublespike inversion. Default is the ``"siebert"`` method as it is faster however it occasionally fails to find solutions to extreme edge cases. If these are important use the ``"rudge"`` method instead. fins_guess The fins value used for the first iteration of the interference correction. If not given the *fins* value of an inversion without an interference correction applied is used. correction_method The method used to perform the double spike inversion. Must have the same signature as ``ds_correction``. Defaults to ``ds_correction``. isotope_fractions Reference value for the isotope fractions of different elements. Defaults to ``isopy.refval.isotope.fractions``. isotope_masses Reference value for the isotope masses of different elements. Defaults to ``isopy.refval.isotope.mass``. interferences A dictionary containing the interferences and relative abundances to the main elements. kwargs Additional keyword arguments for the inversion method. Returns ------- grid_result : DSGridResult The returned *DSGridResult* contains the following attributes: * ``solutions.method`` - Grid containing the method used to do the inversion for each datapoint. * ``solutions.alpha`` - Grid containing the natural fractionation factor as defined by Rudge (:math:`n = N * m^\\alpha`) for each datapoint. **Note** this value has the opposite sign to *fnat*. * ``solutions.beta`` - Grid containing the instrumental mass fractionation factor as defined by Rudge (:math:`m = M * m^\\beta`) for each datapoint. Same as *fins*. * ``solutions.lambda_`` - Grid containing the lambda value defined by Rudge for each datapoint. * ``solutions.fnat`` - Grid containing the natural fractionation factor as defined by Siebert (:math:`\\textrm{SA} = \\textrm{ST} * m^\\alpha`) for each datapoint. **Note** this value has the opposite sign to *alpha*. * ``solutions.fins`` - Grid containing the instrumental fractionation factor as defined by Siebert (:math:`\\textrm{MT} = \\textrm{MS} * m^\\alpha`) for each datapoint. Same as *beta*. * ``solutions.spike_fraction`` - Grid containing the fraction of spike in the sample-spike mixture for each datapoint. Calculated on the basis of the four isotopes used in the inversion. * ``solutions.sample_fraction`` - Grid containing the fraction of sample in the sample-spike mixture for each datapoint. Calculated on the basis of the four isotopes used in the inversion. * ``solutions.Q`` - Grid containing the *sample_fraction* to *spike_fraction* ratio for each datapoint. * ``input.doublespike_fraction`` - List of the true double spike fraction (:math:`\\frac{\\textrm{double spike}} {\\textrm{double spike} + \\textrm{sample}}`) for each datapoint in the x-axis. * ``input.sample_fraction`` - List of the true sample fraction (:math:`\\frac{\\textrm{sample}} {\\textrm{double spike} + \\textrm{sample}}`) for each datapoint in the x-axis. * ``input.spike1_fraction`` - List of the true spike 1 fraction (:math:`\\frac{\\textrm{spike 1}} {\\textrm{spike 1} + \\textrm{spike 2}}`) for each datapoint in the y-axis. * ``input.spike2_fraction`` - List of the true spike 2 fraction (:math:`\\frac{\\textrm{spike 2}} {\\textrm{spike 1} + \\textrm{spike 2}}`) for each datapoint in the y-axis. * ``input.fnat`` - The true natural fractionation value for each datapoint. * ``input.fins`` - The true instrumental fractionation value for each datapoint. * ``input.measured`` Grid containing the *measured* values for each datapoint. The returned *DSGridResult* contains the following methods: * ``xyz(zeval=isopy.sd, zattr='solutions.fnat')`` - Returns ``input.doublespike_fraction``. ``input.spike1_fraction``, ``zeval(getattr(grid_result, zattr))``. * ``yz(xval=0.5, zeval=isopy.sd, zattr='solutions.fnat')`` - Returns ``input.spike1_fraction`` and a 1-dimensional array of ``zeval(getattr(grid_result, zattr))`` at *xval*. * ``xz(yval=0.5, zeval=isopy.sd, zattr='solutions.fnat')`` - Returns ``input.doublespike_fraction`` and a 1-dimensional array of ``zeval(getattr(grid_result, zattr))`` at *yval*. """ isotope_fractions = isopy.refval(isotope_fractions, isopy.RefValDict) isotope_masses = isopy.refval(isotope_masses, isopy.RefValDict) numer, denom, inversion_keys = _deduce_inversion_keys(spike1, inversion_keys) standard = isotope.make_ms_array(standard, isotope_fractions=isotope_fractions, isotope_masses=isotope_masses) if interferences is None: interferences = {} if fixed_key is None: fixed_key = inversion_keys.flatten(ignore_duplicates=True) elif blank_fixed_key is None: blank_fixed_key = inversion_keys.flatten(ignore_duplicates=True) doublespike_fractions = np.linspace(0, 1, n + 2)[1:-1] spike1 = spike1 / np.sum(spike1, axis=None) if spike2 is None: spike2 = 0 spike1_fractions = np.array([1]) else: spike1_fractions = np.linspace(0, 1, n + 2)[1:-1] spike2 = spike2 / np.sum(spike2, axis=None) result_solutions = IsopyResultList() all_measured = IsopyResultList() for spike1_fraction in spike1_fractions: spike_mixture = (spike1 * spike1_fraction) + (spike2 * (1 - spike1_fraction)) result_solutions.append(IsopyResultList()) all_measured.append(IsopyResultList()) for doublespike_fraction in doublespike_fractions: measured = isotope.make_ms_sample(standard, spike=spike_mixture, spike_fraction=doublespike_fraction, fnat=fnat, fins=fins, fixed_voltage=fixed_voltage, fixed_key=fixed_key, blank=blank, blank_fixed_voltage=blank_fixed_voltage, blank_fixed_key=blank_fixed_key, integrations=integrations, integration_time=integration_time, resistors=resistor, random_seed=random_seed, isotope_fractions=isotope_fractions, isotope_masses=isotope_masses, **interferences) all_measured[-1].append(measured) try: solution = correction_method(measured, spike_mixture, standard, inversion_keys, isotope_masses=isotope_masses, isotope_fractions=isotope_fractions, method=method, fins_guess = fins_guess, **kwargs) result_solutions[-1].append(solution) except: nan = np.full(measured.size, np.nan) solution = DSResult(dict(method='rudge'), dict(alpha=nan, beta=nan, lambda_=nan, fnat=nan, fins=nan, spike_fraction=nan, sample_fraction=nan, Q=nan)) result_solutions[-1].append(solution) input = IsopyNamedResult(static_items=dict(doublespike_fraction = doublespike_fractions, sample_fraction = 1-doublespike_fractions, spike1_fraction = spike1_fractions, spike2_fraction = 1-spike1_fractions, fnat = fnat, fins = fins), dynamic_items=dict(measured=all_measured)) return DSGridResult(dynamic_items=dict(input=input, solutions=result_solutions))
[docs]@core.append_preset_docstring @core.add_preset(('delta', 'permil'), factor=1000) @core.add_preset('mu', factor=1E6) def ds_Delta(mass_ratio, fnat, reference_fnat=0, *, factor=1, isotope_masses=None): """ Calculate the Δ value for *mass_ratio* of a sample using the *fnat* mass fractionation factor. .. math:: \\Delta \\frac{smp_{i}} {smp_{j}} = \\left( \\left(\\frac{mass_i} {mass_j} \\right)^{fnat} - 1\\right) * \\textrm{factor} Where :math:`norm` is the normalisation factor. If *reference_fnat* values are given :math:`fnat` is the difference between the *fnat* and *reference_fnat*: .. math:: fnat = fnat_{smp} - fnat_{ref} If multiple *reference_fnat* values are passed the mean of those values is used. If the each *reference_fnat* passed has more than one value the mean is used. """ if type(fnat) is DSResult: fnat = fnat.fnat reference_fnat = _combine(reference_fnat) if isotope_masses is None: isotope_masses = isopy.refval.isotope.mass fnat = fnat - reference_fnat if isinstance(mass_ratio, str): mass_ratio = isotope_masses.get(mass_ratio) return (np.power(mass_ratio, fnat) - 1) * factor
[docs]@core.append_preset_docstring @core.add_preset(('delta', 'permil'), factor=1000) @core.add_preset('mu', factor=1E6) def ds_Delta_prime(mass_ratio, fnat, reference_fnat=0, *, factor=1, isotope_masses=None): """ Calculate the Δ' value for *mass_ratio* of a sample using the *fnat* mass fractionation factor. .. math:: \\Delta^{\\prime} \\frac{smp_{i}} {smp_{j}} = \\textrm{norm} * fnat * log\\left(\\frac{mass_i} {mass_j} \\right) Where *norm* is the normalisation factor and *fnat* is the difference between the sample and optional standard: .. math:: fnat = fnat_{smp} - fnat_{std} """ if type(fnat) is DSResult: fnat = fnat.fnat reference_fnat = _combine(reference_fnat) if isotope_masses is None: isotope_masses = isopy.refval.isotope.mass fnat = fnat - reference_fnat if isinstance(mass_ratio, str): mass_ratio = isotope_masses.get(mass_ratio) return np.log(mass_ratio) * fnat * factor
def _combine(value): if type(value) is not tuple: values = (value,) else: values = value out = [] for value in values: if type(value) is DSResult: value = value.fnat value = np.asarray(value) if value.size > 1: value = np.nanmean(value) out.append(value) if len(out) == 1: return out[0] else: return np.mean(out) class IsopyNamedResult: def __init__(self, static_items = None, dynamic_items=None): self.__static_item_names = [] self.__dynamic_item_names = [] if static_items: for name, value in static_items.items(): setattr(self, name, value) self.__static_item_names.append(name) if dynamic_items: for name, value in dynamic_items.items(): setattr(self, name, value) self.__dynamic_item_names.append(name) def __repr__(self): sitems = [f'{name}={getattr(self, name).__repr__()}' for name in self.__static_item_names] ditems = [f'{name}={getattr(self, name).__repr__()}' for name in self.__dynamic_item_names] items = "\n".join(sitems + ditems) return f'{type(self).__name__}({items})' def __array_function__(self, func, types, args, kwargs): static_items = {name: getattr(self, name) for name in self.__static_item_names} dynamic_items = {name: func(getattr(self, name), *args[1:], **kwargs) for name in self.__dynamic_item_names} return self.__class__(static_items, dynamic_items) def __getitem__(self, name): return getattr(self, name) def items(self): return ((name, getattr(self, name)) for name in self.__dynamic_item_names) def values(self): return (getattr(self, name) for name in self.__dynamic_item_names) def keys(self): return (name for name in self.__dynamic_item_names) class IsopyResultList(list): def __array_function__(self, func, types, args, kwargs): return self.__class__(func(item, *args[1:], **kwargs) for item in self) def __getattr__(self, attr): if attr[:1] == '_': raise AttributeError() #Avoid issues with numpy special attributes return self.__class__(getattr(thing, attr) for thing in self) def get(self, item): return self.__class__(thing.get(item) if type(thing) is self.__class__ else thing[item] for thing in self) class DSResult(IsopyNamedResult): method: str pass class DSGridResult(IsopyNamedResult): def __getattr__(self, attr): attrs = attr.split('.') if len(attrs) == 1: lastattr = self.solutions else: lastattr = self for a in attrs: lastattr = getattr(lastattr, a) return lastattr def xyz(self, zeval=isopy.sd, zattr='solutions.fnat'): """ Return a tuple of ``grid.input.doublespike_fraction``, ``grid.input.spike1_fraction`` and ``zeval(getattr(grid, zattr))``. This which can be used in conjunction with the :func:`plot_grid` function e.g. ``isopy.tb.plot_grid(plt, *grid.xyz())``. Parameters ---------- zeval Function to evaluate *zattr*. zattr The attribute to be evaluated. """ zval = getattr(self, zattr) zval = zeval(zval) return self.input.doublespike_fraction, self.input.spike1_fraction, zval def yz(self, xval=0.5, zeval=isopy.sd, zattr='solutions.fnat'): """ Return ``grid.input.spike1_fraction`` and a 1-dimensional arrays of z at *xval*. z is evaluated as for :func:`xyz`. Parameters ---------- xval If there is no exact match the *x* value closest to *xval* is used. zeval Function to evaluate *zattr*. zattr The attribute to be evaluated """ x, y, z = self.xyz(zeval, zattr) ix = np.nanargmin(np.abs(x-xval)) return y, [r[ix] for r in z] def xz(self, yval=0.5, zeval=isopy.sd, zattr='solutions.fnat'): """ Return ``grid.input.doublespike_fraction`` and a 1-dimensional arrays of z at *yval*. z is evaluated as for :func:`xyz`. Parameters ---------- yval If there is no exact match the *y* value closest to *yval* is used. zeval Function to evaluate *zattr*. zattr The attribute to be evaluated """ x, y, z = self.xyz(zeval, zattr) iy = np.nanargmin(np.abs(y-yval)) return x, z[iy]