# Source code for CONAN.misc

import numpy as np
from types import SimpleNamespace, FunctionType

# Default values for the light-curve model parameters, keyed by parameter name.
default_LCpars_dict = dict(
    Duration=0.1,
    rho_star=None,
    RpRs=0.,
    Impact_para=0,
    T_0=0,
    Period=1,
    Eccentricity=0.,
    omega=90,
    q1=0,
    q2=0,
    D_occ=0,
    Fn=0,
    ph_off=0,
    A_ev=0,
    f1_ev=0,
    A_db=0,
)

# Default values for the radial-velocity model parameters, keyed by parameter name.
default_RVpars_dict = dict(
    T_0=0,
    Period=1,
    Eccentricity=0.,
    omega=90,
    K=0,
)

# Short aliases mapped to "<facility>/<instrument>.<band>" filter identifiers.
# NOTE(review): "v1" maps to 'Geneva/Geneva.V2' while every other Geneva alias
# mirrors its band name -- confirm whether 'Geneva/Geneva.V1' was intended.
filter_shortcuts = dict(
    kepler='Kepler/Kepler.k',
    tess='TESS/TESS.Red',
    cheops='CHEOPS/CHEOPS.band',
    wfc3_g141='HST/WFC3_IR.G141',
    wfc3_g102='HST/WFC3_IR.G102',
    sp36='Spitzer/IRAC.I1',
    sp45='Spitzer/IRAC.I2',
    ug='Geneva/Geneva.U',
    b1='Geneva/Geneva.B1',
    b2='Geneva/Geneva.B2',
    bg='Geneva/Geneva.B',
    gg='Geneva/Geneva.G',
    v1='Geneva/Geneva.V2',
    vg='Geneva/Geneva.V',
    sdss_g='SLOAN/SDSS.g',
    sdss_r='SLOAN/SDSS.r',
    sdss_i='SLOAN/SDSS.i',
    sdss_z='SLOAN/SDSS.z',
)
def _raise(exception_type, msg):
    """Raise ``exception_type(msg)``; allows raising from inside an expression."""
    raise exception_type(msg)


def _func_args_str(func_args):
    """Return the printable ``name:prior`` list for a custom function's arguments.

    Numbers are written as ``name:F(value)``; tuples as ``name:N(..)`` (len 2),
    ``name:U(..)`` (len 3) or ``name:TN(..)`` (any other length), with spaces
    stripped. An empty dict gives the string ``'None'``.
    """
    if func_args == {}:     # custom function takes no additional arguments
        return 'None'
    parts = []
    for k, val in func_args.items():
        if isinstance(val, (int, float)):
            parts.append(f'{k}:F({val})')
        if isinstance(val, tuple):
            parts.append(f"{k}:{'U' if len(val)==3 else 'N' if len(val)==2 else 'TN'}{str(val).replace(' ','')}")
    return ",".join(parts)


def _gp_rows(gp, label, width, spacing, rv=False):
    """Format the table rows describing one GP configuration.

    Parameters
    -----------
    gp : dict;
        GP setup holding ``"ngp"``, ``"op"`` and the per-kernel entries
        ``amplitude{n}``, ``lengthscale{n}``, ``h3{n}``, ``h4{n}`` (and ``h5{n}`` for RVs).
    label : str;
        text of the first column of the first row.
    width : int;
        width of the first column.
    spacing : str;
        prefix written at the start of every row.
    rv : bool;
        if True, append the two RV-only columns (``h5`` prior and error column).

    Returns
    -----------
    rows : str;
        one ``"\\n"``-prefixed row per kernel; rows after the first carry the
        combining operator (``|op|``) in the first column.
    """
    row_fmt = f"\n{spacing}{{0:{width}s}}" + " {1:5s} {2:6s} {3:18s} {4:18s} {5:18s} {6:12s}"
    if rv:
        row_fmt += " | {7:16s} {8:6s}"

    rows = []
    # the first kernel is always printed, additional ones only when ngp > 1
    for n in [0, *range(1, gp["ngp"])]:
        amp = gp[f"amplitude{n}"]
        first_col = label if n == 0 else " "*(width-3) + f'|{gp["op"][n-1]}|'
        vals = [first_col, amp.user_data.kernel, amp.user_data.col, amp.prior_str,
                gp[f"lengthscale{n}"].prior_str, gp[f"h3{n}"].prior_str, gp[f"h4{n}"].prior_str]
        if rv:
            vals += [gp[f"h5{n}"].prior_str, amp.user_data.errcol]
        rows.append(row_fmt.format(*vals))
    return "".join(rows)


def _print_output(self, section: str, file=None):
    """Print one section of the CONAN setup to screen or to a file.

    Parameters
    -----------
    self : object;
        object whose ``_obj_type`` is "lc_obj", "rv_obj" or "fit_obj" and which
        holds the private attributes read by the requested section.
    section : str;
        name of the section to print; must be valid for the object type.
    file : file-like, None;
        destination passed to ``print``. When given, every row is prefixed with a tab.

    Raises
    -----------
    AssertionError
        if ``section`` is not valid for the type of object.
    """

    def prior_str(v):
        # string representation of a user-supplied prior (None, number or tuple)
        if v is None:
            return 'None'
        if isinstance(v, (int, float)):
            return f'F({v})'
        if len(v) == 2:
            return f"N({v[0]},{v[1]})"
        if len(v) == 3:
            return f"U({v[0]},{v[1]},{v[2]})"
        return f"TN({v[0]},{v[1]},{v[2]},{v[3]})"

    lc_possible_sections = ["lc_baseline", "sinusoid", "gp", "planet_parameters", "custom_LCfunction",
                            "depth_variation", "timing_variation", "phasecurve", "limb_darkening",
                            "contamination"]
    rv_possible_sections = ["rv_baseline", "rv_gp", "custom_RVfunction"]
    fit_possible_sections = ["fit", "stellar_pars"]
    spacing = "" if file is None else "\t"

    if self._obj_type == "lc_obj":
        assert section in lc_possible_sections, f"{section} not a valid section of `lc_obj`. Section must be one of {lc_possible_sections}."
        max_name_len = max([len(n) for n in self._names]+[len("name")])      #max length of lc filename
        max_filt_len = max([len(n) for n in self._filters]+[len("flt")])     #max length of lc filter name
        max_wv_len = max([len(str(n)) for n in self._wl]+[4])                #max length of wavelength
    if self._obj_type == "rv_obj":
        assert section in rv_possible_sections, f"{section} not a valid section of `rv_obj`. Section must be one of {rv_possible_sections}."
        max_name_len = max([len(n) for n in self._names]+[6])                #max length of rv filename
    if self._obj_type == "fit_obj":
        assert section in fit_possible_sections, f"{section} not a valid section of `fit_obj`. Section must be one of {fit_possible_sections}."

    if section == "lc_baseline":
        _print_lc_baseline = (
            """# ============ Input lightcurves, filters baseline function =======================================================""" +
            f"""\n{spacing}{"name":{max_name_len}s} {"flt":{max_filt_len}s} {"𝜆_𝜇m":{max_wv_len}s} |{"Ssmp":4s} {"ClipOutliers":12s} {"scl_col":7s} |{"off":3s} {"col0":4s} {"col3":4s} {"col4":4s} {"col5":4s} {"col6":4s} {"col7":4s} {"col8":4s}|{"sin":3s} {"id":2s} {"GP":2s} {"spline":15s}"""
        )
        #define print out format
        txtfmt = f"\n{spacing}{{0:{max_name_len}s}} {{1:{max_filt_len}s}} {{2:{max_wv_len}s}}"+" |{3:4s} {4:12s} {5:7s} |{6:3s} {7:4d} {8:4d} {9:4d} {10:4d} {11:4d} {12:4d} {13:4d}|{14:3s} {15:2d} {16:2s} {17:15s}"
        for i, name in enumerate(self._names):
            t = txtfmt.format(name, self._filters[i], str(self._wl[i]), self._ss[i].config,
                              self._clipped_data.config[i], self._rescaled_data.config[i],
                              " "+self._fit_offset[i], *self._bases[i][:-1], self._groups[i],
                              self._useGPphot[i], self._lcspline[name].conf)
            _print_lc_baseline += t
        print(_print_lc_baseline, file=file)

    if section == "sinusoid":
        DA = self._sine_dict
        # the first column holds either a lc name or a filter name, so the header
        # must use the same width as the rows below it
        max_namefilt_len = max([len(n) for n in self._names+self._filters]+[9])      #max length of lcname/filtname
        _print_sinusoid = (
            """# ============ Sinusoidal signals: Amp*trig(2𝜋/P*(x-x0)) - trig=sin or cos or both added==========================""" +
            f"""\n{spacing}{"name/filt":{max_namefilt_len}s} {"trig":7s} {"n_harmonics":11s} {"x":4s} {"Amp[ppm]":18s} {"P":18s} {"x0":18s}"""
        )
        #define print out format
        txtfmt = f"\n{spacing}{{0:{max_namefilt_len}s}}"+" {1:7s} {2:11d} {3:4s} {4:18s} {5:18s} {6:18s}"
        for v in DA.values():
            if v.trig is not None:
                t = txtfmt.format(v.name, v.trig, v.n, v.par, v.Amp.prior_str, v.P.prior_str, v.x0.prior_str)
                _print_sinusoid += t
        print(_print_sinusoid, file=file)

    if section == "gp":
        DA = self._GP_dict
        max_namefilt_len = max([len(n) for n in self._names+self._filters]+[9])      #max length of lcname/filtname
        _print_gp = """# ============ Photometry GP properties (start newline with name of * or + to Xply or add a 2nd gp to last file) ========="""
        _print_gp += f"""\n{spacing}{"name/filt":{max_namefilt_len}s} {"kern":5s} {'par':6s} {'h1:[Amp]':18s} {'h2:[len_scale1]':18s} {'h3:[Q,η,C,α,b]':18s} {'h4:[P]':12s} """
        if DA != {}:
            if self._sameLCgp.filtflag:
                # one GP per filter: print the config of the first lc of each filter
                for f in self._sameLCgp.filters:
                    lc = self._sameLCgp.LCs[f][0]
                    _print_gp += _gp_rows(DA[lc], f, max_namefilt_len, spacing)
            else:
                if self._allLCgp:
                    #shortcut print just one line gp config if all LCs have the same GP
                    first_lc = list(DA.keys())[0]
                    equal_allgp = all([_compare_nested_structures(DA[first_lc], DA[lc]) for lc in list(DA.keys())[1:]])
                else:
                    equal_allgp = False

                for lc in DA.keys():
                    label = 'same' if self._sameLCgp.flag else "all" if equal_allgp else lc
                    _print_gp += _gp_rows(DA[lc], label, max_namefilt_len, spacing)
                    if self._sameLCgp.flag or equal_allgp:      #dont print the other lc GPs if same_GP is True
                        break
        print(_print_gp, file=file)

    if section == "planet_parameters":
        DA = self._planet_pars
        notes = dict(RpRs="#range[-0.5,0.5]", Impact_para="#range[0,2]", K="#unit(same as RVdata)",
                     T_0="#unit(days)", Period="#range[0,inf]days",
                     Eccentricity="#choice in []|range[0,1]/range[-1,1]",
                     omega="#choice in []|range[0,360]deg/range[-1,1]",
                     sesinw="#choice in []|range[0,1]/range[-1,1]",
                     secosw="#choice in []|range[0,360]deg/range[-1,1]")
        _print_planet_parameters = (
            """# ============ Planet parameters (Transit and RV) setup ========================================================== """ +
            f"""\n{spacing}{'name':25s} {'fit':3s} \t{'prior':35s}\tnote"""
        )
        #define print out format
        txtfmt = f"\n{spacing}"+"{0:25s} {1:3s} \t{2:35s}\t{3}"

        #print line for stellar density or duration (read from the first planet)
        first_pl = DA['pl1']
        p = "rho_star" if "rho_star" in first_pl.keys() else "Duration"
        popt = "[rho_star]/Duration" if "rho_star" in first_pl.keys() else "rho_star/[Duration]"
        _print_planet_parameters += txtfmt.format(popt, first_pl[p].to_fit, first_pl[p].prior_str, "#choice in []|unit(gcm^-3/days)")
        _print_planet_parameters += f"\n{spacing}--------repeat this line & params below for multisystem, adding '_planet_number' to the names e.g RpRs_1 for planet 1, ..."

        #then cycle through parameters for each planet
        for n in range(1, self._nplanet+1):
            lbl = f"_{n}" if self._nplanet > 1 else ""
            # square brackets mark which of the two alternative parametrisations is in use
            ecc_w_opt = {}
            ecc_w_opt["Eccentricity"] = ecc_w_opt["sesinw"] = f"Eccentricity{lbl}/[sesinw{lbl}]" if "sesinw" in first_pl.keys() else f"[Eccentricity{lbl}]/sesinw{lbl}"
            ecc_w_opt["omega"] = ecc_w_opt["secosw"] = f"omega{lbl}/[secosw{lbl}]" if "secosw" in first_pl.keys() else f"[omega{lbl}]/secosw{lbl}"

            for p in self._TR_RV_parnames:
                if p in ["rho_star", "Duration"]:
                    continue
                elif p in ["Eccentricity", "omega", "sesinw", "secosw"]:
                    t = txtfmt.format(ecc_w_opt[p], DA[f'pl{n}'][p].to_fit, DA[f'pl{n}'][p].prior_str, notes[p])
                else:
                    t = txtfmt.format(p+lbl, DA[f'pl{n}'][p].to_fit, DA[f'pl{n}'][p].prior_str, notes[p])
                _print_planet_parameters += t
            if n != self._nplanet:
                _print_planet_parameters += f"\n{spacing}------------"
        print(_print_planet_parameters, file=file)

    if section == "custom_LCfunction":
        DA = self._custom_LCfunc
        flag = DA.func is not None
        _print_custom_function = """# ============ Custom LC function (read from custom_LCfunc.py file)================================================"""
        #define print out format
        txtfmt = f"\n{spacing}{{0:16s}}: {{1:40s}}\t{{2}}"
        _print_custom_function += txtfmt.format("function", DA.func.__name__ if flag else 'None', "#custom function/class to combine with/replace LCmodel")
        _print_custom_function += txtfmt.format("x", DA.x if flag else 'None', "#independent variable [time, phase_angle, col3-8]")
        fa_str = _func_args_str(DA.func_args) if flag else 'None'
        _print_custom_function += txtfmt.format("func_pars", fa_str, "#param names&priors e.g. A:U(0,1,2),P:N(2,1)")
        exa_str = [f"{k}:{v}" for k, v in DA.extra_args.items()]
        exa_str = ",".join(exa_str) if exa_str != [] else 'None'
        _print_custom_function += txtfmt.format("extra_args", exa_str, "#extra args to func as a dict e.g ld_law:quad")
        _print_custom_function += txtfmt.format("op_func", 'None' if (DA.replace_LCmodel or not flag) else DA.op_func.__name__, "#function to combine the LC and custom models")
        _print_custom_function += txtfmt.format("replace_LCmodel", str(DA.replace_LCmodel) if flag else 'False', "#if the custom function replaces the LC model")
        print(_print_custom_function, file=file)

    if section == "custom_RVfunction":
        DA = self._custom_RVfunc
        flag = DA.func is not None
        _print_custom_function = """# ============ Custom RV function (read from custom_RVfunc.py file)================================================"""
        #define print out format
        txtfmt = f"\n{spacing}{{0:16s}}: {{1:40s}}\t{{2}}"
        _print_custom_function += txtfmt.format("function", DA.func.__name__ if flag else 'None', "#custom function/class to combine with/replace RVmodel")
        _print_custom_function += txtfmt.format("x", DA.x if flag else 'None', "#independent variable [time, true_anomaly,col3-5]")
        fa_str = _func_args_str(DA.func_args) if flag else 'None'
        _print_custom_function += txtfmt.format("func_pars", fa_str, "#param names&priors e.g. A:U(0,1,2),P:N(2,1)")
        exa_str = [f"{k}:{v}" for k, v in DA.extra_args.items()]
        exa_str = ",".join(exa_str) if exa_str != [] else 'None'
        _print_custom_function += txtfmt.format("extra_args", exa_str, "#extra args to func as a dict")
        _print_custom_function += txtfmt.format("op_func", 'None' if (DA.replace_RVmodel or not flag) else DA.op_func.__name__, "#function to combine the RV and custom models")
        _print_custom_function += txtfmt.format("replace_RVmodel", str(DA.replace_RVmodel) if flag else 'False', "#if the custom function replaces the RV model")
        print(_print_custom_function, file=file)

    if section == "depth_variation":
        _print_depth_variation = (
            """# ============ ddF setup ========================================================================================""" +
            f"""\n{spacing}{"Fit_ddFs":8s}\t{"dRpRs":16s}\tdiv_white"""
        )
        #define print out format
        txtfmt = f"\n{spacing}"+"{0:8s}\t{1:16s}\t{2:3s}"
        pri_ddf = prior_str(self._ddfs.drprs.user_input)
        t = txtfmt.format(self._ddfs.ddfYN, pri_ddf, self._ddfs.divwhite)
        _print_depth_variation += t
        print(_print_depth_variation, file=file)

    if section == "timing_variation":
        _print_timing_variation = (
            """# ============ TTV setup ========================================================================================""" +
            f"""\n{spacing}{"Fit_TTVs":8s}\t{"dt_priors(deviation from linear T0)":35s}\t{"transit_baseline[P]":19s}\t{"per_LC_T0":10s}\tinclude_partial"""
        )
        #define print out format
        txtfmt = f"\n{spacing}"+"{0:8s}\t{1:35s}\t{2:19.4f}\t{3:10s}\t{4}"
        # a 2-tuple is printed as a normal prior, anything else as a uniform prior
        pri_ttv = f"N{self._ttvs.dt}" if len(self._ttvs.dt) == 2 else f"U{self._ttvs.dt}"
        t = txtfmt.format(self._ttvs.to_fit, pri_ttv.replace(" ", ""), self._ttvs.baseline,
                          str(self._ttvs.per_LC_T0), str(self._ttvs.include_partial))
        _print_timing_variation += t
        print(_print_timing_variation, file=file)

    if section == "phasecurve":
        pars = ["D_occ", "Fn", "ph_off", "A_ev", "f1_ev", "A_db"]     # printed in this column order
        _print_phasecurve = (
            """# ============ Phase curve setup ================================================================================ """ +
            f"""\n{spacing}{'flt':{max_filt_len}s} {'D_occ[ppm]':18s} {'Fn[ppm]':18s} {'ph_off[deg]':12s} {'A_ev[ppm]':15s} {'f1_ev[ppm]':13s} {'A_db[ppm]':15s} {'pc_model':8s}"""
        )
        #define print out format
        txtfmt = f"\n{spacing}{{0:{max_filt_len}s}}"+" {1:18s} {2:18s} {3:12s} {4:15s} {5:13s} {6:15s} {7:8s}"

        DA = self._PC_dict
        for i, f in enumerate(self._filnames):
            priors = [prior_str(DA[p][f].user_input) for p in pars]
            t = txtfmt.format(f, *priors, self._pcmodel[i])
            _print_phasecurve += t
        print(_print_phasecurve, file=file)

    if section == "limb_darkening":
        DA = self._ld_dict

        def ld_prior(q, i):
            # normal prior if a width is set, else uniform if an upper bound is set, else fixed
            if DA[f'sig_lo{q}'][i]:
                return f"N({DA[f'q{q}'][i]},{DA[f'sig_lo{q}'][i]})"
            if DA[f'bound_hi{q}'][i]:
                return f"U({DA[f'bound_lo{q}'][i]},{DA[f'q{q}'][i]},{DA[f'bound_hi{q}'][i]})"
            return f"F({DA[f'q{q}'][i]})"

        _print_limb_darkening = (
            """# ============ Limb darkening setup ============================================================================= """ +
            f"""\n{spacing}{'filters':7s}\tfit\t{'q1':17s}\t{'q2':17s}"""
        )
        #define print out format
        txtfmt = f"\n{spacing}"+"{0:7s}\t{1:3s}\t{2:17s}\t{3:17s}"
        for i, filt in enumerate(self._filnames):
            pri_q1 = ld_prior(1, i)
            pri_q2 = ld_prior(2, i)
            to_fit = "y" if (pri_q1[0] != "F" or pri_q2[0] != "F") else "n"
            t = txtfmt.format(filt, to_fit, pri_q1, pri_q2)
            _print_limb_darkening += t
        print(_print_limb_darkening, file=file)

    if section == "contamination":
        DA = self._contfact_dict
        _print_contamination = (
            """# ============ contamination setup (give contamination as flux ratio) ======================================== """ +
            f"""\n{spacing}{'flt':{max_filt_len}s}\tcontam_factor"""
        )
        #define print out format
        txtfmt = f"\n{spacing}{{0:{max_filt_len}s}}"+"\t{1:20s}"
        for f in self._filnames:
            t = txtfmt.format(f, prior_str(DA[f].user_input))
            _print_contamination += t
        print(_print_contamination, file=file)

    if section == "stellar_pars":
        DA = self._stellar_dict
        _print_stellar_pars = (
            """# ============ Stellar input properties ======================================================================""" +
            f"""\n{spacing}{'# parameter':13s} value """ +
            f"""\n{spacing}{'Radius_[Rsun]':13s} N({DA['R_st'][0]},{DA['R_st'][1]})""" +
            f"""\n{spacing}{'Mass_[Msun]':13s} N({DA['M_st'][0]},{DA['M_st'][1]})""" +
            f"""\n{spacing}Input_method:[R+rho(Rrho), M+rho(Mrho)]: {DA['par_input']}"""
        )
        print(_print_stellar_pars, file=file)

    if section == "fit":
        DA = self._fit_dict
        # if all DA['apply_LCjitter'] is the same, set it to one value
        app_jitt = DA['apply_LCjitter'][0] if len(set(DA['apply_LCjitter'])) == 1 else str(DA['apply_LCjitter']).replace(" ", "")
        app_RVjitt = DA['apply_RVjitter'][0] if len(set(DA['apply_RVjitter'])) == 1 else str(DA['apply_RVjitter']).replace(" ", "")

        # (label, value) pairs in print order; every row has the same layout
        fit_rows = [
            ('Number_steps', DA['n_steps']),
            ('Number_chains', DA['n_chains']),
            ('Number_of_processes', DA['n_cpus']),
            ('Burnin_length', DA['n_burn']),
            ('n_live', DA['n_live']),
            ('force_nlive', DA['force_nlive']),
            ('d_logz', DA['dyn_dlogz']),
            ('Sampler(emcee/dynesty)', DA['sampler']),
            ('emcee_move(stretch/demc/snooker)', DA['emcee_move']),
            ('nested_sampling(static/dynamic[pfrac])', DA['nested_sampling']),
            ('leastsq_for_basepar(y/n)', DA['leastsq_for_basepar']),
            ('apply_LCjitter(y/n,list)', app_jitt),
            ('apply_RVjitter(y/n,list)', app_RVjitt),
            ('LCjitter_loglims(auto/[lo,hi])', str(DA['LCjitter_loglims']).replace(" ", "")),
            ('RVjitter_lims(auto/[lo,hi])', str(DA['RVjitter_lims']).replace(" ", "")),
            ('LCbasecoeff_lims(auto/[lo,hi])', str(DA['LCbasecoeff_lims']).replace(" ", "")),
            ('RVbasecoeff_lims(auto/[lo,hi])', str(DA['RVbasecoeff_lims']).replace(" ", "")),
            ('Light_Travel_Time_correction(y/n)', DA['LTT_corr']),
            ('apply_LC_GPndim_jitter(y/n)', DA['apply_LC_GPndim_jitter']),
            ('apply_RV_GPndim_jitter(y/n)', DA['apply_RV_GPndim_jitter']),
            ('apply_LC_GPndim_offset(y/n)', DA['apply_LC_GPndim_offset']),
            ('apply_RV_GPndim_offset(y/n)', DA['apply_RV_GPndim_offset']),
        ]
        _print_fit_pars = """# ============ FIT setup ====================================================================================="""
        _print_fit_pars += "".join(f"\n{spacing}{label:40s} {value} " for label, value in fit_rows)
        print(_print_fit_pars, file=file)

    if section == "rv_baseline":
        _print_rv_baseline = (
            """# ============ Input RV curves, baseline function, GP, spline, gamma ============================================ """ +
            f"""\n{spacing}{'name':{max_name_len}s} {'RVunit':6s} {"scl_col":7s} |{'col0':4s} {'col3':4s} {'col4':4s} {"col5":4s}| {'sin':3s} {"GP":2s} {"spline_config ":15s} | {f'gamma_{self._RVunit}':14s} """
        )
        if self._names != []:
            DA = self._rvdict
            txtfmt = f"\n{spacing}{{0:{max_name_len}s}}"+" {1:6s} {2:7s} |{3:4d} {4:4d} {5:4d} {6:4d}| {7:3d} {8:2s} {9:15s} | {10:14s}"
            for i in range(self._nRV):
                gam_pri_ = prior_str(DA["gamma"][i].user_input)
                t = txtfmt.format(self._names[i], self._RVunit, self._rescaled_data.config[i],
                                  *self._RVbases[i], self._useGPrv[i],
                                  self._rvspline[self._names[i]].conf, gam_pri_)
                _print_rv_baseline += t
        print(_print_rv_baseline, file=file)

    if section == "rv_gp":
        DA = self._rvGP_dict
        _print_gp = """# ============ RV GP properties (start newline with name of * or + to Xply or add a 2nd gp to last file) ======="""
        _print_gp += f"""\n{spacing}{"name":{max_name_len}s} {"kern":5s} {'par':6s} {'h1:[Amp_ppm]':18s} {'h2:[len_scale]':18s} {'h3:[Q,η,C,α,b]':18s} {'h4:[P]':12s} | {'h5:[Der_Amp_ppm]':16s} {'ErrCol':6s}"""
        if DA != {}:
            if self._allRVgp:
                #shortcut print just one line gp config if all RVs have the same GP
                first_rv = list(DA.keys())[0]
                equal_allrvgp = all([_compare_nested_structures(DA[first_rv], DA[rv]) for rv in list(DA.keys())[1:]])
            else:
                equal_allrvgp = False

            for rv in DA.keys():
                label = 'same' if self._sameRVgp.flag else "all" if equal_allrvgp else rv
                _print_gp += _gp_rows(DA[rv], label, max_name_len, spacing, rv=True)
                if self._sameRVgp.flag or equal_allrvgp:    #dont print the other GPs if same_GP is True or all
                    break
        print(_print_gp, file=file)


class _param_obj():
    def __init__(self, to_fit, start_value, step_size,
                 prior, prior_mean, prior_width_lo, prior_width_hi,
                 bounds_lo, bounds_hi, user_input=None, user_data=None, prior_str=None):
        """
        convenience class to create a parameter object with the following Attributes

        Parameters
        -----------
        to_fit : str;
            'y' or 'n' to fit or not fit the parameter.
        start_value : float;
            starting value for the parameter.
        step_size : float;
            step size for the parameter.
        prior : str;
            'n' or 'p' to not use (n) or use (p) a normal prior.
        prior_mean : float;
            mean of the normal prior.
        prior_width_lo : float;
            lower sigma of the normal prior.
        prior_width_hi : float;
            upper sigma of the normal prior.
        bounds_lo : float;
            lower bound for the parameter.
        bounds_hi : float;
            upper bound for the parameter.
        user_input : tuple, int, float;
            stores prior input given by the user.
        user_data : any;
            any data to be stored in the parameter object.
        prior_str : str;
            string representation of the prior for printing.

        Raises
        -----------
        ValueError
            if ``to_fit`` is not 'n'/'y' or ``prior`` is not 'n'/'p'.
        """
        self.to_fit = to_fit if (to_fit in ["n", "y"]) else _raise(ValueError, "to_fit (to_fit) must be 'n' or 'y'")
        self.start_value = start_value
        self.step_size = step_size
        self.prior = prior if (prior in ["n", "p"]) else _raise(ValueError, "prior (prior) must be 'n' or 'p'")
        self.prior_mean = prior_mean
        self.prior_width_lo = prior_width_lo
        self.prior_width_hi = prior_width_hi
        self.bounds_lo = bounds_lo
        self.bounds_hi = bounds_hi
        self.user_input = user_input
        self.user_data = user_data
        self.prior_str = prior_str

    @classmethod
    def from_tuple(cls, param_in, step=None, lo=None, hi=None, user_input=None, user_data=None, prior_str=None, func_call=""):
        """
        alternative method to initialize _param_obj from a number, a tuple or None.

        * int/float ``x`` (fixed): to_fit="n", start_value=x, step_size=0, prior="n",
          all widths and bounds 0, prior_str "F(x)".
        * tuple of len 2 ``(mean,std)`` (normal prior): to_fit="y", start_value=mean,
          step_size=0.1*std, prior="p", bounds mean-10*std and mean+10*std, prior_str "N(mean,std)".
        * tuple of len 3 ``(min,start,max)`` (uniform prior): to_fit="y", start_value=start,
          step_size=min(0.001, 0.001*(max-min)), prior="n", bounds min and max,
          prior_str "U(min,start,max)" unless ``prior_str`` is given.
        * tuple of len 4 ending in "LU" ``(min,start,max,"LU")`` (loguniform prior): as the
          uniform case, with user_input stored without the "LU" flag and prior_str "LU(min,start,max)".
        * tuple of len 4 ``(min,max,mean,std)`` (truncated normal prior): to_fit="y",
          start_value=mean, step_size=0.1*std, prior="p", bounds min and max,
          prior_str "TN(min,max,mean,std)".
        * None: to_fit="n", start_value=None, prior_str "None".

        Parameters
        -----------
        param_in : int, float, tuple, None;
            input float/tuple with the parameters for the object.
        step : float, None;
            step size for the parameter to override the default value derived from param_in
        lo : float, None;
            lower bound to override the default derived from param_in (normal, uniform and loguniform only)
        hi : float, None;
            upper bound to override the default derived from param_in (normal, uniform and loguniform only)
        user_input : tuple, int, float;
            accepted for interface compatibility; the stored ``user_input`` is always derived from ``param_in``.
        user_data : any;
            any data to be stored in the parameter object.
        prior_str : str, None;
            overrides the printed prior string of a uniform prior; not used for other prior types.
        func_call : str;
            name of the function calling this method, to be used in error messages.

        Returns
        -----------
        param_obj : object;
            object with the parameters.

        Raises
        -----------
        AssertionError
            if ``func_call`` is not a string or the tuple values are inconsistent with the prior form.
        TypeError
            if ``param_in`` is a tuple of unsupported length or of an unsupported type.

        Example
        -----------
        >>> RpRs = (0.1,0.002)
        >>> param_obj = _param_obj.from_tuple(RpRs,func_call="planet_parameters():")
        """
        assert isinstance(func_call, str), f"_param_obj.from_tuple() func_call must be a string but {func_call} given."

        v = param_in
        if isinstance(v, (int, float)):     #fixed parameter
            params = ["n", v, 0., "n", v, 0., 0., 0., 0., v, user_data, f'F({v})']

        elif isinstance(v, tuple):
            if len(v) == 2:     #normal prior
                assert v[1] > 0, f"{func_call} wrongly defined normal prior. must be of form (mean,std) with std>0 but {v} given."
                step = 0.1*v[1] if step is None else step
                lo_lim = v[0]-10*v[1] if lo is None else lo
                hi_lim = v[0]+10*v[1] if hi is None else hi
                params = ["y", v[0], step, "p", v[0], v[1], v[1], lo_lim, hi_lim, v, user_data, f'N({v[0]},{v[1]})']

            elif len(v) == 3:   #uniform prior
                assert v[0] <= v[1] <= v[2], f"{func_call} wrongly defined uniform prior. must be of form (min,start,max) with min<=start<=max but {v} given."
                step = min(0.001, 0.001*np.ptp(v)) if step is None else step
                lo_lim = v[0] if lo is None else lo
                hi_lim = v[2] if hi is None else hi
                params = ["y", v[1], step, "n", v[1], 0, 0, lo_lim, hi_lim, v, user_data,
                          f'U({v[0]},{v[1]},{v[2]})' if prior_str is None else prior_str]

            elif len(v) == 4 and v[-1] == "LU":     #loguniform prior
                assert v[0] <= v[1] <= v[2], f"{func_call} wrongly defined uniform prior. must be of form (min,start,max) with min<=start<=max but {v} given."
                step = min(0.001, 0.001*np.ptp(v[:-1])) if step is None else step
                lo_lim = v[0] if lo is None else lo
                hi_lim = v[2] if hi is None else hi
                params = ["y", v[1], step, "n", v[1], 0, 0, lo_lim, hi_lim, v[:-1], user_data, f'LU({v[0]},{v[1]},{v[2]})']

            elif len(v) == 4:   #truncated normal prior
                assert v[0] <= v[2] <= v[1] and v[3] > 0, f"{func_call} wrongly defined trucated normal prior. must be of form (min,max,mean,std) with min<=mean<=max and std>0, but {v} given."
                step = 0.1*v[3] if step is None else step
                params = ["y", v[2], step, "p", v[2], v[3], v[3], v[0], v[1], v, user_data, f'TN({v[0]},{v[1]},{v[2]},{v[3]})']

            else:
                raise TypeError(f"{func_call} tuple must have 2,3 or 4 elements but {v} given")

        # identity check: `v == None` is element-wise for arrays and would raise
        # an ambiguous-truth ValueError instead of the TypeError below
        elif v is None:
            params = ["n", None, 0, "n", None, 0, 0, 0, 0, None, user_data, 'None']

        else:
            raise TypeError(f"{func_call} input must be an int, float, tuple or None")

        return cls(*params)

    def _set(self, par_list):
        """Re-initialise this object in place from the positional list ``par_list``."""
        return self.__init__(*par_list)

    def __repr__(self):
        return f"{self.__dict__}"

    def _get_list(self):
        """Return the attribute values as a list, in attribute definition order."""
        return list(self.__dict__.values())


class _text_format:
    """ANSI escape sequences for coloured/styled terminal text."""
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    DARKCYAN = '\033[36m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    END = '\033[0m'


def _compare_nested_structures(obj1, obj2, ignore=(), verbose=False):
    """
    Compare two nested structures (e.g. dictionaries, lists, etc.) for equality.

    Parameters
    -----------
    obj1, obj2 : any;
        structures to compare. dicts, lists, numpy arrays, SimpleNamespace, plain
        functions and objects whose type name contains "CONAN" are compared
        recursively; anything else with ``==``.
    ignore : sequence of str;
        dict keys / attribute names skipped at every nesting level.
    verbose : bool;
        if True, print which top-level dict keys differ.

    Returns
    -----------
    equal : bool;
        True if the structures are equal. For namespaces and CONAN objects only the
        attributes present on ``obj1`` are checked; one missing from ``obj2`` is a mismatch.
    """
    if isinstance(obj1, dict) and isinstance(obj2, dict):
        if obj1.keys() != obj2.keys():
            if verbose:
                print(f"keys differ in {set(obj1) - set(obj2)}")
            return False
        keys = [key for key in obj1 if key not in ignore]
        if not verbose:
            return all(_compare_nested_structures(obj1[key], obj2[key], ignore) for key in keys)
        # verbose: evaluate every key once so that each mismatch can be reported
        results = {key: _compare_nested_structures(obj1[key], obj2[key], ignore) for key in keys}
        for key, res in results.items():
            if not res:
                print(f"{key:25s}: {res}")
        return all(results.values())

    elif isinstance(obj1, list) and isinstance(obj2, list):
        if len(obj1) != len(obj2):
            return False
        return all(_compare_nested_structures(item1, item2, ignore) for item1, item2 in zip(obj1, obj2))

    elif isinstance(obj1, np.ndarray) or isinstance(obj2, np.ndarray):
        # also covers array-vs-non-array, where `==` would return an array rather than a bool
        return np.array_equal(obj1, obj2)

    elif (isinstance(obj1, SimpleNamespace) and isinstance(obj2, SimpleNamespace)) or \
            (("CONAN" in str(type(obj1))) and ("CONAN" in str(type(obj2)))):
        attrs1, attrs2 = vars(obj1), vars(obj2)
        for key in attrs1:
            if key in ignore:
                continue
            if key not in attrs2:       # attribute missing from obj2: not equal
                return False
            if not _compare_nested_structures(attrs1[key], attrs2[key], ignore):
                return False
        return True

    elif isinstance(obj1, FunctionType) and isinstance(obj2, FunctionType):
        # functions are equal if their compiled code, constants and names match
        code1, code2 = obj1.__code__, obj2.__code__
        return (code1.co_code == code2.co_code and
                code1.co_consts == code2.co_consts and
                code1.co_names == code2.co_names and
                code1.co_varnames == code2.co_varnames)

    else:
        return obj1 == obj2


def compare_objs(obj1, obj2, ignore=()):
    """
    compare two objects for equality

    Parameters
    -----------
    obj1, obj2 : any;
        objects to compare with ``_compare_nested_structures``.
    ignore : sequence of str;
        dict keys / attribute names to skip in the comparison.

    Returns
    -----------
    equal : bool;
        True if the objects are equal. Otherwise False, after printing the
        attributes of ``obj1`` that differ from (or are missing in) ``obj2``.
    """
    if _compare_nested_structures(obj1, obj2, ignore):
        return True

    # report per attribute; objects without a __dict__ simply have nothing to report
    attrs1 = getattr(obj1, "__dict__", {})
    attrs2 = getattr(obj2, "__dict__", {})
    for k in attrs1:
        if k in ignore:
            continue
        if k not in attrs2:
            print(f"{k:25s}: missing from second object")
            continue
        res = _compare_nested_structures(attrs1[k], attrs2[k], ignore)
        if not res:
            print(f"{k:25s}: {res}")
    return False