Source code for stratmc.synthetics

import random
import sys
import warnings

import arviz as az
import numpy as np
import pandas as pd
import pymc as pm
from scipy.interpolate import UnivariateSpline, interp1d
from scipy.stats import gaussian_kde

warnings.filterwarnings("ignore", ".*The group X_new is not defined in the InferenceData scheme.*")
warnings.filterwarnings("ignore", ".*X_new group is not defined in the InferenceData scheme.*")

pd.options.mode.chained_assignment = None

from stratmc.model import build_model


[docs] def make_excursion(time, amplitude, baseline = 0, rising_time = None, rate_offset = True, excursion_duration = None, min_duration = 1, smooth = False, smoothing_factor = 10, seed = None): """ Function for generating a synthetic proxy signal that contains a number of user-specified excursions. Parameters ---------- time: numpy.array(float) Time vector over which to generate proxy signal. amplitude: float, list(float), or numpy.array(float) Amplitude of excursion; pass a list or array to generate multiple excursions. baseline: float, optional Baseline proxy value. Defaults to 0. rising_time: float, list(float), or numpy.array(float), optional Fraction of excursion duration spent on the rising limb (linear increase/decrease toward peak). Must be between 0 and 1. If not provided, randomly generated if ``rate_offset`` is ``True`` and set to 0.5 if ``rate_offset`` is ``False``. Pass a list to specify different rising times for each excursion. rate_offset: bool, optional If ``False``, rising and falling limbs of excursion have equal duration. If ``True``, the fraction of the excursion duration spent on the rising limb is set by ``rising_time``. Defaults to ``False``. excursion_duration: float, list(float), or numpy.array(float), optional Duration of excursion; pass a list or array to generate multiple excursions. Random if not provided. min_duration: float, optional Minimum excursion duration if ``excursion_duration`` is not provided. Defaults to 1. smooth: bool, optional Whether to smooth excursion peaks. Defaults to ``False``. smoothing_factor: float, optional Smoothing factor if ``smooth`` is ``True``; higher values produce smoother signals. Defaults to 10. seed: int, optional Random seed used to generate signal. Returns ------- interp_proxy: np.array Tracer signal interpolated to points in the ``time`` vector """ if seed is not None: random.seed(a = seed) np.random.seed(seed) if (type(amplitude) == float) or (type(amplitude) == int): amplitude = np.array([amplitude]) else: amplitude = np.array(amplitude) # if excursion duration not in inputs if excursion_duration == None: if len(amplitude) == 1: excursion_duration = random.uniform(np.min(time), np.max(time)-np.min(time)) excursion_duration = np.asarray([excursion_duration]) n = 1 # multiple excursions else: excursion_duration = np.asarray([excursion_duration]) amplitude = np.asarray(amplitude) n = len(amplitude) excursion_duration = [] duration_sum = 0 for i in np.arange(len(amplitude)).tolist(): max_duration = np.max(time) - np.min(time) - duration_sum - (min_duration * (len(amplitude)-i)) excursion_duration.append(random.uniform(min_duration, max_duration)) duration_sum = np.sum(excursion_duration) excursion_duration = np.array(list(excursion_duration)) # if excursion duration in inputs else: excursion_duration = np.array(list(excursion_duration)) n = len(amplitude) if len(amplitude) != len(excursion_duration): sys.exit('Duration and amplitude lists are not the same length') # fraction of excursion duration spent on rising vs. falling limb of excursion if (rising_time == None) and (rate_offset): rising_frac = np.random.uniform(low = 0.1, high = 0.9, size = len(excursion_duration)) elif (rising_time == None) and (not rate_offset): rising_frac = np.ones(len(excursion_duration)) * 0.5 else: rising_frac = rising_time rising_frac = np.asarray(rising_frac) # if more than 1 excursion, choose random starting point for each excursion if len(excursion_duration) > 1: # check that total excursion duration does not exceed total timespan if np.sum(excursion_duration) > np.max(time) - np.min(time): sys.exit('Sum of excursion durations exceeds total time') excursion_start = [] current_min = np.min(time) for i in np.arange(len(excursion_duration)): max_start = np.max(time) - np.sum(excursion_duration[i:]) start = random.uniform(current_min, max_start) excursion_start.append(start) current_min = excursion_start[-1] + excursion_duration[i] # if only 1 excursion, choose random starting point else: excursion_start = [random.uniform(np.min(time), np.max(time) - excursion_duration)] excursion_start = np.asarray(excursion_start) # generate proxy signal for each excursion excursion_proxy = {} excursion_time = {} if len(excursion_duration) > 1: for i, amp, dur, start in zip(np.arange(len(excursion_duration)), amplitude, excursion_duration, excursion_start): peak_time = start + (rising_frac[i] * dur) base_time = start end_time = start + dur # fit line through rising limb rising_p = np.polyfit([base_time, peak_time], [0, amp], 1) rising_t = np.linspace(base_time, peak_time, 1000) rising_proxy = np.polyval(rising_p, rising_t) # fit line through falling limb falling_p = np.polyfit([peak_time, end_time], [amp, 0], 1) falling_t = np.linspace(peak_time+1e-6, end_time, 1000) falling_proxy = np.polyval(falling_p, falling_t) # combine rising and falling limbs excursion_proxy[i] = np.concatenate([rising_proxy, falling_proxy]) excursion_time[i] = np.concatenate([rising_t, falling_t]) if smooth: spline = UnivariateSpline(excursion_time[i], excursion_proxy[i], s = smoothing_factor) x_new = np.linspace(np.min(excursion_time[i]), np.max(excursion_time[i]), 2000) excursion_proxy[i] = spline(x_new) excursion_time[i] = x_new else: peak_time = excursion_start[0] + (rising_frac * excursion_duration[0]) base_time = excursion_start[0] end_time = excursion_start[0] + excursion_duration[0] # fit line through rising limb rising_p = np.polyfit([base_time[0], peak_time[0]], [0, amplitude[0]], 1) rising_t = np.linspace(base_time[0], peak_time[0], 1000) rising_proxy = np.polyval(rising_p, rising_t) # fit line through falling limb falling_p = np.polyfit([peak_time[0], end_time[0]], [amplitude[0], 0], 1) falling_t = np.linspace(peak_time[0]+1e-6, end_time[0], 1000) falling_proxy = np.polyval(falling_p, falling_t) # combine rising and falling limbs excursion_proxy[0] = np.concatenate([rising_proxy, falling_proxy]) excursion_time[0] = np.concatenate([rising_t, falling_t]) if smooth: spline = UnivariateSpline(excursion_time[0], excursion_proxy[0], s = smoothing_factor) x_new = np.linspace(np.min(excursion_time[0]), np.max(excursion_time[0]), 2000) excursion_proxy[0] = spline(x_new) excursion_time[0] = x_new # fill with baseline outside of excursions timeline = [] proxy = [] for i in np.arange(n): if i == 0 and excursion_start[i] > 0: s = np.linspace(0, excursion_start[i], 1000) d = np.zeros(len(s)) timeline = np.insert(timeline, 0, s.ravel()) proxy = np.insert(proxy, 0, d) timeline = np.append(timeline, excursion_time[i]) proxy = np.append(proxy, excursion_proxy[i]) if i < len(excursion_duration)-1: if np.max(excursion_time[i]) < np.min(excursion_time[i+1]): f = np.linspace(np.max(excursion_time[i]) + 1e-3, np.min(excursion_time[i+1]), 1000) timeline = np.append(timeline, f) d = np.zeros(len(f)) proxy = np.append(proxy, d) if i == len(excursion_duration)-1: if timeline[-1] < np.max(time): f = np.linspace(timeline[-1]+1e-3, np.max(time), 1000) timeline = np.append(timeline, f) d = np.zeros(len(f)) proxy = np.append(proxy, d) # interpolate to original time vector proxy_fun = interp1d(timeline, proxy) interp_proxy = proxy_fun(time) + baseline return interp_proxy
[docs] def synthetic_sections(true_time, true_proxy, num_sections, num_samples, max_section_thickness, proxies = ['d13c'], noise = False, noise_amp = 0.1, min_constraints = 2, max_constraints = 3, seed = None, **kwargs): """ Function for generating synthetic proxy observations and age constraints using a predefined proxy signal. Parameters ---------- true_time: numpy.array(float) True time vector for input signal. true_proxy: numpy.array(float) or dict{numpy.array(float)} True proxy vector for input signal. If generating synthetic data for multiple proxies, pass as a dictionary with proxy names as keys. num_sections: int Number of synthetic sections to generate. num_samples: int Number of samples per synthetic section. max_section_thickness: float Maximum thickness of synthetic sections. proxies: str or list(str), optional Column name(s) for synthetic proxy observations in ``sample_df``. Defaults to 'd13c'. noise: bool, optional Whether to add white noise to proxy observations. Defaults to ``False``. noise_amp: float or dict{float}, optional Amplitude of white noise added to proxy observations (if ``noise`` is ``True``). To specify a different noise amplitude for each proxy, pass as a dictionary with proxy names as keys. Defaults to 0.1. min_constraints: int, optional Minimum number of age constraints per synthetic section (must be at least 2). Defaults to 2. max_constraints: int, optional Maximum number of age constraints per synthetic section. Defaults to 3. seed: int, optional Random seed used to generate synthetic sections. Returns ------- sample_df: pandas.DataFrame :class:`pandas.DataFrame` containing proxy data for synthetic sections. ages_df: pandas.DataFrame :class:`pandas.DataFrame` containing age constraints for synthetic sections. """ if type(proxies) == str: proxies = [proxies] if type(true_proxy) != dict: temp = true_proxy true_proxy = {} for proxy in proxies: true_proxy[proxy] = temp section_ages = [] proxy_vec = {} for proxy in proxies: proxy_vec[proxy] = [] heights = [] age_heights = [] ages = [] age_std = [] age_section_names = [] section_names = [] for i in np.arange(num_sections): proxy_temp = {} section = str(i) if seed is not None: random.seed(a = int(seed+i)) np.random.seed(seed+i) if 'age_constraints' in kwargs: # ages ages_dict = kwargs['age_constraints'] ages_std_dict = kwargs['age_constraints_std'] section_ages_temp = np.flip(np.sort(np.random.uniform(np.min(ages_dict[section]), np.max(ages_dict[section]), num_samples))) else: # sample ages section_ages_temp = np.flip(np.sort(np.random.uniform(np.min(true_time), np.max(true_time), num_samples))) # true_time must be strictly increasing for valid interpolation if not np.all(np.diff(np.flip(true_time)) > 0): for proxy in proxies: true_proxy[proxy] = true_proxy[proxy][np.argsort(true_time)] true_time = np.sort(true_time) # sample proxy for proxy in proxies: proxy_temp[proxy] = np.interp(section_ages_temp, true_time, true_proxy[proxy]) # sample heights heights_temp = np.random.uniform(0.5, max_section_thickness - np.random.choice(np.random.uniform(0, 0.75 * max_section_thickness, 100000), size = 1, replace = False), num_samples) heights_temp = np.sort(heights_temp) if 'age_constraints' in kwargs: ages_temp = ages_dict[section] sort_idx = np.argsort(ages_temp) ages_temp = np.flip(ages_temp[sort_idx]) age_std_temp = ages_std_dict[section] age_std_temp = np.flip(age_std_temp[sort_idx]) age_heights_temp = np.flip(np.interp(ages_temp, np.flip(section_ages_temp), np.flip(heights_temp))) if age_heights_temp[0] >= heights_temp[0]: age_heights_temp[0] = 0 if age_heights_temp[-1] <= heights_temp[-1]: age_heights_temp[-1] = heights_temp[-1] + 1 else: # number of age constraints num_ages = random.randrange(min_constraints,max_constraints)#random.randrange(2,3) # height of age constraints age_heights_temp = np.sort(np.random.choice(np.random.uniform(0, np.max(heights_temp), 100000), replace = False, size = num_ages)) if age_heights_temp[0] > heights_temp[0]: age_heights_temp = np.append(age_heights_temp, 0) num_ages += 1 if age_heights_temp[-1] < heights_temp[-1]: age_heights_temp = np.append(age_heights_temp, heights_temp[-1]+np.absolute(np.random.normal(3, 1, 1))) num_ages += 1 age_heights_temp.sort() # age of constraints ages_temp = np.interp(age_heights_temp, heights_temp, section_ages_temp) # age uncertainties age_std_temp = abs(np.random.normal(0.5, 0.5, num_ages)) age_section_names_temp = [str(i)]*len(age_heights_temp) section_names_temp = [str(i)]*len(heights_temp) section_ages = np.append(section_ages, section_ages_temp) if noise: if type(noise_amp) == float: temp = noise_amp noise_amp = {} for proxy in proxies: noise_amp[proxy] = np.ones(num_sections) * temp elif (type(noise_amp) == dict): if (type(noise_amp[proxies[0]]) == float) | (type(noise_amp[proxies[0]]) == int): for proxy in proxies: n = np.random.normal(0, noise_amp[proxy], len(proxy_temp[proxy])) proxy_temp[proxy] = proxy_temp[proxy] + n else: for proxy in proxies: n = np.random.normal(0, noise_amp[proxy][i], len(proxy_temp[proxy])) proxy_temp[proxy] = proxy_temp[proxy] + n for proxy in proxies: proxy_vec[proxy] = np.append(proxy_vec[proxy], proxy_temp[proxy]) heights = np.append(heights, heights_temp) ages = np.append(ages, ages_temp) age_heights = np.append(age_heights, age_heights_temp) age_std = np.append(age_std, age_std_temp) age_section_names = np.append(age_section_names, age_section_names_temp) section_names = np.append(section_names, section_names_temp) ages_df, sample_df = synthetic_signal_to_df(proxy_vec, heights, section_ages, section_names, ages, age_std, age_heights, age_section_names, proxies = proxies) return ages_df, sample_df
[docs] def synthetic_signal_to_df(proxy_vec, heights, section_ages, section_names, ages, age_std, age_heights, age_section_names, proxies = ['d13c']): """ Helper function for generating artificial sample and age data using :py:meth:`synthetic_sections() <stratmc.synthetics>`. Parameters ---------- proxy_vec: np.array(float) or dict{np.array(float)} Array of proxy observations. Pass as a dictionary if more than one proxy. heights: np.array(float) Array of heights corresponding to proxy observations in ``proxy_vec``. section_ages: np.array(float) Array of ages corresponding to proxy observations in ``proxy_vec``. section_names: np.array(str) Array of section names corresponding to proxy observations in ``proxy_vec``. ages: np.array(float) Array of age constraints. age_std: np.array(float) Array of uncertainties for each age constraint in ``ages``. age_heights: np.array(float) Array of heights for each age constraint in ``ages``. age_section_names: np.array(str) Array of section names corresponding to age constraints in ``ages``. proxies: str or list(str), optional Name(s) of proxies. Defaults to `d13c`. Returns ------- sample_df: pandas.DataFrame :class:`pandas.DataFrame` containing proxy data for synthetic sections. ages_df: pandas.DataFrame :class:`pandas.DataFrame` containing age constraints for synthetic sections. """ if type(proxies) == str: proxies = [proxies] if type(proxy_vec) != dict: temp = proxy_vec proxy_vec = {} proxy_vec[proxies[0]] = temp # ages dataframe: ages_df = {'section': age_section_names, 'height': age_heights, 'age': ages, 'age_std': age_std} ages_df = pd.DataFrame(data = ages_df) ages_df.sort_values(by = ['section', 'height'], inplace = True) # sample dataframe: sample_df = {'section': section_names, 'height': heights, 'age': section_ages, } for proxy in proxies: sample_df[proxy] = proxy_vec[proxy] sample_df = pd.DataFrame(data = sample_df) sample_df.sort_values(by = ['section', 'height'], inplace = True) ages_df['shared?'] = False ages_df['name'] = np.nan ages_df['intermediate detrital?'] = False ages_df['intermediate intrusive?'] = False ages_df['Exclude?'] = False ages_df['distribution_type'] = 'Normal' sample_df['Exclude?'] = False return ages_df, sample_df
[docs] def synthetic_observations_from_prior(age_vector, ages_df, sample_heights = None, uniform_heights = False, samples_per_section = 20, proxies = ['d13c'], proxy_std = 0.1, seed = None, ls_dist = 'Wald', ls_min = 0, ls_mu = 20, ls_lambda = 50, ls_sigma = 50, var_sigma = 10, white_noise_sigma = 1e-1, gp_mean_mu = 0, gp_mean_sigma = 10, approximate = False, hsgp_m = 15, hsgp_c = 1.3, offset_type = 'section', offset_prior = 'Laplace', offset_alpha = 0, offset_beta = 1, offset_sigma = 1, offset_mu = 0, offset_b = 2, noise_type = 'section', noise_prior = 'HalfCauchy', noise_beta = 1, noise_sigma = 1, noise_nu = 1, jitter = 0.001, **kwargs): """ Given age constraints for a set of stratigraphic sections in ``ages_df``, generate synthetic proxy observations by sampling the model prior. Accepts all arguments that can be passed to :py:meth:`build_model() <stratmc.model.build_model>` in :py:mod:`stratmc.model`. Parameters ---------- age_vector: np.array(float) Vector of ages at which to evaluate synthetic proxy signal(s). ages_df: pandas.DataFrame :class:`pandas.DataFrame` containing age constraints for synthetic sections. sample_heights: dict{list(float) or numpy.array(float)}, optional Sample heights for each stratigraphic section in ``ages_df``; must be a dictionary with section names as keys. Defaults to ``None``, which results in either uniformly spaced or randomly spaced sample heights (depending on the ``uniform_heights`` argument). uniform_heights: bool, optional Whether to generate uniformly spaced (set to ``True``) or randomly spaced (set to ``False``) sample heights if dictionary of ``sample_heights`` not provided. Defaults to ``False`` (randomly spaced samples). samples_per_section: int or dict(int), optional Number of samples per section to generate if ``sample_heights`` not provided; either an integer (if the same for all sections) or a dictionary with section names as keys. Defaults to 20. proxies: list(str), optional List of proxies to generate synthetic observations for. Defaults to `d13c`. proxy_std: float or dict(float), optional Measurement uncertainty for each proxy; pass a dictionary of floats with the elements of ``proxies`` as keys to use a different value for each proxy, or an integer to use the same value for all proxies. Defaults to 0.1. seed: int, optional Seed to use while generating synthetic observations. Returns ------- signals: dict(float) Tracers signals drawn from the model prior (evaluated at the points in ``age_vector``) used to generate synthetic observations; dictionary keys are ``proxies``. sample_df: pandas.DataFrame :class:`pandas.DataFrame` containing proxy data for synthetic stratigraphic sections. prior: arviz.InferenceData An :class:`arviz.InferenceData` object containing the prior draw from the model used to generate synthetic observations. model: pymc.Model :class:`pymc.model.core.Model` object used to generate synthetic observations. """ if 'sections' in kwargs: sections = list(kwargs['sections']) else: sections = list(np.unique(ages_df['section'])) np.random.seed(seed) if (len(proxies) == 1) & (type(proxy_std) != dict): std = proxy_std proxy_std = {} proxy_std[proxies[0]] = std elif (len(proxies) != 1) & (type(proxy_std) != dict): std = proxy_std proxy_std = {} for proxy in proxies: proxy_std[proxy] = std # if sample heights are not provided, then generate synthetic 'sample heights' with np.random.uniform in between the minimum and maximum age heights if sample_heights is None: if type(samples_per_section) == int: n = samples_per_section samples_per_section = {} for section in sections: samples_per_section[section] = n sample_heights = {} for section in sections: min_height = np.min(ages_df[ages_df['section']==section]['height'].values) max_height = np.max(ages_df[ages_df['section']==section]['height'].values) if uniform_heights: sample_heights[section] = np.linspace(min_height + 1, max_height - 1, samples_per_section[section]) else: sample_heights[section] = np.sort(np.random.uniform(min_height + 0.01, max_height - 0.01, size = samples_per_section[section])) # if sample heights not provided and samples per section not specified elif type(sample_heights) != dict: sys.exit(f"sample_heights must be a dictionary (keys = section names, values = array or list of sample heights)") sample_df_columns = ['section', 'height', 'Exclude?'] + [proxy for proxy in proxies] sample_df = pd.DataFrame(columns = sample_df_columns) for section in sections: section_dict = { 'section': [section] * len(sample_heights[section]), 'height': sample_heights[section], 'Exclude?': [False] * len(sample_heights[section]) } for proxy in proxies: section_dict[proxy] = [0] * len(sample_heights[section]) section_dict[proxy + '_std'] = np.ones(len(sample_heights[section])) * proxy_std[proxy] sample_df = pd.concat([sample_df, pd.DataFrame.from_dict(section_dict)], ignore_index = True) for proxy in proxies: sample_df[proxy] = sample_df[proxy].astype(float) sample_df[proxy + '_std'] = sample_df[proxy + '_std'].astype(float) # make sure ages_df has all of the required columns ages_df_columns = ['distribution_type', 'param_1', 'param_2', 'param_1_name', 'param_2_name', 'shared?', 'name', 'Exclude?', 'intermediate detrital?', 'intermediate intrusive?'] for col in ages_df_columns: if col not in list(ages_df): ages_df[col] = np.nan ages_df['shared?'] = False ages_df['intermediate detrital?'] = False ages_df['intermediate intrusive?'] = False ages_df['Exclude?'] = False ages_df['distribution_type'] = 'Normal' sample_df.sort_values(by = ['section', 'height'], inplace = True) sections = np.unique(sample_df['section']) sample_df['Exclude?'] = sample_df['Exclude?'].astype(bool) # build a model using the synthetic data (set proxy_observed = False in build_model) model, gp = build_model(sample_df, ages_df, sections = sections, proxies = proxies, proxy_sigma_default = proxy_std, ls_dist = ls_dist, ls_min = ls_min, ls_mu = ls_mu, ls_lambda = ls_lambda, ls_sigma = ls_sigma, var_sigma = var_sigma, white_noise_sigma = white_noise_sigma, gp_mean_mu = gp_mean_mu, gp_mean_sigma = gp_mean_sigma, approximate = approximate, offset_type = offset_type, offset_prior = offset_prior, offset_alpha = offset_alpha, offset_beta = offset_beta, offset_sigma = offset_sigma, offset_mu = offset_mu, offset_b = offset_b, noise_type = noise_type, noise_prior = noise_prior, noise_beta = noise_beta, noise_sigma = noise_sigma, noise_nu = noise_nu, hsgp_m = hsgp_m, hsgp_c = hsgp_c, jitter = jitter, proxy_observed = False ) with model: # single draw from the prior for proxy in proxies: f_pred = gp[proxy].conditional('f_pred_' + proxy, Xnew = age_vector, jitter = jitter) prior = pm.sample_prior_predictive(draws = 1, random_seed = seed) signals = {} for proxy in proxies: signals[proxy] = az.extract(prior.prior)['f_pred_' + proxy].values sample_df[proxy] = az.extract(prior.prior)[proxy + '_pred'].values sample_df['age'] = 0.0 for section in sections: sample_df.loc[sample_df['section']==section, 'age'] = az.extract(prior.prior)[section + '_ages'].values.ravel() return signals, sample_df, prior, model
[docs] def synthetic_signal_from_prior(ages, num_signals = 100, ls_dist = 'Wald', ls_min = 0, ls_mu = 20, ls_lambda = 50, ls_sigma = 50, var_sigma = 10, gp_mean_mu = 0, gp_mean_sigma = 5, seed = None): """ Draws synthetic signals from the model prior, and returns the signal conditioned over the points in ``ages``. To generate both signals and synthetic stratigraphic sections, instead use :py:meth:`synthetic_observations_from_prior() <stratmc.synthetics>`. Parameters ---------- ages: numpy.array(float) Array of ages over which to condition the signal. num_signals: int, optional Number of signals to draw from prior. Defaults to 100. ls_dist: str, optional Prior distribution for the lengthscale hyperparameter of the exponential quadratic covariance kernel (:class:`pymc.gp.cov.ExpQuad <pymc.gp.cov.ExpQuad>`); set to ``Wald`` (:class:`pymc.Wald`) or ``HalfNormal`` (:class:`pymc.HalfNormal`). Defaults to ``Wald`` with ``mu = 20`` and ``lambda = 50``; to change ``mu`` and ``lambda``, pass the ``ls_mu`` and ``ls_lambda`` parameters. For ``HalfNormal``, the variance defaults to ``sigma = 50``; change by passing ``ls_sigma``. ls_min: float, optional Minimum value for the lengthscale hyperparameter of the :class:`pymc.gp.cov.ExpQuad` covariance kernel; shifts the lengthscale prior by ``ls_min``. Defaults to 0. ls_mu: float, optional Mean (`mu`) of the :class:`pymc.gp.cov.ExpQuad` lengthscale prior if ``ls_dist = `Wald```. Defaults to 20. ls_lambda: float, optional Relative precision (`lam`) of the :class:`pymc.gp.cov.ExpQuad` lengthscale hyperparameter prior if ``ls_dist = `Wald```. Defaults to 50. ls_sigma: float, optional Scale parameter (`sigma`) of the :class:`pymc.gp.cov.ExpQuad` lengthscale hyperparameter prior if ``ls_dist = `HalfNormal```. Defaults to 50. var_sigma: float, optional Scale parameter (`sigma') of the covariance kernel variance hyperparameter prior, which is a :class:`pymc.HalfNormal` distribution. Defaults to 10. gp_mean_mu: float, optional Mean (`mu`) of the GP mean function prior, which is a :class:`pymc.Normal` distribution. Defaults to 0. gp_mean_sigma: float, optional Standard deviation (`sigma`) of the GP mean function prior, which is a :class:`pymc.Normal` distribution. Defaults to 5. seed: int, optional Random seed used to generate signals. Returns ------- signal: numpy.ndarray(float) Array with shape ``ages x number of signals`` containing the ``n = num_signals`` synthetic signals drawn from the prior. """ with pm.Model() as model: # proxy GP if ls_dist == 'Wald': gp_ls_unshifted = pm.Wald('gp_ls_unshifted', mu = ls_mu, lam = ls_lambda, shape = 1) elif ls_dist == 'HalfNormal': gp_ls_unshifted = pm.HalfNormal('gp_ls_unshifted', sigma = ls_sigma, shape = 1) gp_ls = pm.Deterministic('gp_ls', gp_ls_unshifted + ls_min) gp_var = pm.HalfNormal('gp_var', sigma = var_sigma, shape = 1) m_proxy = pm.Normal('m', mu = gp_mean_mu, sigma = gp_mean_sigma, shape = 1) # mean and covariance functions mean_fun = pm.gp.mean.Constant(m_proxy) cov1 = gp_var ** 2 * pm.gp.cov.ExpQuad(1, gp_ls) # GP prior gp = pm.gp.Latent(mean_func = mean_fun, cov_func = cov1) f = gp.prior('f', X=ages[:,None], reparameterize=True) prior = pm.sample_prior_predictive(draws = num_signals, random_seed = seed) signals = az.extract(prior.prior)['f'].values return signals, prior
[docs] def quantify_signal_recovery(full_trace, true_signal, proxy = 'd13c', mode = 'posterior'): """ Calculates the likelihood of the true proxy signal (for synthetic tests, where the true signal is known) conditioned on the posterior (default) or prior proxy signal inference. The likelihood is evaluated at each age (the posterior signal and the true signal must be evaluated at the same ages). Provides a measure of signal recovery. Parameters ---------- full_trace: arviz.InferenceData or list(arviz.InferenceData) An :class:`arviz.InferenceData` object containing the full set of prior and posterior samples from :py:meth:`get_trace() <stratmc.inference.get_trace>` in :py:mod:`stratmc.inference`. If passed as a list, the posterior draws for all traces will be combined when calculating `posterior_likelihood`. true_signal: np.array True values for the proxy signal, evaluated at the same ages as the posterior signal in ``full_trace``. proxy: str, optional Tracer signal to evaluate. Defaults to 'd13c'. mode: str, optional Whether to use the posterior or prior to calculate signal recovery. Defaults to 'posterior'. Returns ------- posterior_likelihood: np.array Array of posterior likelihoods (evaluated at each age). """ if mode == 'posterior': if type(full_trace) is list: for i in np.arange(len(full_trace)): if i == 0: post_proxy = az.extract(full_trace[i].posterior_predictive)['f_pred_' + proxy].values else: post_proxy_temp = az.extract(full_trace[i].posterior_predictive)['f_pred_' + proxy].values post_proxy = np.hstack([post_proxy, post_proxy_temp]) else: post_proxy = az.extract(full_trace.posterior_predictive)['f_pred_' + proxy].values elif mode == 'prior': if type(full_trace) is list: for i in np.arange(len(full_trace)): if i == 0: post_proxy = az.extract(full_trace[i].prior)['f_pred_' + proxy].values else: post_proxy_temp = az.extract(full_trace[i].prior)['f_pred_' + proxy].values post_proxy = np.hstack([post_proxy, post_proxy_temp]) else: post_proxy = az.extract(full_trace.prior)['f_pred_' + proxy].values if len(true_signal) != post_proxy.shape[0]: sys.exit(f"Length of true_signal does not match length of posterior predictive signal. Check that both are evaluated at the same ages (if not, interpolate true_signal to the ages at which the posterior signal was evaluated).") posterior_likelihood = [] for i in np.arange(post_proxy.shape[0]): X = post_proxy[i, :] kde = gaussian_kde(X) posterior_likelihood.append(kde.evaluate(true_signal[i])[0]) return np.array(posterior_likelihood)
[docs] def sample_age_recovery(full_trace, sample_df, sections = None, mode = 'posterior'): """ Calculates the likelihood of the true sample ages (for synthetic tests, where the true age of each sample is known) given draws from the posterior (default) or prior. Provides a measure of age model recovery. Parameters ---------- full_trace: arviz.InferenceData or list(arviz.InferenceData) An :class:`arviz.InferenceData` object containing the full set of prior and posterior samples from :py:meth:`get_trace() <stratmc.inference.get_trace>` in :py:mod:`stratmc.inference`. If passed as a list, the posterior draws for all traces will be combined when calculating `posterior_likelihood`. sample_df: pandas.DataFrame :class:`pandas.DataFrame` containing proxy data for synthetic sections. sections: list(str) or numpy.array(str), optional List of sections to evaluate. Defaults to all sections in sample_df. mode: str, optional Whether to use the posterior or prior age models. Defaults to 'posterior'. Returns ------- posterior_likelihood: dict{float} or np.array(float) Posterior likelihoods for the true age of each sample. Returned as an array if only one section is evaluated, or a dictionary of arrays if multiple sections are evaluated. """ # get list of proxies included in model from full_trace variables = [ l for l in list(full_trace["posterior"].data_vars.keys()) if (f"{'gp_ls_'}" in l) and (f"{'unshifted'}" not in l) ] proxies = [] for var in variables: proxies.append(var[6:]) if sections is None: sections = np.unique(sample_df.dropna(subset = proxies, how = 'all')['section']) if len(sections) > 1: posterior_likelihood = {} for section in sections: section_df = sample_df[sample_df['section'] == section] section_df.sort_values(by = 'height', inplace = True) true_ages = section_df['age'].values if mode == 'posterior': if type(full_trace) is list: for i in np.arange(len(full_trace)): if i == 0: post_ages = az.extract(full_trace[i].posterior)[str(section) + '_ages'].values else: post_ages_temp = az.extract(full_trace[i].posterior)[str(section) + '_ages'].values post_ages = np.hstack([post_ages, post_ages_temp]) else: post_ages = az.extract(full_trace.posterior)[str(section) + '_ages'].values elif mode == 'prior': if type(full_trace) is list: for i in np.arange(len(full_trace)): if i == 0: post_ages = az.extract(full_trace[i].prior)[str(section) + '_ages'].values else: post_ages_temp = az.extract(full_trace[i].prior)[str(section) + '_ages'].values post_ages = np.hstack([post_ages, post_ages_temp]) else: post_ages = az.extract(full_trace.prior)[str(section) + '_ages'].values if len(true_ages) != post_ages.shape[0]: sys.exit(f"Number of samples in sample_df does not match the number in the trace") if len(sections) > 1: posterior_likelihood[section] = [] else: posterior_likelihood = [] for i in np.arange(post_ages.shape[0]): X = post_ages[i, :] kde = gaussian_kde(X) if len(sections) > 1: posterior_likelihood[section].append(kde.evaluate(true_ages[i])[0]) else: posterior_likelihood.append(kde.evaluate(true_ages[i])[0]) return posterior_likelihood
[docs] def sample_age_residuals(full_trace, sample_df, sections = None, mode = 'posterior'): """ Calculates the residual (for each draw) between the true age and the posterior (default) or prior age of each sample. Parameters ---------- full_trace: arviz.InferenceData or list(arviz.InferenceData) An :class:`arviz.InferenceData` object containing the full set of prior and posterior samples from :py:meth:`get_trace() <stratmc.inference.get_trace>` in :py:mod:`stratmc.inference`. If passed as a list, the posterior draws for all traces will be combined when calculating `age_residuals`. sample_df: pandas.DataFrame :class:`pandas.DataFrame` containing proxy data for synthetic sections. sections: list(str) or numpy.array(str), optional List of sections to evaluate. Defaults to all sections in sample_df. mode: str, optional Whether to use the posterior or prior age models. Defaults to 'posterior'. Returns ------- age_residuals: np.array or dict{np.array} Sample age residuals; shape is (number of samples, number of posterior draws). Returned as an array if only one section is evaluated, or a dictionary of arrays if multiple sections are evaluated. """ # get list of proxies included in model from full_trace variables = [ l for l in list(full_trace["posterior"].data_vars.keys()) if (f"{'gp_ls_'}" in l) and (f"{'unshifted'}" not in l) ] proxies = [] for var in variables: proxies.append(var[6:]) if sections is None: sections = np.unique(sample_df.dropna(subset = proxies, how = 'all')['section']) if len(sections) > 1: age_residuals = {} for section in sections: true_ages = sample_df[sample_df['section'] == section]['age'].values if mode == 'posterior': if type(full_trace) is list: for i in np.arange(len(full_trace)): if i == 0: post_ages = az.extract(full_trace[i].posterior)[str(section) + '_ages'].values else: post_ages_temp = az.extract(full_trace[i].posterior)[str(section) + '_ages'].values post_ages = np.hstack([post_ages, post_ages_temp]) else: post_ages = az.extract(full_trace.posterior)[str(section) + '_ages'].values elif mode == 'prior': if type(full_trace) is list: for i in np.arange(len(full_trace)): if i == 0: post_ages = az.extract(full_trace[i].prior)[str(section) + '_ages'].values else: post_ages_temp = az.extract(full_trace[i].prior)[str(section) + '_ages'].values post_ages = np.hstack([post_ages, post_ages_temp]) else: post_ages = az.extract(full_trace.prior)[str(section) + '_ages'].values true_ages_array = np.repeat(true_ages, post_ages.shape[1]).reshape(post_ages.shape) if len(sections) > 1: age_residuals[section] = true_ages_array - post_ages else: age_residuals = true_ages_array - post_ages return age_residuals