Source code for CONAN.geepee

import numpy as np
from celerite import terms
import george, celerite, spleaf
import spleaf.term as term
from types import SimpleNamespace as SN

[docs] gp_h3h4names = SN( h3 = {"sho":"Q", "exps2":"η", "rquad":"α", "qp":"η", "qp_mp":"η", "qp_sc":"b", "qp_ce":"C"}, h4 = {"qp":"P", "qp_sc":"P", "qp_ce":"P", "qp_mp":"P"})
#to introduce new GP kernel into CONAN # - simply add the kernel to the dictionary of possible kernels below. # with a shortcut name for calling the kernel in the code. # the kernel should be callable directly from the gp package, or if not, define a class/function for the kernel # - add the conversion function to the gp_params_convert class # the conversion function should take in the data type (lc or rv), amplitude, lengthscale, h3 and h4 # and return the parameters in the order required by the kernel's set_param function # new celerite kernels class celerite_QPTerm(terms.Term): # This is a celerite term that implements an approximate quasiperiodic kernel # eqn 56 of the celerite paper https://arxiv.org/pdf/1703.09710 # implemented in the notebook of the celerite paper https://github.com/dfm/celerite/blob/master/paper/figures/rotation/rotation.ipynb parameter_names = ("log_amp", "log_timescale", "factor", "log_period") def get_real_coefficients(self, params): log_amp, log_timescale, factor, log_period = params f = factor return ( np.exp(log_amp) * (1.0 + f) / (2.0 + f), np.exp(-log_timescale), ) def get_complex_coefficients(self, params): log_amp, log_timescale, factor, log_period = params f = factor return ( np.exp(log_amp) / (2.0 + f), 0.0, np.exp(-log_timescale), 2 * np.pi * np.exp(-log_period), ) class celerite_cosine(terms.Term): parameter_names = ("log_a", "log_P") def get_real_coefficients(self, params): log_a, log_nu = params return ( 0.0, 0.0, ) def get_complex_coefficients(self, params): log_a, log_nu = params return ( np.exp(log_a), 0.0, 0.0, np.exp(log_nu), ) # spleaf kernel translations def translate_param_cos(a,nu): return dict(a=a,b=0,la=0,nu=nu) def spleaf_cosine(a, nu): """ Cosine kernel with variance a and amplitude nu """ return term.TransformKernel(inner_kernel = term.QuasiperiodicKernel(1,0,0,1), translate_param = translate_param_cos, a = a, nu = nu) def translate_param_exps2(sig,P,eta): return dict(sig=sig,P=P,rho=1e100,eta=eta) def spleaf_exp_sine2(sig, P, eta, nharm=4): """expsine2 kernel from ESP kernel """ return term.TransformKernel(inner_kernel = term.ESPKernel(1,1,1e100,1,nharm), translate_param = translate_param_exps2, sig = sig, P = P, eta = eta) def translate_param_qp(sig,rho,eta,P): return dict(sig=sig, P=P, rho=rho,eta=eta) def spleaf_qp(sig, rho, eta, P, nharm=4): """ qp kernel from ESP kernel """ return term.TransformKernel(inner_kernel = term.ESPKernel(1,1,1,1,nharm), translate_param = translate_param_qp, sig = sig, rho=rho, eta = eta, P = P) def spleaf_mep(sig, rho, eta, P, nharm=4): """ qp kernel from MEP kernel """ return term.TransformKernel(inner_kernel = term.MEPKernel(1,1,1,1), translate_param = translate_param_qp, sig = sig, rho=rho, eta = eta, P = P) def translate_param_qpsc(a,la,b,nu): return dict(a=a,b=b,la=la,nu=nu) def spleaf_qpsc(a, la, b, nu): """ quasiperiodic kernel from sin and cos """ return term.TransformKernel(inner_kernel = term.QuasiperiodicKernel(1,1,1,1), translate_param = translate_param_qpsc, a = a, la = la, b = b, nu = nu) #========= possible kernels=========
[docs] george_kernels = dict( mat32 = george.kernels.Matern32Kernel, mat52 = george.kernels.Matern52Kernel, exp = george.kernels.ExpKernel, cos = george.kernels.CosineKernel, expsq = george.kernels.ExpSquaredKernel, exps2 = george.kernels.ExpSine2Kernel, qp = (george.kernels.ExpSquaredKernel,george.kernels.ExpSine2Kernel), # eq 55 (https://arxiv.org/pdf/1703.09710) rquad = george.kernels.RationalQuadraticKernel)
[docs] celerite_kernels = dict(mat32 = celerite.terms.Matern32Term, exp = celerite.terms.RealTerm, cos = celerite_cosine, sho = celerite.terms.SHOTerm, qp_ce = celerite_QPTerm)
[docs] spleaf_kernels = dict( mat32 = term.Matern32Kernel, mat52 = term.Matern52Kernel, exp = term.ExponentialKernel, cos = spleaf_cosine, sho = term.SHOKernel, expsq = term.ESKernel, #exp_sine kernel exps2 = spleaf_exp_sine2, qp = spleaf_qp, qp_sc = term.QuasiperiodicKernel, qp_mp = term.MEPKernel)
[docs] npars_gp = dict(mat32 = 2, mat52 = 2, exp = 2, cos = 2, expsq = 2, exps2 = 3, sho = 3, rquad = 3, qp = 4, qp_sc = 4, qp_mp = 4, qp_ce = 4 )
class gp_params_convert: """ object to convert gp parameters to required values for different kernels """ def __init__(self): self._allowed_kernels = dict( ge_ = [f"ge_{k}" for k in george_kernels.keys()], ce_ = [f"ce_{k}" for k in celerite_kernels.keys()], sp_ = [f"sp_{k}" for k in spleaf_kernels.keys()] ) def get_values(self, kernels, data, pars, gp_dim=1): """ transform pars into required values for given kernels. Parameters ----------- kernels: list,str kernel for which parameter transformation is performed. the kernels are prepended with "ge_","ce_" or "sp_" to indicate the package used. e.g. ["ge_mat32","ce_exp","sp_expsq"] data: str, one of ["lc","rv"] pars: iterable, parameters (amplitude,lengthscale,h3,h4) for each kernel in kernels. Returns -------- log_pars: iterable, log parameters to be used to set parameter vector of the gp object. """ assert data in ["lc","rv"],f'data can only be one of ["lc","rv"]' if isinstance(kernels, str): kernels= [kernels] conv_pars = [] alphas, betas = [], [] for i,kern in enumerate(kernels): assert kern[:3] in ["ge_","ce_","sp_"], f'gp_params_convert(): kernel must start with "ge_","ce_" or "sp_" but "{kern}" given' assert kern in self._allowed_kernels[kern[:3]], f'gp_params_convert(): `{kern[:2]}` kernel to convert must be one of {self._allowed_kernels[kern[:3]]} but "{kern}" given' # call class function with the name kern if kern[:3]=="sp_" and gp_dim>1: if i == 0: p,alpha,beta = self.__getattribute__(kern)(data,*pars[i*5:i*5+5]) conv_pars.extend([1]+p) alphas.append(alpha) betas.append(beta) else: alpha = pars[i*5]*1e-6 if data=="lc" else pars[i*5] beta = pars[i*5+4]*1e-6 if data=="lc" else pars[i*5+4] alphas.append(alpha) betas.append(beta) else: npars = 4 if data=='lc' else 5 p = self.__getattribute__(kern)(data,*pars[i*npars:i*npars+npars]) conv_pars.append(p) if kern[:3]=="sp_" and gp_dim>1: return np.array(conv_pars+alphas+betas) else: return np.concatenate(conv_pars) #======spleaf kernels======= # the kernels here are a direct function of the distance between points def sp_expsq(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ exponential sine kernel. spleaf ESKernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude sig = amplitude rho = lengthscale if h5==None or np.isnan(h5): return sig, rho else: h5 = h5*1e-6 if data == "lc" else h5 return [rho], sig, h5 def sp_exp(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ spleaf ExponentialKernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude a = amplitude**2 #variance la = 1/lengthscale if h5==None or np.isnan(h5): return a, la else: h5 = h5*1e-6 if data == "lc" else h5 return [la], a, h5 def sp_sho(self, data, amplitude, lengthscale, h3, h4=None, h5=None): """ simple harmonic oscillator kernel. spleaf SHOKernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude sig = amplitude P0 = lengthscale Q = h3 if h5==None or np.isnan(h5): return sig, P0, Q else: h5 = h5*1e-6 if data == "lc" else h5 return [P0, Q], sig, h5 def sp_mat32(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ Matern32 kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude sig = amplitude rho = lengthscale if h5==None or np.isnan(h5): return sig, rho else: h5 = h5*1e-6 if data == "lc" else h5 return [rho], sig, h5 def sp_mat52(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ Matern52 kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude sig = amplitude rho = lengthscale if h5==None or np.isnan(h5): return sig, rho else: h5 = h5*1e-6 if data == "lc" else h5 return [rho], sig, h5 def sp_cos(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ Cosine kernel built from spleaf QuasiperiodicKernel, with b and la set to 0 """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude variance = amplitude**2 nu = 2*np.pi/lengthscale if h5==None or np.isnan(h5): return variance, nu else: h5 = h5*1e-6 if data == "lc" else h5 return [nu], variance, h5 def sp_exps2(self, data, amplitude, lengthscale, h3, h4=None, h5=None): """ expsine2 kernel. derived from spleaf ESPKernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude sig = amplitude P = lengthscale eta = h3 if h5==None or np.isnan(h5): return sig, P, eta else: h5 = h5*1e-6 if data == "lc" else h5 return [P, eta], sig, h5 def sp_qp(self, data, amplitude, lengthscale, h3, h4, h5=None): """ Exponential-sine periodic (ESP) kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude sig = amplitude rho = lengthscale eta = h3 P = h4 if h5==None or np.isnan(h5): return sig, rho, eta, P else: h5 = h5*1e-6 if data == "lc" else h5 return [rho, eta, P], sig, h5 def sp_qp_mp(self, data, amplitude, lengthscale, h3, h4, h5=None): """ MEP kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude sig = amplitude rho = lengthscale eta = h3 P = h4 if h5==None or np.isnan(h5): return sig, rho, eta, P else: h5 = h5*1e-6 if data == "lc" else h5 return [rho, eta, P], sig, h5 def sp_qp_sc(self, data, amplitude, lengthscale, h3, h4, h5=None): """ Quasiperiodic kernel with sine and cosine components """ if amplitude==-1: amplitude = 1 #a else: amplitude = amplitude*1e-6 if data == "lc" else amplitude amplitude2 = h3 #b if amplitude2==-1: amplitude2 = 1 else: amplitude2 = amplitude2*1e-6 if data == "lc" else amplitude2 a = amplitude**2 la = 1/lengthscale b = amplitude2**2 nu = 2*np.pi/h4 if h5==None or np.isnan(h5): return a, la, b, nu else: h5 = h5*1e-6 if data == "lc" else h5 return [la,b,nu], a, h5 #=======celerite kernels ======= # the kernels here are directly a function of the distance between points, # so we do not square the lengthscale def ce_sho(self, data, amplitude, lengthscale, h3, h4=None, h5=None): """ amplitude: the standard deviation of the process lengthscale: the undamped period of the oscillator for quality factor Q > 1/2, the characteristic oscillation freq(or period) is not equal to the freq(period) of the undamped oscillator, ω0 see transformation here: https://celerite2.readthedocs.io/en/latest/api/python/#celerite2.terms.SHOTerm """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude Q = h3 w0 = 2*np.pi/lengthscale S0 = amplitude**2/(w0*Q) log_S0, log_w0, log_Q = np.log(S0), np.log(w0), np.log(Q) return log_S0, log_Q, log_w0 def ce_cos(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ CosineKernel implementation in celerite """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_var = np.log(amplitude**2) log_nu = np.log(2*np.pi/lengthscale) return log_var, log_nu def ce_exp(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ celerite real term: same as an exponential kernel like in George """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude c = 1/lengthscale log_c = np.log(c) log_a = np.log(amplitude**2) #log_variance return log_a, log_c def ce_mat32(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ celerite mat32 """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_sigma = np.log(amplitude) rho = lengthscale log_rho = np.log(rho) return log_sigma, log_rho def ce_qp_ce(self, data, amplitude, lengthscale, h3, h4, h5=None): """ celerite approximation of quasiperiodic kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_B = np.log(amplitude**2) log_L = np.log(1/lengthscale) # log_C = np.log(h3) C = h3 log_P = np.log(h4) return log_B, log_L, C, log_P, #=====george kernels======= # stationary kernels depend on the square of the distance between points r^2 = (x1-x2)^2 # in this case, the metric=lengthscale^2, since it scales r^2 # Non-stationary kernels depend on the distance between points themselves, r # and the amplitude is the standard deviation of the process. but george takes in the variance, so we need to square it def ge_mat32(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ George mat32, stationary kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_var = np.log(amplitude**2) metric = lengthscale**2 log_metric = np.log(metric) return log_var, log_metric def ge_cos(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ George CosineKernel, non-stationary kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_var = np.log(amplitude**2) log_period = np.log(lengthscale) return log_var, log_period def ge_mat52(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ George mat52, stationary kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_var = np.log(amplitude**2) metric = lengthscale**2 log_metric = np.log(metric) return log_var, log_metric def ge_expsq(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ George expsq, stationary kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_var = np.log(amplitude**2) metric = lengthscale**2 log_metric = np.log(metric) return log_var, log_metric def ge_exp(self, data, amplitude, lengthscale, h3=None, h4=None, h5=None): """ George exp, stationary kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_var = np.log(amplitude**2) metric = lengthscale**2 log_metric = np.log(metric) return log_var, log_metric def ge_exps2(self, data, amplitude, lengthscale, h3, h4=None, h5=None): """ George expsine2 kernel, non-stationary kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude eta = h3 log_var = np.log(amplitude**2) gamma = 1/(2*eta**2) period = lengthscale log_period = np.log(period) return log_var, gamma, log_period def ge_qp(self, data, amplitude, lengthscale, h3, h4, h5=None): """ George quasiperiodic. ge_qp = ge_expsq * ge_exps2 """ eta, P = h3, h4 log_var, log_metric = self.ge_expsq(data, amplitude, lengthscale) log_var2, gamma, log_period = self.ge_exps2(data, -1, P, eta) return log_var, log_metric, log_var2, gamma, log_period def ge_rquad(self, data, amplitude, lengthscale, h3, h4=None, h5=None): """ George rationalquadratic, stationary kernel """ if amplitude==-1: amplitude = 1 else: amplitude = amplitude*1e-6 if data == "lc" else amplitude log_var = np.log(amplitude**2) log_alpha = np.log(h3) metric = lengthscale**2 log_metric = np.log(metric) return log_var, log_alpha, log_metric def __repr__(self): return 'object to convert gp amplitude and lengthscale to required value for different kernels' class GPSaveObj: """ object to save the state of a GP kernel for post-fit manipulation. """ def __init__(self, fname,gp_pck, gp, x, gp_pars, ndim_gp, gp_cols, gp_cols_err, residuals, kernel=None): """ Parameters ---------- fname: str, optional The filename where the GP object is saved. gp_pck: str, optional The package used for the GP, e.g., 'ge' for George, 'ce' for celerite, 'sp' for spleaf. gp: object, optional The GP object itself x: array-like, optional The x values at which the GP is evaluated. gp_pars: dict The parameters of the GP, such as amplitude, lengthscale, h3, h4, h5 ndim_gp: int, optional The number of dimensions of the GP. gp_cols: list, optional The columns of the GP data. gp_cols_err: list, optional The columns of the GP data errors. residuals: array-like, optional The residuals to which the GP is fit. kernel: str, optional The kernel used for the GP. """ self.fname = fname self.gp = gp self.gp_pck = gp_pck self.x = x self.ndim_gp = ndim_gp self.resid = residuals self.kernel = kernel self.gp_cols = gp_cols self.gp_cols_err = gp_cols_err self.gp_pars = gp_pars def get_param_dict(self, original=False): """ Get the GP parameters as a dictionary. Parameters ---------- original: bool, optional If True, return the original package values. If False, return the converted values. Returns ------- dict A dictionary of the GP parameters. """ if original: if self.gp_pck in ['ge','ce']: return dict(self.gp.get_parameter_dict()) else: return {k:v for k,v in zip(self.gp.param, self.gp.get_param())} else: return self.gp_pars def predict(self, x=None,return_var=False, series_id=0): """ Predict the GP at the given x values. Parameters ---------- x: array-like, optional The x values at which to predict the GP. If None, use the saved x values. return_var: bool, optional If True, return the variance of the GP prediction. series_id: int, optional For multiseries GP fit with Spleaf, the series ID (dimension) for the GP prediction. Default is 0. Returns ------- array-like The GP prediction at the given x values. If return_var is True, returns a tuple of (mean, variance). If return_var is False, returns only the mean. """ x = self.x if x is None else x if self.gp_pck in ['ge','ce']: srt_x = np.argsort(x) if x.ndim==1 else np.argsort(x[:,0]) unsrt_x = np.argsort(srt_x) #indices to unsort the gp axis return self.gp.predict(self.resid, t=x[srt_x], return_var=return_var, return_cov=False)[unsrt_x] elif self.gp_pck == 'sp': if self.ndim_gp == 1: srt_x = np.argsort(x) unsrt_x = np.argsort(srt_x) #indices to unsort the gp axis return self.gp.conditional(self.resid, x[srt_x], calc_cov=return_var)[unsrt_x] else: self.gp.kernel["GP"].set_conditional_coef(series_id=series_id) calc_cov = "diag" if return_var else False return self.gp.conditional(self.resid, x, calc_cov=calc_cov) def __repr__(self): return f"GPSaveObj({self.fname}: kernel={self.kernel}, ndim={self.ndim_gp}, cols={self.gp_cols}, cols_err={self.gp_cols_err})" def __str__(self): return self.__repr__()