Source code for pygwb.statistical_checks

"""
The ``statistical_checks`` module performs various tests by plotting different quantities and saving these plots. 
This allows the user to check for consistency with expected results. Concretely, the following tests and plots
can be generated: running point estimate, running sigma, (cumulative) point estimate integrand, real and imaginary 
part of point estimate integrand, FFT of the point estimate integrand, (cumulative) sensitivity, evolution of omega 
and sigma as a function of time, omega and sigma distribution, KS test, and a linear trend analysis of omega in time. 
Furthermore, some of these plots compare the values of these quantities before and after the delta sigma cut.

For additional information on how to run the statistical checks, and interpret them, we refer the user to the dedicated
tutorials and demos, as well as the `pygwb paper <https://arxiv.org/pdf/2303.15696.pdf>`_.
"""
import json
import warnings
from os import listdir
from os.path import isfile, join
from pathlib import Path

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.transforms as mt
from astropy.time import Time
from loguru import logger

matplotlib.rcParams['figure.figsize'] = (8,6)
matplotlib.rcParams['axes.grid'] = True
matplotlib.rcParams['grid.linestyle'] = ':'
matplotlib.rcParams['grid.color'] = 'grey'
matplotlib.rcParams['lines.linewidth'] = 2
matplotlib.rcParams['legend.handlelength'] = 3

from matplotlib import rc

rc('font', **{'family': 'serif', 'serif': ['Computer Modern']})
# FIXME - changed to get workflow running
rc('text', usetex=False)

import numpy as np
import seaborn as sns
from scipy import integrate, stats
from scipy.optimize import curve_fit

from pygwb.baseline import Baseline
from pygwb.notch import StochNotchList
from pygwb.parameters import Parameters
from pygwb.util import StatKS, calc_bias, effective_welch_averages, get_window_tuple


class StatisticalChecks:
 def __init__( self, sliding_times_all, sliding_omega_all, sliding_sigmas_all, naive_sigmas_all, coherence_spectrum, coherence_n_segs, point_estimate_spectrum, sigma_spectrum, frequencies, badGPSTimes, delta_sigmas, plot_dir, baseline_name, param_file, frequency_mask = None, coherence_far = 1.0, gates_ifo1 = None, gates_ifo2 = None, file_tag = None, legend_fontsize = 16, convention = 'pygwb', seaborn_palette = 'tab10', ): """ Instantiate a StatisticalChecks object. Parameters ========== sliding_times_all: ``array_like`` Array of GPS times before the bad GPS times from the delta sigma cut are applied. sliding_omega_all: ``array_like`` Array of sliding omegas before the bad GPS times from the delta sigma cut are applied. sliding_sigmas_all: ``array_like`` Array of sliding sigmas before the bad GPS times from the delta sigma cut are applied. naive_sigmas_all: ``array_like`` Array of naive sigmas before the bad GPS times from the delta sigma cut are applied. coherence_spectrum: ``array_like`` Array containing a coherence spectrum. Each entry in this array corresponds to the 2-detector coherence spectrum evaluated at the corresponding frequency in the frequencies array. coherence_n_segs: ``int`` Number of segments used for the coherence calculation. point_estimate_spectrum: ``array_like`` Array containing the point estimate spectrum. Each entry in this array corresponds to the point estimate spectrum evaluated at the corresponding frequency in the frequencies array. sigma_spectrum: ``array_like`` Array containing the sigma spectrum. Each entry in this array corresponds to the sigma spectrum evaluated at the corresponding frequency in the frequencies array. frequencies: ``array_like`` Array containing the frequencies. badGPSTimes: ``array_like`` Array of bad GPS times, i.e. times that do not pass the delta sigma cut. delta_sigmas: ``array_like`` Array containing the value of delta sigma for all times in sliding_times_all. plot_dir: ``str`` String with the path to which the output of the statistical checks (various plots) will be saved. baseline_name: ``str`` Name of the baseline under consideration. param_file: ``str`` String with path to the file containing the parameters that were used for the analysis run. frequency_mask: ``array_like`` Boolean mask applied to the spectra in broad-band analyses. coherence_far: ``float`` Target false alarm rate for the number of frequency bins in the coherence spectrum exceeding the coherence threshold. gates_ifo1/gates_ifo2: ``list`` List of gates applied to interferometer 1/2. file_tag: ``str`` Tag to be used in the file naming convention. legend_fontsize: ``int`` Font size for plot legends. Default is 16. All other font sizes are scaled relative to this value. convention: ``str`` Convention used for the time axis of the time-dependent plots: 'pygwb' (days since the start of the run; default) or 'stochmon' (hours since the start of the run). seaborn_palette: ``str`` Seaborn color palette used for all plots. Default is 'tab10'. 
""" self.params = Parameters() self.params.update_from_file(param_file) self.sliding_times_all = sliding_times_all self.days_all = (sliding_times_all - sliding_times_all[0]) / 86400.0 self.sliding_omega_all = sliding_omega_all self.sliding_sigmas_all = sliding_sigmas_all self.naive_sigmas_all = naive_sigmas_all self.badGPStimes = badGPSTimes self.delta_sigmas_all = delta_sigmas self.sliding_deviate_all = ( self.sliding_omega_all - np.nanmean(self.sliding_omega_all) ) / self.sliding_sigmas_all self.gates_ifo1 = gates_ifo1 self.gates_ifo2 = gates_ifo2 self.frequencies = frequencies if frequency_mask is not None: self.frequency_mask = frequency_mask else: self.frequency_mask = True self.coherence_spectrum = coherence_spectrum self.coherence_far = coherence_far if coherence_n_segs is not None: # FFT length in seconds fftlength = int(1.0 / (self.frequencies[1] - self.frequencies[0])) # FFT number of samples nFFT = int(fftlength*self.params.new_sample_rate) # Number of samples in a segment used for calculating PSD nSamples = int(self.params.segment_duration*self.params.new_sample_rate) window_tuple = get_window_tuple(self.params.window_fft_dict_welch) # Total number of samples included in the all coherence segments combined. This is only approximate # since the coherences are combined across several discrete science segments. A more accurate count # could be obtained by saving this quantity for each science segment. N_tot = int(nSamples + (coherence_n_segs - 1)*(1 - self.params.overlap_factor)*nSamples) # Total number of effective segments - need to carefully check how many independent segments # there should be. self.n_segs = effective_welch_averages(N_tot, nFFT, window_tuple, self.params.overlap_factor_welch) # The old method, gives a slightly larger number of segments # self.n_segs = coherence_n_segs*(1.-self.params.overlap_factor) # * int(np.floor(self.params.segment_duration/(fftlength*(1.-self.params.overlap_factor_welch)))-1) if self.params.coarse_grain_csd: # Note: this breaks down when self.params.segment_duration/fftlength < 3 self.n_segs = coherence_n_segs*(1.-self.params.overlap_factor) * int(np.floor(self.params.segment_duration/(fftlength))) self.n_segs_statement = r"The number of segments is" + f" {self.n_segs}." self.sigma_spectrum = sigma_spectrum self.point_estimate_spectrum = point_estimate_spectrum self.plot_dir = Path(plot_dir) self.baseline_name = baseline_name self.segment_duration = self.params.segment_duration self.deltaF = self.params.frequency_resolution self.new_sample_rate = self.params.new_sample_rate self.deltaT = 1.0 / self.new_sample_rate self.fref = self.params.fref self.flow = self.params.flow self.fhigh = self.params.fhigh self.alpha = self.params.alpha ( self.sliding_times_cut, self.days_cut, self.sliding_omega_cut, self.sliding_sigma_cut, self.naive_sigma_cut, self.delta_sigmas_cut, self.sliding_deviate_cut, self.sliding_deviate_KS, ) = self.get_data_after_dsc() self.dsc_percent = (len(self.sliding_times_all) - len(self.sliding_times_cut))/len(self.sliding_times_all) * 100 self.dsc_statement = r"The $\Delta\sigma$ cut removed" + f"{float(f'{self.dsc_percent:.2g}'):g}% of the data." 
tot_tot_segs = ((sliding_times_all - sliding_times_all[0])[-1])/(self.params.segment_duration*(1-self.params.overlap_factor)) self.percent_obs_segs = len(self.naive_sigmas_all)/tot_tot_segs * 100 ( self.running_pt_estimate, self.running_sigmas, ) = self.compute_running_quantities() t0_gps = Time(self.sliding_times_all[0], format='gps') t0 = Time(t0_gps, format='iso', scale='utc', precision=0, out_subfmt='date_hm') tf_gps = Time(self.sliding_times_all[-1], format='gps') tf = Time(tf_gps, format='iso', scale='utc', precision=0, out_subfmt='date_hm') self.time_tag = f"{t0}"+" $-$ "+f"{tf}" if file_tag: self.file_tag = file_tag else: self.file_tag = f"{int(t0_gps.value)}-{int(tf_gps.value)}" self.legend_fontsize = legend_fontsize self.axes_labelsize = legend_fontsize + 2 self.title_fontsize = legend_fontsize + 4 self.annotate_fontsize = legend_fontsize - 4 ## convention: stochmon if convention == 'stochmon': self.days_all = self.days_all*24 + t0.ymdhms.hour + t0.ymdhms.minute/60 self.days_cut = self.days_cut*24 + t0.ymdhms.hour + t0.ymdhms.minute/60 self.xaxis = f"Hours since {t0}" else: self.xaxis = f"Days since {t0}" self.sea = sns.color_palette(seaborn_palette)
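# Illustrative sketch (not part of the class): the segment bookkeeping above,
# evaluated with example numbers. The window tuple passed to
# effective_welch_averages is an assumption here, standing in for whatever
# get_window_tuple(self.params.window_fft_dict_welch) returns for the
# configured Welch window.
from pygwb.util import effective_welch_averages

seg_dur, fs, delta_f = 192, 4096, 1.0 / 32      # example analysis settings
overlap, overlap_welch = 0.5, 0.5               # segment and Welch overlaps
n_coh_segs = 100                                # segments entering the coherence
fftlength = int(1.0 / delta_f)                  # Welch FFT length: 32 s
nFFT = int(fftlength * fs)                      # samples per Welch FFT
nSamples = int(seg_dur * fs)                    # samples per analysis segment
# Approximate total number of samples across all overlapping segments
N_tot = int(nSamples + (n_coh_segs - 1) * (1 - overlap) * nSamples)
n_segs = effective_welch_averages(N_tot, nFFT, ("hann",), overlap_welch)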
 def get_data_after_dsc(self): """ Function that returns the GPS times, the sliding omegas, the sliding sigmas, the naive sigmas, the delta sigmas and the sliding deviates after removal of the bad GPS times identified by the delta sigma cut. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sliding_times_all`). Returns ======= sliding_times_cut: ``array_like`` Array of GPS times after the bad GPS times were removed. days_cut: ``array_like`` Array of days after the bad GPS times were removed. sliding_omega_cut: ``array_like`` Array of the sliding omega values after the bad GPS times were removed. sliding_sigma_cut: ``array_like`` Array of sliding sigmas after the bad GPS times were removed. naive_sigma_cut: ``array_like`` Array of naive sigmas after the bad GPS times were removed. delta_sigma_cut: ``array_like`` Array of the delta sigma values after the bad GPS times were removed. sliding_deviate_cut: ``array_like`` Array of the deviates after the bad GPS times were removed, computed with the pre-cut mean omega. sliding_deviate_KS: ``array_like`` Array of the deviates after the bad GPS times were removed, computed with the post-cut mean omega; used in the Kolmogorov-Smirnov test. """ bad_gps_times = self.badGPStimes bad_gps_mask = np.array([(t not in bad_gps_times) for t in self.sliding_times_all]) bad_gps_mask[~np.isfinite(self.sliding_omega_all)] = False sliding_times_cut = self.sliding_times_all.copy() days_cut = self.days_all.copy() sliding_omega_cut = self.sliding_omega_all.copy() sliding_sigma_cut = self.sliding_sigmas_all.copy() naive_sigma_cut = self.naive_sigmas_all.copy() delta_sigma_cut = self.delta_sigmas_all.copy() sliding_deviate_cut = self.sliding_deviate_all.copy() sliding_times_cut = sliding_times_cut[bad_gps_mask] days_cut = days_cut[bad_gps_mask] sliding_omega_cut = sliding_omega_cut[bad_gps_mask] sliding_sigma_cut = sliding_sigma_cut[bad_gps_mask] naive_sigma_cut = naive_sigma_cut[bad_gps_mask] delta_sigma_cut = delta_sigma_cut[bad_gps_mask] sliding_deviate_cut = ( sliding_omega_cut - np.nanmean(self.sliding_omega_all) ) / sliding_sigma_cut sliding_deviate_KS = ( sliding_omega_cut - np.nanmean(sliding_omega_cut) ) / sliding_sigma_cut return ( sliding_times_cut, days_cut, sliding_omega_cut, sliding_sigma_cut, naive_sigma_cut, delta_sigma_cut, sliding_deviate_cut, sliding_deviate_KS, )
[docs] def compute_running_quantities(self): """ Function that computes the running point estimate and running sigmas from the sliding point estimate and sliding sigmas. This is done only for the values after the delta sigma cut. This method does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sliding_sigma_cut`). Returns ======= running_pt_estimate: ``array_like`` Array containing the values of the running point estimate. running_sigmas: ``array_like`` Array containing the values of the running sigmas. """ running_pt_estimate = self.sliding_omega_cut.copy() running_sigmas = self.sliding_sigma_cut.copy() ii = 0 while ii < self.sliding_times_cut.shape[0] - 1: ii += 1 numerator = np.nansum([running_pt_estimate[ii - 1] / ( running_sigmas[ii - 1] ** 2 ) , self.sliding_omega_cut[ii] / (self.sliding_sigma_cut[ii] ** 2)]) denominator = np.nansum([1.0 / (running_sigmas[ii - 1] ** 2) , 1 / ( self.sliding_sigma_cut[ii] ** 2 )]) running_pt_estimate[ii] = numerator / denominator running_sigmas[ii] = np.sqrt(1.0 / denominator) return running_pt_estimate, running_sigmas
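# Illustrative sketch (not part of the class): in the absence of NaNs, the
# recursion above reduces to the standard inverse-variance weighted cumulative
# combination, which can be written in closed form with numpy:
import numpy as np

omega = np.array([1.0, 2.0, 0.5, 1.5])          # example sliding omegas
sigma = np.array([0.5, 1.0, 0.8, 0.4])          # example sliding sigmas
weights = 1.0 / sigma**2
# Running point estimate: weighted mean of all segments up to each time
running_pt_estimate = np.cumsum(omega * weights) / np.cumsum(weights)
# Running sigma: inverse-variance combination of all segments up to each time
running_sigmas = np.sqrt(1.0 / np.cumsum(weights))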
 def compute_ifft_integrand(self): """ Function that computes the inverse Fourier transform of the point estimate integrand. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.point_estimate_spectrum`). Returns ======= t_array: ``array_like`` Array containing the time lag values (in seconds). omega_t: ``array_like`` Array containing the IFFT of the point estimate integrand, i.e. the point estimate integrand as a function of time lag. See also -------- numpy.fft.fft More information `here <https://numpy.org/doc/stable/reference/generated/numpy.fft.fft.html>`_. """ numFreqs = self.point_estimate_spectrum.shape[0] fhigh = self.flow + self.deltaF * numFreqs fNyq = 1 / (2 * self.deltaT) numFreqs_pre = np.floor(self.flow / self.deltaF) - 1 f_pre = self.deltaF * np.arange(1, numFreqs_pre + 1) numFreqs_post = np.floor((fNyq - fhigh) / self.deltaF) f_post = fhigh + self.deltaF * np.arange(0, numFreqs_post) fp = np.concatenate((f_pre, self.frequencies, f_post)) fn = -np.flipud(fp) f_tot = np.concatenate((fn, np.array([0]), fp)) integrand_pre = np.zeros(int(numFreqs_pre)) integrand_post = np.zeros(int(numFreqs_post)) integrand_p = np.concatenate( (integrand_pre, 0.5 * self.point_estimate_spectrum, integrand_post) ) integrand_n = np.flipud(np.conj(integrand_p)) integrand_tot = np.concatenate((np.array([0]), integrand_p, integrand_n)) fft_integrand = np.fft.fftshift(np.fft.fft(self.deltaF * integrand_tot)) t_array = np.arange( -1.0 / (2 * self.deltaF) + self.deltaT, 1.0 / (2 * self.deltaF), self.deltaT ) # Take the real part to eliminate residual imaginary parts omega_t = np.real(np.flipud(fft_integrand)) return t_array, omega_t
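# Illustrative sketch (not part of the class): the negative frequencies above
# are filled with the flipped complex conjugate of the positive frequencies,
# so the assembled spectrum is Hermitian-symmetric and its Fourier transform
# is real up to round-off; the np.real() call only discards that residue.
import numpy as np

rng = np.random.default_rng(0)
pos = rng.standard_normal(4) + 1j * rng.standard_normal(4)    # positive freqs
full = np.concatenate(([0.0], pos, np.flipud(np.conj(pos))))  # DC, +f, -f
x = np.fft.ifft(full)
assert np.allclose(x.imag, 0.0, atol=1e-12)                   # real time series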
 def plot_running_point_estimate(self, ymin=None, ymax=None): """ Generates and saves a plot of the running point estimate. The plotted values are the ones after the delta sigma cut. The data are accessed through the attributes of the class (e.g. `self.days_cut`). Parameters ========== ymin: ``float``, optional Minimum value on the y-axis. ymax: ``float``, optional Maximum value on the y-axis. """ if self.days_cut.size==0: return fig = plt.figure(figsize=(10, 8)) plt.plot( self.days_cut, self.running_pt_estimate, ".", color="black", markersize=2, label=self.baseline_name, ) plt.plot( self.days_cut, self.running_pt_estimate + 1.65 * self.running_sigmas, ".", color=self.sea[2], markersize=2, ) plt.plot( self.days_cut, self.running_pt_estimate - 1.65 * self.running_sigmas, ".", color=self.sea[0], markersize=2, ) plt.grid(True) plt.xlim(self.days_cut[0], self.days_cut[-1]) if ymin and ymax: plt.ylim(ymin, ymax) plt.xlabel(self.xaxis, size=self.axes_labelsize) plt.ylabel(r"Point estimate $\pm 1.65 \sigma$", size=self.axes_labelsize) plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.annotate( f"baseline time: {float(f'{self.percent_obs_segs:.2g}'):g}%", xy=(0.5, 0.9), xycoords="axes fraction", size = self.annotate_fontsize, bbox=dict(boxstyle="round", facecolor="white", alpha=1), ) plt.title(f'Running point estimate in {self.time_tag}', fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-running_point_estimate.png", bbox_inches='tight' ) plt.close()
[docs] def plot_running_sigma(self): """ Generates and saves a plot of the running sigma. The plotted values are the ones after the delta sigma cut. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.days_cut`). """ if self.days_cut.size==0: return fig = plt.figure(figsize=(10, 8)) plt.plot( self.days_cut, self.running_sigmas, '.', markersize=2, color=self.sea[0], label=self.baseline_name ) plt.grid(True) plt.yscale("log") plt.xlim(self.days_cut[0], self.days_cut[-1]) plt.xlabel(self.xaxis, size=self.axes_labelsize) plt.ylabel(r"$\sigma$", size=self.axes_labelsize) plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(r'Running $\sigma$ ' + f'in {self.time_tag}', fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-running_sigma.png", bbox_inches = 'tight' ) plt.close()
 def plot_IFFT_point_estimate_integrand(self): """ Generates and saves a plot of the IFFT of the point estimate integrand. The IFFT of the point estimate integrand is computed using the method "compute_ifft_integrand". This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.point_estimate_spectrum`). """ t_array, omega_array = self.compute_ifft_integrand() if len(t_array) != len(omega_array): warnings.warn("Time and Omega arrays in the IFFT have mismatched lengths. No plot could be generated. Investigation is highly recommended.") return fig = plt.figure(figsize=(10, 8)) plt.plot(t_array, omega_array, color=self.sea[0], label=self.baseline_name) plt.grid(True) plt.xlim(t_array[0], t_array[-1]) plt.xlabel("Lag (s)", size=self.axes_labelsize) plt.ylabel(r"$\Omega$ integrand IFFT", size=self.axes_labelsize) plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(r"$\Omega$ integrand IFFT" + f" in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-IFFT_point_estimate_integrand.png", bbox_inches='tight' ) plt.close()
 def plot_SNR_spectrum(self): """ Generates and saves a plot of the absolute value of the SNR spectrum, i.e. the point estimate spectrum divided by the sigma spectrum. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.point_estimate_spectrum` and `self.sigma_spectrum`). """ if np.isnan(self.point_estimate_spectrum).all() or not np.real(self.point_estimate_spectrum).any(): return fig, axs = plt.subplots(figsize=(10, 8)) axs.semilogy( self.frequencies, abs(self.point_estimate_spectrum / self.sigma_spectrum), color=self.sea[0], ) trans = mt.blended_transform_factory(axs.transData, axs.transAxes) axs.vlines(self.frequencies[~self.frequency_mask], ymin=0, ymax=1, linewidth=1, linestyle=':', color='black', alpha=0.5, transform=trans) axs.set_xlabel("Frequency (Hz)", size=self.axes_labelsize) axs.set_ylabel(r"$|{\rm SNR}(f)|$", size=self.axes_labelsize) plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) axs.set_xscale("log") plt.title(f"|SNR| in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-abs_point_estimate_integrand.png", bbox_inches="tight", ) plt.close()
 def plot_cumulative_SNR_spectrum(self): """ Generates and saves a plot of the cumulative SNR spectrum, normalized to its final value. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.point_estimate_spectrum` and `self.sigma_spectrum`). """ pt_est_cumul = self.point_estimate_spectrum.copy() pt_est_cumul[~self.frequency_mask] = 0 cum_pt_estimate = integrate.cumtrapz( np.abs(pt_est_cumul/ self.sigma_spectrum), self.frequencies ) cum_pt_estimate = cum_pt_estimate / cum_pt_estimate[-1] fig, axs = plt.subplots(figsize=(10, 8)) axs.plot(self.frequencies[:-1], cum_pt_estimate, color=self.sea[0]) axs.set_xlabel("Frequency (Hz)", size=self.axes_labelsize) axs.set_ylabel(r"Cumulative $|{\rm SNR}(f)|$", size=self.axes_labelsize) axs.set_xscale("log") plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(f"Cumulative SNR in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-cumulative_SNR_spectrum.png", bbox_inches="tight", ) plt.close()
[docs] def plot_real_SNR_spectrum(self): """ Generates and saves a plot of the real part of the SNR spectrum. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.point_estimate_spectrum` and `self.sigma_spectrum`). """ fig, axs = plt.subplots(figsize=(10, 8)) axs.plot( self.frequencies, np.real(self.point_estimate_spectrum / self.sigma_spectrum), color=self.sea[0], ) trans = mt.blended_transform_factory(axs.transData, axs.transAxes) axs.vlines(self.frequencies[~self.frequency_mask], ymin=0, ymax=1, linewidth=1, linestyle=':', color='black', alpha=0.5, transform=trans) axs.set_xlabel("Frequency (Hz)", size=self.axes_labelsize) axs.set_ylabel(r"Re$({\rm SNR}(f))$", size=self.axes_labelsize) axs.set_xscale("log") plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(f"Real SNR in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-real_SNR_spectrum.png", bbox_inches="tight", ) plt.close()
[docs] def plot_imag_SNR_spectrum(self): """ Generates and saves a plot of the imaginary part of the SNR spectrum. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.point_estimate_spectrum` and `self.sigma_spectrum`). """ fig, axs = plt.subplots(figsize=(10, 8)) axs.plot( self.frequencies, np.imag(self.point_estimate_spectrum / self.sigma_spectrum), color=self.sea[0], ) trans = mt.blended_transform_factory(axs.transData, axs.transAxes) axs.vlines(self.frequencies[~self.frequency_mask], ymin=0, ymax=1, linewidth=1, linestyle=':', color='black', alpha=0.5, transform=trans) axs.set_xlabel("Frequency (Hz)", size=self.axes_labelsize) axs.set_ylabel(r"Im$({\rm SNR}(f))$", size=self.axes_labelsize) axs.set_xscale("log") plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(f"Imaginary SNR in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-imag_SNR_spectrum.png", bbox_inches="tight", ) plt.close()
[docs] def plot_sigma_spectrum(self): """ Generates and saves a plot of the sigma spectrum. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sigma_spectrum`). """ if np.isinf(self.sigma_spectrum).all() or not np.real(self.point_estimate_spectrum).any(): return fig, axs = plt.subplots(figsize=(10, 8)) axs.plot(self.frequencies, self.sigma_spectrum, color=self.sea[0]) trans = mt.blended_transform_factory(axs.transData, axs.transAxes) axs.vlines(self.frequencies[~self.frequency_mask], ymin=0, ymax=1, linewidth=1, linestyle=':', color='black', alpha=0.5, transform=trans) axs.set_xlabel("Frequency (Hz)", size=self.axes_labelsize) axs.set_ylabel(r"$\sigma(f)$", size=self.axes_labelsize) axs.set_xscale("log") axs.set_yscale("log") plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(r"Total $\sigma$ spectrum" + f" in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-sigma_spectrum.png", bbox_inches="tight", ) plt.close()
[docs] def coherence_pdf(self, gamma): """ Theoretical pdf of coherences assuming Gaussian noise Parameters ========== gamma: ``array_like`` Array of coherence values Returns ========== coherence_pdf: ``array_like`` Value of PDF at each gamma """ return (self.n_segs - 1) * (1 - gamma)**(self.n_segs - 2)
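# Illustrative sketch (not part of the class): for Gaussian noise the
# coherence pdf above is a Beta(1, n_segs - 1) density, so it integrates to
# one over [0, 1] for any n_segs > 1. Quick numerical check:
import numpy as np
from scipy import integrate

n_segs = 100
gamma = np.linspace(0, 1, 100001)
pdf = (n_segs - 1) * (1 - gamma) ** (n_segs - 2)
assert abs(integrate.trapezoid(pdf, gamma) - 1.0) < 1e-6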
[docs] def coherence_pvalue(self, gamma): """ Upper tail p-value of the given coherences assuming Gaussian noise Parameters ========== gamma: ``array_like`` Array of coherence values Returns ========== coherence_pvalue: ``array_like`` p-value of each gamma """ return (1 - gamma)**(self.n_segs - 1)
[docs] def coherence_threshold(self): """ Returns the coherence threshold corresponding to the given FAR. """ threshold = 1 - (self.coherence_far/len(self.frequencies))**(1/(self.n_segs - 1)) return threshold
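# Illustrative sketch (not part of the class): the threshold above follows
# from requiring that the expected number of bins exceeding it equals the
# target false alarm rate, N_bins * (1 - gamma)**(n_segs - 1) = FAR, i.e.
# gamma = 1 - (FAR / N_bins)**(1 / (n_segs - 1)). Consistency check with
# example numbers (these values are assumptions):
n_segs, n_bins, far = 1000, 56319, 1.0
threshold = 1 - (far / n_bins) ** (1 / (n_segs - 1))
pvalue = (1 - threshold) ** (n_segs - 1)    # upper-tail p-value at threshold
assert abs(n_bins * pvalue - far) < 1e-9    # expected outlier count equals FAR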
[docs] def plot_coherence_spectrum(self, flow=None, fhigh=None): """ Generates and saves a plot of the coherence spectrum, if present. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `coherence_spectrum`). """ if self.coherence_spectrum is None or self.coherence_spectrum.size==1: return flow = flow or self.flow fhigh = fhigh or self.fhigh threshold = self.coherence_threshold() plt.figure(figsize=(10, 8)) plt.plot(self.frequencies, self.coherence_spectrum, color=self.sea[0]) # Plot a reference line representing the mean of the theoretical coherence plt.axhline(y=1./self.n_segs,dashes=(4,3),color='black') # Plot a line representing the coherence threshold plt.axhline(y=threshold,dashes=(4,3),color='red') plt.xlim(flow, fhigh) plt.xlabel("Frequency (Hz)", size=self.axes_labelsize) plt.ylabel(r"Coherence", size=self.axes_labelsize) plt.xscale("log") plt.yscale("log") plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.annotate( f"{self.params.channel}, threshold $\gamma = ${threshold:.3f}", xy=(0.01, 0.03), xycoords="axes fraction", size = self.annotate_fontsize, bbox=dict(boxstyle="round", facecolor="white", alpha=1), ) plt.title(r"Coherence ($\Delta f$ = " + f"{float(f'{self.deltaF:.5g}'):g} Hz) in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-coherence_spectrum.png", bbox_inches="tight", ) plt.close() plt.figure(figsize=(10, 8)) plt.plot(self.frequencies, self.coherence_spectrum, color=self.sea[0]) # Plot a reference line representing the mean of the theoretical coherence plt.axhline(y=1./self.n_segs,dashes=(4,3),color='black') # Plot a line representing the coherence threshold plt.axhline(y=threshold,dashes=(4,3),color='red') plt.xlim(flow, 200) plt.xlabel("Frequency (Hz)", size=self.axes_labelsize) plt.ylabel(r"Coherence", size=self.axes_labelsize) plt.yscale("log") plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.annotate( f"{self.params.channel}, threshold $\gamma =$ {threshold:.3f}", xy=(0.01, 0.03), xycoords="axes fraction", size = self.annotate_fontsize, bbox=dict(boxstyle="round", facecolor="white", alpha=1), ) plt.title(r"Coherence ($\Delta f$ = " + f"{float(f'{self.deltaF:.5g}'):g} Hz) in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-coherence_spectrum_zoom.png", bbox_inches="tight", ) plt.close()
[docs] def plot_hist_coherence(self,total_bins = None): r""" Generates and saves a histogram of the coherence distribution. The plot shows the data after the delta-sigma cut (bad GPS times) was applied. This function does not require any input parameters, as it accesses the data through the attributes of the class. Furthermore, it also saves a text file which contains the frequencies at which outliers of the coherence distribution were identified, i.e. spectral artefacts. """ if self.coherence_spectrum is None or self.coherence_spectrum.size==1: return coherence = self.coherence_spectrum coherence_clipped = np.ones(len(coherence)) # For zoomed plots, coherence is clipped at 50 times the theoretical mean clip_val = 50*(1/self.n_segs) for i in range(len(coherence_clipped)): if coherence[i] >= clip_val: coherence_clipped[i] = clip_val else: coherence_clipped[i] = coherence[i] frequencies = self.frequencies if total_bins is None: total_bins = 250 # Bins are chosen so that the highest coherence value is the centre of the last bin upper_edge = max(coherence)*total_bins/(total_bins - 0.5) bins = np.linspace(0, upper_edge, total_bins, endpoint=True) delta_coherence = bins[1] - bins[0] upper_edge_clipped = max(coherence_clipped)*total_bins/(total_bins - 0.5) bins_clipped = np.linspace(0, upper_edge_clipped, total_bins, endpoint=True) delta_coherence_clipped = bins_clipped[1] - bins_clipped[0] # Note that the number of frequencies should be equal to the total number of counts # for the un-notched coherences, but is different to the total number of counts # for notched coherences. Care may be needed when comparing predicted counts # with the notched coherence histogram. n_frequencies = len(frequencies) resolution = frequencies[1] - frequencies[0] coherence_notched = coherence[self.frequency_mask] coherence_notched_clipped = coherence_clipped[self.frequency_mask] # Coherence values aligned with bin centres up to twice the max value of the coherence coherence_highres = np.linspace(delta_coherence/2, 2*upper_edge + delta_coherence/2, num=2*total_bins, endpoint=True) # Theoretical pdf of coherences assuming Gaussian noise predicted_highres = self.coherence_pdf(coherence_highres) # Threshold to give a false-alarm rate of FAR frequency bins per coherence spectrum threshold = self.coherence_threshold() fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) axs.hist( coherence, bins, color=self.sea[3], ec="k", lw = 0.1, zorder=1, density = True, label = 'Before notching', ) axs.hist( coherence_notched, bins, color=self.sea[0], ec="k", lw = 0.1, zorder=2, density = True, label = 'After notching', ) axs.plot( coherence_highres, predicted_highres, color=self.sea[1], zorder=3, alpha = 0.8, label="Predicted", ) axs.axvline( np.abs(threshold), zorder=4, color=self.sea[8], linestyle='dashed', label=f"Threshold ($\\gamma = ${threshold:.3f})", ) axs.set_xlabel(r"Coherence", size=self.axes_labelsize) axs.set_ylabel(r"Probability distribution", size=self.axes_labelsize) axs.legend(fontsize=self.legend_fontsize) axs.set_yscale("log") max_coh = max(np.append(coherence, threshold)) # Go up to nearest 10th axs.set_xlim(0, np.ceil(10*max_coh)/10) axs.set_ylim(10**np.floor(np.log10(100.0/n_frequencies)), 10**np.ceil(np.log10(predicted_highres[0]))) axs.tick_params(axis="x", labelsize=self.legend_fontsize) axs.tick_params(axis="y", labelsize=self.legend_fontsize) plt.title(r"Coherence hist ($\Delta f$ = " + f"{resolution:.5f} Hz) in" f" {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / 
self.baseline_name}-{self.file_tag}-histogram_coherence.png", bbox_inches = 'tight' ) plt.close() fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) axs.hist( coherence_clipped, bins_clipped, color=self.sea[3], ec="k", lw = 0.1, zorder=1, density = True, label = 'Before notching', ) axs.hist( coherence_notched_clipped, bins_clipped, color=self.sea[0], ec="k", lw = 0.1, zorder=2, density = True, label = 'After notching', ) axs.plot( coherence_highres, predicted_highres, color=self.sea[1], zorder=3, alpha = 0.8, label="Predicted", ) axs.axvline( np.abs(threshold), zorder=4, color=self.sea[8], linestyle='dashed', label=f"Threshold ($\\gamma = ${threshold:.3f})", ) axs.set_xlabel(r"Coherence", size=self.axes_labelsize) axs.set_ylabel(r"Probability distribution", size=self.axes_labelsize) axs.legend(fontsize=self.legend_fontsize) axs.set_yscale("log") max_coh = max(np.append(coherence_clipped, threshold)) # For zoomed plot, go up to nearest 100th axs.set_xlim(0, np.ceil(100*max_coh)/100) axs.set_ylim(10**np.floor(np.log10(100.0/n_frequencies)), 10**np.ceil(np.log10(predicted_highres[0]))) axs.tick_params(axis="x", labelsize=self.legend_fontsize) axs.tick_params(axis="y", labelsize=self.legend_fontsize) plt.title(r"Coherence hist (zoomed) ($\Delta f$ = " + f"{resolution:.5f} Hz) in" f" {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-histogram_coherence_zoom.png", bbox_inches = 'tight' ) plt.close() outlier_coherence = [] for i in range(len(coherence)): if (coherence[i] > np.abs(threshold) and self.frequency_mask[i]): try: outlier_coherence.append((i, frequencies[i], coherence[i], n_frequencies*self.coherence_pvalue(coherence[i]))) except IndexError as err: warnings.warn( '\nIn outlier_coherence, frequency %f has coherence %f, which lies outside the expected range [0, 1]; please investigate.' % (frequencies[i], coherence[i]) ) outlier_coherence.append((i, frequencies[i], coherence[i], 'nan')) outlier_coherence_notched = [] for i in range(len(coherence)): if (coherence[i] > np.abs(threshold) and not self.frequency_mask[i]): try: outlier_coherence_notched.append((i, frequencies[i], coherence[i], n_frequencies*self.coherence_pvalue(coherence[i]))) except IndexError as err: warnings.warn( '\nIn outlier_coherence_notched, frequency %f has coherence %f, which lies outside the expected range [0, 1]; please investigate.' % (frequencies[i], coherence[i]) ) outlier_coherence_notched.append((i, frequencies[i], coherence[i], 'nan')) n_outlier = len(outlier_coherence) file_name = f"{self.plot_dir / self.baseline_name}-{self.file_tag}-list_coherence_outlier.txt" with open(file_name, 'w') as f: f.write('Bin \tFrequency \tCoherence \tExpected counts above this coherence\n') for tup in outlier_coherence: f.write(f'{tup[0]}\t{tup[1]}\t{tup[2]}\t{tup[3]}\n') f.write('\n The outliers below are already included in the applied version of the notch-list\n') for tup in outlier_coherence_notched: f.write(f'{tup[0]}\t{tup[1]}\t{tup[2]}\t{tup[3]}\n')
[docs] def plot_cumulative_sensitivity(self): """ Generates and saves a plot of the cumulative sensitivity. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sigma_spectrum`). """ if np.isinf(self.sigma_spectrum).all() or not np.real(self.point_estimate_spectrum).any(): return sigma_cumul = self.sigma_spectrum.copy() sigma_cumul[~self.frequency_mask] = np.inf cumul_sens = integrate.cumtrapz((1 / sigma_cumul ** 2), self.frequencies) cumul_sens = cumul_sens / cumul_sens[-1] plt.figure(figsize=(10, 8)) plt.plot(self.frequencies[:-1], cumul_sens, color=self.sea[0]) plt.xlabel("Frequency (Hz)", size=self.axes_labelsize) plt.ylabel("Cumulative sensitivity", size=self.axes_labelsize) plt.xscale("log") plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(r"$1/\sigma^2$ " + f"in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-cumulative_sigma_spectrum.png", bbox_inches="tight", ) plt.close()
 def plot_omega_sigma_in_time(self): r""" Generates and saves a panel plot showing the evolution of :math:`\Omega`, :math:`\sigma`, and the deviate :math:`\Delta{\rm SNR}_i = (\Omega-\langle\Omega\rangle)/\sigma` as a function of the days since the start of the run. All plots show the data before and after the delta-sigma cut (bad GPS times) was applied. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sliding_sigmas_all`). """ fig, axs = plt.subplots(nrows=3, ncols=1, figsize=(10, 15), constrained_layout=True) fig.suptitle(r"$\Omega$, $\sigma$, and" + f" SNR variations in {self.time_tag} with/out " + r"$\Delta\sigma$ cut", fontsize=self.title_fontsize) axs[0].plot(self.days_all, self.sliding_omega_all, color=self.sea[3], linewidth=1, alpha=0.5, label="All data") axs[0].plot( self.days_cut, self.sliding_omega_cut, color=self.sea[0], linewidth=1, alpha=0.5, label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", ) axs[0].plot(self.days_all, self.sliding_omega_all, '.', color=self.sea[3]) axs[0].plot( self.days_cut, self.sliding_omega_cut, '.', color=self.sea[0], ) axs[0].set_xlabel(self.xaxis, size=self.axes_labelsize) axs[0].set_ylabel(r"$\Omega$", size=self.axes_labelsize) axs[0].legend(loc="upper left", fontsize=self.legend_fontsize) axs[0].set_xlim(0, self.days_all[-1]) axs[0].tick_params(axis="x", labelsize=self.legend_fontsize) axs[0].tick_params(axis="y", labelsize=self.legend_fontsize) axs[0].yaxis.offsetText.set_fontsize(self.legend_fontsize) axs[1].plot(self.days_all, self.sliding_sigmas_all, color=self.sea[3], linewidth=1, alpha=0.5, label="All data") axs[1].plot( self.days_cut, self.sliding_sigma_cut, color=self.sea[0], linewidth=1, alpha=0.5, label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", ) axs[1].plot(self.days_all, self.sliding_sigmas_all,'.', color=self.sea[3]) axs[1].plot( self.days_cut, self.sliding_sigma_cut,'.', color=self.sea[0] ) axs[1].set_xlabel(self.xaxis, size=self.axes_labelsize) axs[1].set_ylabel(r"$\sigma$", size=self.axes_labelsize) axs[1].legend(loc="upper left", fontsize=self.legend_fontsize) axs[1].set_xlim(0, self.days_all[-1]) axs[1].set_yscale('log') axs[1].tick_params(axis="x", labelsize=self.legend_fontsize) axs[1].tick_params(axis="y", labelsize=self.legend_fontsize) axs[1].yaxis.offsetText.set_fontsize(self.legend_fontsize) axs[2].plot(self.days_all, self.sliding_deviate_all, color=self.sea[3], linewidth=1, alpha=0.5, label="All data") axs[2].plot( self.days_cut, self.sliding_deviate_cut, color=self.sea[0], linewidth=1, alpha=0.5, label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", ) axs[2].plot(self.days_all, self.sliding_deviate_all, '.', color=self.sea[3]) axs[2].plot( self.days_cut, self.sliding_deviate_cut, '.', color=self.sea[0], ) axs[2].set_xlabel(self.xaxis, size=self.axes_labelsize) axs[2].set_ylabel(r"$\Delta{\rm SNR}_i$", size=self.axes_labelsize) axs[2].legend(loc="upper left", fontsize=self.legend_fontsize) axs[2].set_xlim(0, self.days_all[-1]) axs[2].set_yscale("symlog") axs[2].tick_params(axis="x", labelsize=self.legend_fontsize) axs[2].tick_params(axis="y", labelsize=self.legend_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-omega_sigma_time.png", bbox_inches='tight' ) plt.close()
[docs] def plot_hist_sigma_dsc(self): r""" Generates and saves a panel plot with a histogram of :math:`|\Delta\sigma|/\sigma`, as well as a histogram of :math:`\sigma`. Both plots show the data before and after the delta-sigma cut (bad GPS times) was applied. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.delta_sigmas_all`). """ fig, axs = plt.subplots(nrows=2, ncols=1, figsize=(10, 14), constrained_layout=True) fig.suptitle(r"$\Delta\sigma$ and $\sigma$ distributions in" f" {self.time_tag} with/out " + r"$\Delta\sigma$ cut", fontsize=self.title_fontsize) axs[0].hist( self.delta_sigmas_all, bins=80, color=self.sea[3], ec="k", lw=0.5, label="All data", range=(0.0001, 1), ) axs[0].hist( self.delta_sigmas_cut, bins=80, color=self.sea[0], alpha = 0.6, ec="k", lw=0.5, label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", range=(0.0001, 1), ) axs[0].set_xlabel(r"$|\Delta\sigma|/\sigma$", size=self.axes_labelsize) axs[0].set_ylabel(r"count", size=self.axes_labelsize) axs[0].legend(fontsize=self.legend_fontsize) axs[0].tick_params(axis="x", labelsize=self.legend_fontsize) axs[0].tick_params(axis="y", labelsize=self.legend_fontsize) axs[0].yaxis.offsetText.set_fontsize(self.legend_fontsize) if self.sliding_sigma_cut.size==0: minx1 = min(self.sliding_sigmas_all) maxx1 = max(self.sliding_sigmas_all) else: minx1 = min(self.sliding_sigma_cut) maxx1 = max(self.sliding_sigma_cut) nx = 50 axs[1].hist( self.sliding_sigmas_all, bins=nx, color=self.sea[3], ec="k", lw=0.5, label="All data", range=(minx1, maxx1), ) axs[1].hist( self.sliding_sigma_cut, bins=nx, color=self.sea[0], alpha = 0.6, ec="k", lw=0.5, label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", range=(minx1, maxx1), ) axs[1].set_xlabel(r"$\sigma$", size=self.axes_labelsize) axs[1].set_ylabel(r"count", size=self.axes_labelsize) axs[1].legend(fontsize=self.legend_fontsize) axs[1].set_yscale("log") axs[1].tick_params(axis="x", labelsize=self.legend_fontsize) axs[1].tick_params(axis="y", labelsize=self.legend_fontsize) axs[1].xaxis.offsetText.set_fontsize(self.legend_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-histogram_sigma_dsc.png", bbox_inches='tight' ) plt.close()
 def plot_scatter_sigma_dsc(self): r""" Generates and saves a scatter plot of :math:`|\Delta\sigma|/\sigma` vs :math:`\sigma`. The plot shows the data before and after the delta-sigma cut (bad GPS times) was applied. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.delta_sigmas_all`). """ fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) axs.scatter( self.delta_sigmas_all, self.naive_sigmas_all, marker=".", color=self.sea[3], label="All data", s=5, ) axs.scatter( self.delta_sigmas_cut, self.naive_sigma_cut, marker=".", color=self.sea[0], label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", s=5, ) axs.set_xlabel(r"$|\Delta\sigma|/\sigma$", size=self.axes_labelsize) axs.set_ylabel(r"$\sigma$", size=self.axes_labelsize) axs.set_yscale("log") axs.set_xscale("log") axs.legend(fontsize=self.legend_fontsize) axs.tick_params(axis="x", labelsize=self.legend_fontsize) axs.tick_params(axis="y", labelsize=self.legend_fontsize) axs.annotate( r"Data cut by $\Delta\sigma$ cut"+f": {float(f'{self.dsc_percent:.2g}'):g}%", xy=(0.05, 0.8), xycoords="axes fraction", size = self.annotate_fontsize, bbox=dict(boxstyle="round", facecolor="white", alpha=1), ) plt.title(r"$\Delta\sigma$ distribution in" f" {self.time_tag} with/out " + r"$\Delta\sigma$ cut", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-scatter_sigma_dsc.png", bbox_inches = 'tight' ) plt.close()
[docs] def plot_scatter_omega_sigma_dsc(self): r""" Generates and saves a panel plot with scatter plots of :math:`|\Delta\sigma|/\sigma` vs :math:`\Delta{\rm SNR}_i`, as well as :math:`\sigma` vs :math:`(\Omega-\langle\Omega\rangle)/\sigma`. All plots show the data before and after the delta-sigma cut (bad GPS times) was applied. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.delta_sigmas_all`). """ fig, axs = plt.subplots(nrows=2, ncols=1, figsize=(10, 13), constrained_layout=True) fig.suptitle(r"$\Delta$SNR spread" + f" in {self.time_tag} with/out " + r"$\Delta\sigma$ cut", fontsize=self.title_fontsize) if self.delta_sigmas_cut.size==0: maxx0 = max(self.delta_sigmas_all) maxx0 += maxx0 / 10.0 minx0 = min(self.delta_sigmas_all) minx0 -= minx0 / 10.0 maxy0 = np.nanmax(self.sliding_deviate_all) maxy0 += maxy0 / 10.0 miny0 = np.nanmin(self.sliding_deviate_all) miny0 -= miny0 / 10.0 maxx1 = max(self.sliding_sigmas_all) maxx1 += maxx1 / 10.0 minx1 = min(self.sliding_sigmas_all) minx1 -= minx1 / 10.0 maxy1 = max(self.sliding_deviate_all) maxy1 += maxy1 / 10.0 miny1 = min(self.sliding_deviate_all) miny1 -= miny1 / 10.0 else: maxx0 = max(self.delta_sigmas_cut) maxx0 += maxx0 / 10.0 minx0 = min(self.delta_sigmas_cut) minx0 -= minx0 / 10.0 maxy0 = np.nanmax(self.sliding_deviate_cut) maxy0 += maxy0 / 10.0 miny0 = np.nanmin(self.sliding_deviate_cut) miny0 -= miny0 / 10.0 maxx1 = max(self.sliding_sigma_cut) maxx1 += maxx1 / 10.0 minx1 = min(self.sliding_sigma_cut) minx1 -= minx1 / 10.0 maxy1 = max(self.sliding_deviate_cut) maxy1 += maxy1 / 10.0 miny1 = min(self.sliding_deviate_cut) miny1 -= miny1 / 10.0 axs[0].scatter( self.delta_sigmas_all, self.sliding_deviate_all, marker=".", color=self.sea[3], label="All data", s=3, ) axs[0].scatter( self.delta_sigmas_cut, self.sliding_deviate_cut, marker=".", color=self.sea[0], label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", s=3, ) axs[0].set_xlabel(r"$|\Delta\sigma|/\sigma$", size=self.axes_labelsize) axs[0].set_ylabel(r"$\Delta{\rm SNR}_i$", size=self.axes_labelsize) axs[0].set_xlim(minx0, maxx0) axs[0].set_ylim(miny0, maxy0) axs[0].legend(fontsize=self.legend_fontsize) axs[0].tick_params(axis="x", labelsize=self.legend_fontsize) axs[0].tick_params(axis="y", labelsize=self.legend_fontsize) axs[1].scatter( self.sliding_sigmas_all, self.sliding_deviate_all, marker=".", color=self.sea[3], label="All data", s=3, ) axs[1].scatter( self.sliding_sigma_cut, self.sliding_deviate_cut, marker=".", color=self.sea[0], label=r"Data after $|\Delta\sigma/\sigma|$ outlier cut", s=3, ) axs[1].set_xlabel(r"$\sigma$", size=self.axes_labelsize) axs[1].set_ylabel(r"$\Delta{\rm SNR}_i$", size=self.axes_labelsize) axs[1].legend(fontsize=self.legend_fontsize) axs[1].set_xlim(minx1, maxx1) axs[1].set_ylim(miny1, maxy1) axs[1].tick_params(axis="x", labelsize=self.legend_fontsize) axs[1].tick_params(axis="y", labelsize=self.legend_fontsize) axs[1].xaxis.offsetText.set_fontsize(self.legend_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-scatter_omega_sigma_dsc.png", bbox_inches = 'tight') plt.close()
[docs] def plot_hist_omega_pre_post_dsc(self): r""" Generates and saves a histogram of the :math:`\Delta{\rm SNR}_i` distribution. The plot shows the data before and after the delta-sigma cut (bad GPS times) was applied. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sliding_deviate_all`). """ fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) # nan-safing the histograms for good measure... bins=np.histogram(np.hstack((self.sliding_deviate_all[~np.isnan(self.sliding_deviate_all)], self.sliding_deviate_cut[~np.isnan(self.sliding_deviate_cut)])), bins=202)[1] axs.hist( self.sliding_deviate_all, bins, color=self.sea[3], ec="k", lw=0.5, label="All data", ) axs.hist( self.sliding_deviate_cut, bins, color=self.sea[0], alpha = 0.6, ec="k", lw=0.5, label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", ) axs.set_xlabel(r"$\Delta{\rm SNR}_i$", size=self.axes_labelsize) axs.set_ylabel(r"count", size=self.axes_labelsize) axs.legend(fontsize=self.legend_fontsize) axs.set_yscale("log") axs.tick_params(axis="x", labelsize=self.legend_fontsize) axs.tick_params(axis="y", labelsize=self.legend_fontsize) plt.title(r"$\Delta$SNR distribution in" f" {self.time_tag} with/out " + r"$\Delta\sigma$ cut", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-histogram_omega_dsc.png", bbox_inches = 'tight' ) plt.close()
[docs] def plot_KS_test(self, bias_factor=None): """ Generates and saves a panel plot with results of the Kolmogorov-Smirnov test for Gaussianity. The cumulative distribution of the data (after the delta-sigma (bad GPS times) cut) is compared to the one of Gaussian data, where the bias factor for the sigmas is taken into account. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sliding_deviate_cut`). Parameters ======= bias_factor: ``float``, optional Bias factor to consider in the KS calculation. Defaults to None, in which case it computes the bias factor on the fly. See also -------- pygwb.util.calc_bias pygwb.util.StatKS """ if self.delta_sigmas_cut.size==0: return if bias_factor is None: bias_factor = calc_bias(self.segment_duration, self.deltaF, self.deltaT) dof_scale_factor = 1.0 / (1.0 + 3.0 / 35.0) lx = len(self.sliding_deviate_cut) sorted_deviates = np.sort(self.sliding_deviate_KS / bias_factor) nanmask = ~np.isnan(sorted_deviates) sorted_deviates_nansafe = sorted_deviates[nanmask] count, bins_count = np.histogram(sorted_deviates_nansafe, bins=500) pdf = count / sum(count) cdf = np.cumsum(pdf) normal = stats.norm(0, 1) normal_cdf = normal.cdf(bins_count[1:]) dks_x = max(abs(cdf - normal_cdf)) lx_eff = lx * dof_scale_factor lam = (np.sqrt(lx_eff) + 0.12 + 0.11 / np.sqrt(lx_eff)) * dks_x pval_KS = StatKS(lam) fig, axs = plt.subplots(nrows=2, ncols=1, figsize=(10, 8), constrained_layout=True) fig.suptitle(f"Kolmogorov-Smirnov test in {self.time_tag}", fontsize=self.title_fontsize) axs[0].plot(bins_count[1:], cdf, "k", label="Data") axs[0].plot( bins_count[1:], normal_cdf, color=self.sea[3], label=r"Erf with $\sigma$=" + str(round(bias_factor, 2)), ) axs[0].text( bins_count[1], 0.8, "KS-statistic: " + str(round(dks_x, 3)) + "\n" + "p-value: " + str(round(pval_KS, 3)), ) axs[0].legend(loc="lower right", fontsize=self.legend_fontsize) axs[0].tick_params(axis="x", labelsize=self.legend_fontsize) axs[0].tick_params(axis="y", labelsize=self.legend_fontsize) axs[1].plot( bins_count[1:], cdf - normal_cdf, ) axs[1].annotate( "Maximum absolute difference: " + str(round(dks_x, 3)), xy=(0.025, 0.9), xycoords="axes fraction", size = self.annotate_fontsize ) axs[1].tick_params(axis="x", labelsize=self.legend_fontsize) axs[1].tick_params(axis="y", labelsize=self.legend_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-KS_test.png", bbox_inches = 'tight' ) plt.close()
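# Illustrative sketch (not part of the class): an independent cross-check of
# the KS result using scipy's one-sample test on bias-corrected deviates; the
# arrays below are stand-ins for self.sliding_deviate_KS and the calc_bias()
# output. Small differences from plot_KS_test are expected, since the method
# above bins the empirical CDF and rescales the effective number of samples.
import numpy as np
from scipy import stats

rng = np.random.default_rng(1)
deviates = rng.standard_normal(500)          # stand-in for sliding deviates
bias_factor = 1.0                            # stand-in for calc_bias(...)
clean = deviates[~np.isnan(deviates)] / bias_factor
ks_stat, p_value = stats.kstest(clean, "norm")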
 def plot_hist_sigma_squared(self, max_val = 5, total_bins=100, label_number = 6): r""" Generates and saves a histogram of :math:`\sigma^2/\langle\sigma^2\rangle`. The plot shows data after the delta-sigma (bad GPS times) cut. The data are accessed through the attributes of the class (e.g. `self.sliding_sigma_cut`). Parameters ========== max_val: ``float``, optional Value at which the plotted quantity is clipped. Default is 5. total_bins: ``int``, optional Number of histogram bins. Default is 100. label_number: ``int``, optional Number of tick labels on the x-axis. Default is 6. """ if self.delta_sigmas_cut.size==0: return values = 1 / np.nanmean(self.sliding_sigma_cut ** 2) * self.sliding_sigma_cut ** 2 values_clipped = np.ones(len(values)) for i in range(len(values_clipped)): if values[i] >= max_val: values_clipped[i] = max_val else: values_clipped[i] = values[i] bins = np.linspace(0, max(values_clipped), total_bins) fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) _, bins, patches = plt.hist( values_clipped, bins=bins, color=self.sea[0], ec="k", lw=0.5, label=r"Data after $|\Delta\sigma|/\sigma$ outlier cut", ) axs.set_xlabel(r"$\sigma^2/\langle\sigma^2\rangle$", size=self.axes_labelsize) axs.set_ylabel(r"count", size=self.axes_labelsize) axs.set_yscale("log") axs.set_xlim(0, max_val) axs.legend(fontsize=self.legend_fontsize) axs.tick_params(axis="x", labelsize=self.legend_fontsize) axs.tick_params(axis="y", labelsize=self.legend_fontsize) xticks_tmp = np.linspace(0, max_val, label_number) labels = [str(i) for i in xticks_tmp] labels[-1]+="+" plt.xticks(xticks_tmp, labels) plt.title(f"Relative variance in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-histogram_sigma_squared.png", bbox_inches = 'tight' ) plt.close()
 def plot_omega_time_fit(self): r""" Generates and saves a plot of :math:`\Omega` as a function of time and fits the data to perform a linear trend analysis. The plot shows data after the delta-sigma (bad GPS times) cut. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sliding_omega_cut`). """ if self.days_cut.size==0: return fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) t_obs = self.days_cut[-1] scale = np.sqrt(np.var(self.sliding_omega_cut)) def func(x, a, b): return a * (x - t_obs / 2) * t_obs + b nanmask = ~np.isnan(self.sliding_omega_cut) sliding_omega_cut_nansafe = self.sliding_omega_cut[nanmask] sliding_sigma_cut_nansafe = self.sliding_sigma_cut[nanmask] popt, pcov = curve_fit( func, self.days_cut[nanmask], sliding_omega_cut_nansafe, sigma=sliding_sigma_cut_nansafe, ) c1, c2 = popt[0], popt[1] axs.plot(self.days_cut, func(self.days_cut, c1, c2), color=self.sea[3]) axs.plot(self.days_cut, self.sliding_omega_cut, '.', color=self.sea[0], markersize=1) axs.plot(self.days_cut, 3 * self.sliding_sigma_cut, color=self.sea[0], linewidth=1.5) axs.plot(self.days_cut, -3 * self.sliding_sigma_cut, color=self.sea[0], linewidth=1.5) axs.set_xlabel(self.xaxis, size=self.axes_labelsize) axs.set_ylabel(r"$\Omega_i$", size=self.axes_labelsize) axs.set_xlim(self.days_cut[0], self.days_cut[-1]) axs.annotate( r"Linear trend analysis: $\Omega(t) = C_1 (t-T_{\rm obs}/2) T_{\rm obs} + C_2$" + "\nC$_1$ = " + str(f"{c1:.3e}") + "\nC$_2$ = " + str(f"{c2:.3e}"), xy=(0.05, 0.05), xycoords="axes fraction", size = self.annotate_fontsize, bbox=dict(boxstyle="round", facecolor="white", alpha=1), ) axs.tick_params(axis="x", labelsize=self.legend_fontsize) axs.tick_params(axis="y", labelsize=self.legend_fontsize) axs.xaxis.offsetText.set_fontsize(self.legend_fontsize) plt.title(r"Time evolution of $\Omega$ " + f"in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-omega_time_fit.png", bbox_inches = 'tight' ) plt.close()
 def plot_sigma_time_fit(self): r""" Generates and saves a plot of :math:`\sigma` as a function of time and fits the data to perform a linear trend analysis. The plot shows data after the delta-sigma (bad GPS times) cut. This function does not require any input parameters, as it accesses the data through the attributes of the class (e.g. `self.sliding_sigma_cut`). """ if self.days_cut.size==0: return fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) t_obs = self.days_cut[-1] scale = np.sqrt(np.var(self.sliding_sigma_cut)) def func(x, a, b): return a * (x - t_obs / 2) * t_obs + b nanmask = ~np.isnan(self.sliding_sigma_cut) sliding_sigma_cut_nansafe = self.sliding_sigma_cut[nanmask] mean_sigma = np.nanmean(self.sliding_sigma_cut) popt, pcov = curve_fit( func, self.days_cut[nanmask], sliding_sigma_cut_nansafe, ) c1, c2 = popt[0], popt[1] axs.plot(self.days_cut, func(self.days_cut, c1, c2), color=self.sea[3]) axs.plot(self.days_cut, self.sliding_sigma_cut, ".", color=self.sea[0], markersize=1) axs.set_xlabel(self.xaxis, size=self.axes_labelsize) axs.set_ylabel(r"$\sigma_i$", size=self.axes_labelsize) axs.set_xlim(self.days_cut[0], self.days_cut[-1]) axs.set_ylim(mean_sigma - 1.2 * mean_sigma, mean_sigma + 2.2 * mean_sigma) axs.annotate( r"Linear trend analysis: $\sigma(t) = C_1 (t-T_{\rm obs}/2) T_{\rm obs} + C_2$" + "\nC$_1$ = " + str(f"{c1:.3e}") + "\nC$_2$ = " + str(f"{c2:.3e}"), xy=(0.05, 0.05), xycoords="axes fraction", size = self.annotate_fontsize, bbox=dict(boxstyle="round", facecolor="white", alpha=1), ) axs.tick_params(axis="x", labelsize=self.legend_fontsize) axs.tick_params(axis="y", labelsize=self.legend_fontsize) plt.title(r"Time evolution of $\sigma$ " + f"in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-sigma_time_fit.png", bbox_inches = 'tight' ) plt.close()
 def plot_gates_in_time(self): """ Generates and saves a plot of the duration of the gates applied to each interferometer as a function of time, and stores summary statements on the total amount of gated data. If no gate information is available for either interferometer, no plot is produced. """ if self.gates_ifo1 is None and self.gates_ifo2 is None: self.gates_ifo1_statement=None self.gates_ifo2_statement=None return fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(10, 8)) if self.gates_ifo1 is None: self.gates_ifo1_statement=None else: self.total_gated_time_ifo1 = np.sum(self.gates_ifo1[:,1]-self.gates_ifo1[:,0]) self.total_gated_percent_ifo1 = self.total_gated_time_ifo1/(int(self.sliding_times_all[-1])- int(self.sliding_times_all[0]))*100 gate_times_in_days_ifo1 = (np.array(self.gates_ifo1[:,0]) - self.sliding_times_all[0]) / 86400.0 self.gates_ifo1_statement= f"Data gated out: {self.total_gated_time_ifo1} s\n" f"Percentage: {float(f'{self.total_gated_percent_ifo1:.2g}'):g}%" gatefig1 = ax.plot(gate_times_in_days_ifo1, self.gates_ifo1[:,1]-self.gates_ifo1[:,0], 's', color=self.sea[3], label="IFO1:\n" f"{self.gates_ifo1_statement}") first_legend = ax.legend(handles=gatefig1, loc=(0.05,0.75), fontsize = self.axes_labelsize) ax.add_artist(first_legend) if self.gates_ifo2 is None: self.gates_ifo2_statement=None else: self.total_gated_time_ifo2 = np.sum(self.gates_ifo2[:,1]-self.gates_ifo2[:,0]) self.total_gated_percent_ifo2 = self.total_gated_time_ifo2/(int(self.sliding_times_all[-1])- int(self.sliding_times_all[0]))*100 gate_times_in_days_ifo2 = (np.array(self.gates_ifo2[:,0]) - self.sliding_times_all[0]) / 86400.0 self.gates_ifo2_statement= f"Data gated out: {self.total_gated_time_ifo2} s\n" f"Percentage: {float(f'{self.total_gated_percent_ifo2:.2g}'):g}%" gatefig2 = ax.plot(gate_times_in_days_ifo2, self.gates_ifo2[:,1]-self.gates_ifo2[:,0], 's', color=self.sea[0], label="IFO2:\n" f"{self.gates_ifo2_statement}") ax.legend(handles=gatefig2, loc=(0.05, 0.1), fontsize = self.axes_labelsize) ax.set_xlabel(self.xaxis, size=self.axes_labelsize) ax.set_ylabel("Gate length (s)", size=self.axes_labelsize) plt.xticks(fontsize=self.legend_fontsize) plt.yticks(fontsize=self.legend_fontsize) plt.title(f"Gates applied to {self.baseline_name} in {self.time_tag}", fontsize=self.title_fontsize) plt.savefig( f"{self.plot_dir / self.baseline_name}-{self.file_tag}-gates_time.png", bbox_inches="tight", ) plt.close()
 def save_all_statements(self): """ Saves all useful statements gathered throughout the checks to a json file. """ statements = {} statements['dsc'] = self.dsc_statement statements['gates_ifo1'] = self.gates_ifo1_statement statements['gates_ifo2'] = self.gates_ifo2_statement # n_segs_statement is only set when a coherence spectrum was provided statements['n_segs'] = getattr(self, 'n_segs_statement', None) with open("stats_statements.json", "w") as outfile: json.dump(statements, outfile)
[docs] def generate_all_plots(self): """ Generates and saves all the statistical analysis plots. """ self.plot_running_point_estimate() self.plot_running_sigma() self.plot_IFFT_point_estimate_integrand() self.plot_SNR_spectrum() self.plot_cumulative_SNR_spectrum() self.plot_real_SNR_spectrum() self.plot_imag_SNR_spectrum() self.plot_sigma_spectrum() self.plot_cumulative_sensitivity() self.plot_omega_sigma_in_time() self.plot_hist_sigma_dsc() self.plot_scatter_sigma_dsc() self.plot_scatter_omega_sigma_dsc() self.plot_hist_omega_pre_post_dsc() self.plot_KS_test() self.plot_hist_sigma_squared() self.plot_omega_time_fit() self.plot_sigma_time_fit() self.plot_gates_in_time() if self.coherence_spectrum is not None: self.plot_coherence_spectrum() self.plot_hist_coherence() self.save_all_statements()
def sortingFunction(item): """ Sorting key that extracts, as a float, the number between a fixed five-character prefix and the first "-" in a file name. """ return float(item[5:].partition("-")[0])
def run_statistical_checks_from_file( combine_file_path, dsc_file_path, plot_dir, param_file, coherence_far=1.0, legend_fontsize=16, coherence_file_path = None, file_tag = None, convention='pygwb', seaborn_palette='tab10', ): """ Method to generate an instance of the statistical checks class from a set of files. Parameters ========== combine_file_path: ``str`` Full path to the file containing the output of the pygwb.combine script, i.e., with the combined results of the run. dsc_file_path: ``str`` Full path to the file containing the results of the delta sigma cut. plot_dir: ``str`` Full path where the plots generated by the statistical checks module should be saved. param_file: ``str`` Full path to the parameter file that was used for the analysis. coherence_far: ``float``, optional Target false alarm rate for the number of frequency bins in the coherence spectrum exceeding the coherence threshold. Defaults to 1.0. legend_fontsize: ``int``, optional Fontsize used in the plots generated by the module. Defaults to 16. coherence_file_path: ``str``, optional Full path to the file containing the coherence spectrum and the number of segments used to compute it. Defaults to None, in which case no coherence checks are performed. file_tag: ``str``, optional Tag to be used in the file naming convention. Defaults to None, in which case the GPS start and end times are used. convention: ``str``, optional Convention used for the time axis of the plots: 'pygwb' (default) or 'stochmon'. seaborn_palette: ``str``, optional Seaborn color palette used for the plots. Defaults to 'tab10'. Returns ======= statistical_checks: ``StatisticalChecks`` Instance of the StatisticalChecks class, initialized from the provided files. See also -------- pygwb.notch.StochNotchList pygwb.parameters.Parameters """ params = Parameters() params.update_from_file(param_file) spectra_file = np.load(combine_file_path) dsc_file = np.load(dsc_file_path) badGPStimes = dsc_file["badGPStimes"] delta_sigmas = dsc_file["delta_sigma_values"] sliding_times = dsc_file["delta_sigma_times"] naive_sigma_all = dsc_file["naive_sigma_values"] gates_ifo1 = dsc_file["ifo_1_gates"] gates_ifo2 = dsc_file["ifo_2_gates"] if gates_ifo1.size==0: gates_ifo1=None if gates_ifo2.size==0: gates_ifo2=None sliding_omega_all, sliding_sigmas_all = ( spectra_file["point_estimates_seg_UW"], spectra_file["sigmas_seg_UW"], ) frequencies = np.arange( 0, params.new_sample_rate / 2.0 + params.frequency_resolution, params.frequency_resolution, ) frequency_cut = (params.fhigh >= frequencies) & (frequencies >= params.flow) try: frequency_mask = spectra_file["frequency_mask"] except KeyError: try: notch_list_path = params.notch_list_path logger.debug("loading notches from " + notch_list_path) notch_list = StochNotchList.load_from_file(notch_list_path) frequency_mask = notch_list.get_notch_mask(frequencies[frequency_cut]) except Exception: frequency_mask = None point_estimate_spectrum = spectra_file["point_estimate_spectrum"] sigma_spectrum = spectra_file["sigma_spectrum"] baseline_name = params.interferometer_list[0] + params.interferometer_list[1] # select alpha for statistical checks delta_sigmas_sel = delta_sigmas[1] naive_sigmas_sel = naive_sigma_all[1] if coherence_file_path is not None: coh_data = np.load(coherence_file_path, allow_pickle=True) coherence_spectrum = coh_data['coherence'] coherence_n_segs = coh_data['n_segs_coh'] else: coherence_spectrum = None coherence_n_segs = None return StatisticalChecks( sliding_times, sliding_omega_all, sliding_sigmas_all, naive_sigmas_sel, coherence_spectrum, coherence_n_segs, point_estimate_spectrum, sigma_spectrum, frequencies[frequency_cut], badGPStimes, delta_sigmas_sel, plot_dir, baseline_name, param_file, frequency_mask, coherence_far, gates_ifo1, gates_ifo2, file_tag=file_tag, legend_fontsize=legend_fontsize, convention=convention, seaborn_palette=seaborn_palette, )
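# Illustrative usage sketch: the file paths below are hypothetical
# placeholders for the outputs of a pygwb analysis run.
if __name__ == "__main__":
    checks = run_statistical_checks_from_file(
        combine_file_path="output/combined_results.npz",    # pygwb.combine output
        dsc_file_path="output/delta_sigma_cut.npz",         # delta sigma cut output
        plot_dir="output/plots",
        param_file="parameters.ini",
        coherence_file_path="output/coherence.npz",         # with 'coherence', 'n_segs_coh'
    )
    checks.generate_all_plots()    # writes all statistical-checks figures to plot_dir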