"""
The ``Baseline`` module is by design the core of the pygwb stochastic analysis. Its main role is to manage the cross-
correlation between ``Interferometer`` data products, combine these into a single cross-spectrum, which represents the
point estimate of the analysis, and calculate the associated error.
The ``Baseline`` object relies on the ``pygwb.spectral`` module to calculate cross-correlations between the data
streams. Similarly, it relies on the ``pygwb.postprocessing`` module to obtain the point estimate and its variance.
Calculating these, as well as performing parameter estimation on the gravitational-wave background (GWB) spectrum, requires the two-detector
overlap reduction function (ORF). The ORF is calculated using the ``pygwb.orfs`` module at ``Baseline`` object
initialization, then stored as an attribute.
Examples
--------
To show how a ``Baseline`` object can be instantiated, we start by importing the relevant
packages:
>>> import numpy as np
>>> from pygwb.detector import Interferometer
>>> from pygwb.baseline import Baseline
For concreteness, we work with the LIGO Hanford and Livingston detectors, which we
instantiate through:
>>> H1 = Interferometer.get_empty_interferometer("H1")
>>> L1 = Interferometer.get_empty_interferometer("L1")
The standard initialization of a ``Baseline`` object then simply requires a pair of
``Interferometer`` objects:
>>> H1L1_baseline = Baseline("H1-L1", H1, L1)
"""
import json
import pickle
import warnings
import h5py
import numpy as np
from gwpy.frequencyseries import FrequencySeries
from loguru import logger
from .coherence import calculate_coherence
from .constants import h0
from .delta_sigma_cut import run_dsc
from .notch import StochNotchList
from .omega_spectra import OmegaSpectrogram, OmegaSpectrum
from .orfs import calc_orf
from .postprocessing import (
calc_Y_sigma_from_Yf_sigmaf,
calculate_point_estimate_sigma_spectra,
postprocess_Y_sigma,
)
from .spectral import cross_spectral_density
[docs]class Baseline:
[docs] def __init__(
self,
name,
interferometer_1,
interferometer_2,
duration=None,
frequencies=None,
calibration_epsilon=0,
notch_list_path="",
coarse_grain_psd=False,
coarse_grain_csd=True,
overlap_factor_welch=0.5,
overlap_factor=0.5,
window_fftgram_dict={"window_fftgram": "hann"},
window_fftgram_dict_welch={"window_fftgram": "hann"},
N_average_segments_psd=2,
sampling_frequency=None,
):
"""
Instantiate a Baseline.
Parameters
=======
name: ``str``
Name for the baseline, e.g H1H2
interferometer_1/2: ``bilby.Interferometer`` object
The two detectors spanning the baseline.
duration: ``float``, optional
The duration in seconds of each data segment in the interferometers.
None by default, in which case duration is inherited from the interferometers.
frequencies: ``array_like``, optional
The frequency array for the Baseline and
interferometers.
calibration_epsilon: ``float``, optional
Calibration uncertainty for this baseline.
notch_list_path: ``str``, optional
File path of the baseline notch list.
coarse_grain_psd: ``bool``, optional
Whether to apply coarse graining to obtain PSD spectra. Default is False.
coarse_grain_csd: ``bool``, optional
Whether to apply coarse graining to obtain CSD spectra. Default is True.
overlap_factor_welch: ``float``, optional
Overlap factor to use when if using Welch's method to estimate spectra (NOT coarsegraining).
For \"hann\" window use 0.5 overlap_factor and for \"boxcar"\ window use 0 overlap_factor. Default is 0.5 (50% overlap),
which is optimal when using Welch's method with a \"hann\" window.
overlap_factor: ``float``, optional
Factor by which to overlap the segments in the psd and csd estimation.
Default is 1/2, if set to 0 no overlap is performed.
window_fftgram_dict: ``dict``, optional
Dictionary containing name and parameters describing which window to use when producing fftgrams for csds (and psds if these are coarse-grained). Default is \"hann\".
window_fftgram_dict_welch: ``dict``, optional
Dictionary containing name and parameters describing which window to use when producing fftgrams with Welch's method. Default is \"hann\".
N_average_segments_psd: ``int``, optional
Number of segments used for PSD averaging (from both sides of the segment of interest)
N_avg_segs should be even and >= 2.
"""
self.name = name
self.interferometer_1 = interferometer_1
self.interferometer_2 = interferometer_2
self.calibration_epsilon = calibration_epsilon
self.notch_list_path = notch_list_path
self.coarse_grain_psd = coarse_grain_psd
self.coarse_grain_csd = coarse_grain_csd
self.overlap_factor_welch = overlap_factor_welch
self.overlap_factor = overlap_factor
self.window_fftgram_dict = window_fftgram_dict
self.window_fftgram_dict_welch = window_fftgram_dict_welch
self.N_average_segments_psd = N_average_segments_psd
self._tensor_orf_calculated = False
self._vector_orf_calculated = False
self._scalar_orf_calculated = False
self._gamma_v_calculated = False
self._orf_polarization_set = False
self._point_estimate_spectrogram_set = False
self._point_estimate_spectrum_set = False
self._point_estimate_set = False
self._sigma_spectrogram_set = False
self._sigma_spectrum_set = False
self._sigma_set = False
self._coherence_spectrum_set = False
self.sampling_frequency = sampling_frequency
self.duration = duration
self.frequencies = frequencies
self.minimum_frequency = max(
interferometer_1.minimum_frequency, interferometer_2.minimum_frequency
)
self.maximum_frequency = min(
interferometer_1.maximum_frequency, interferometer_2.maximum_frequency
)
# if CSD is estimated by coarse-graining, it must be zeropaded.
self.zeropad_csd = self.coarse_grain_csd
# if PSD is estimated by coarse-graining, no overlap is used between PSD estimates. This is required for the bias factor calculation.
if self.coarse_grain_psd:
self.overlap_factor_psd = 0.0
self.window_fftgram_dict_psd = self.window_fftgram_dict
self.window_fftgram_dict_for_bias_factors = {"window_fftgram": "boxcar"}
else:
self.overlap_factor_psd = self.overlap_factor_welch
self.window_fftgram_dict_psd = self.window_fftgram_dict_welch
self.window_fftgram_dict_for_bias_factors = self.window_fftgram_dict_psd
if self.coarse_grain_csd:
self.window_fftgram_dict_csd = self.window_fftgram_dict
else:
self.window_fftgram_dict_csd = self.window_fftgram_dict_welch
# throw a warning if overlap factors are unsupported
if self.overlap_factor>0.5:
warnings.warn("Overlap factor not fully supported. Overlap factor should be overlap_factor <= 0.5.")
if self.overlap_factor_welch>0.5:
warnings.warn("Overlap factor for spectral estimation using Welch's method not fully supported. Overlap factor should be overlap_factor_welch <= 0.5.")
def __eq__(self, other):
if not type(self) == type(other):
return False
else:
return all(
[
getattr(self, key) == getattr(other, key)
for key in [
"name",
"interferometer_1",
"interferometer_2",
"calibration_epsilon",
"duration",
"frequencies",
]
]
)
@property
def overlap_reduction_function(self):
"""Overlap reduction function associated to this baseline, calculated for the requested polarization."""
if self._orf_polarization == "tensor":
return self.tensor_overlap_reduction_function
elif self._orf_polarization == "vector":
return self.vector_overlap_reduction_function
elif self._orf_polarization == "scalar":
return self.scalar_overlap_reduction_function
elif self._orf_polarization == "right_left":
return self.gamma_v
else:
raise AttributeError(
"Overlap reduction function to be used has not yet been set. To set it, set the orf_polarization property."
)
@property
def orf_polarization(self):
"""Overlap reduction function polarization"""
if self._orf_polarization_set:
return self._orf_polarization
else:
raise AttributeError(
"Overlap reduction function polarization han not yet been set."
)
@orf_polarization.setter
def orf_polarization(self, pol):
self._orf_polarization = pol
self._orf_polarization_set = True
@property
def tensor_overlap_reduction_function(self):
"""Overlap reduction function calculated for tensor polarization."""
if not self._tensor_orf_calculated:
self._tensor_orf = self.calc_baseline_orf(
polarization="tensor", frequencies=self.frequencies
)
self._tensor_orf_calculated = True
return self._tensor_orf
@property
def vector_overlap_reduction_function(self):
"""Overlap reduction function calculated for vector polarization."""
if not self._vector_orf_calculated:
self._vector_orf = self.calc_baseline_orf(polarization="vector")
self._vector_orf_calculated = True
return self._vector_orf
@property
def scalar_overlap_reduction_function(self):
"""Overlap reduction function calculated for scalar polarization."""
if not self._scalar_orf_calculated:
self._scalar_orf = self.calc_baseline_orf(polarization="scalar")
self._scalar_orf_calculated = True
return self._scalar_orf
[docs] def set_frequency_mask(self, notch_list_path="", apply_notches=True):
"""
Set frequency mask to frequencies attribute.
Parameters
=======
notch_list_path: ``str``, optional
Path to notch list to apply to frequency array. If not
provided, no notching will be applied at this point.
apply_notches: ``bool``, optional
If True, apply frequency notches. Default is True.
See also
--------
pygwb.notch.StochNotchList : Used to read in the frequency notches.
"""
mask = (self.frequencies >= self.minimum_frequency) & (
self.frequencies <= self.maximum_frequency
)
if apply_notches:
if notch_list_path:
self.notch_list_path = notch_list_path
if self.notch_list_path:
logger.debug("loading notches from " + str(self.notch_list_path))
notch_list = StochNotchList.load_from_file(self.notch_list_path)
notch_mask = notch_list.get_notch_mask(self.frequencies)
mask = np.logical_and(mask, notch_mask)
else:
logger.debug("no notching will be applied at this point.")
self.frequency_mask = mask
@property
def gamma_v(self, frequencies=None):
"""
Overlap reduction function for asymmetrically polarised backgrounds,
as described in https://arxiv.org/pdf/0707.0535.pdf
"""
if not self._gamma_v_calculated:
self._gamma_v = self.calc_baseline_orf(polarization="right_left")
self._gamma_v_calculated = True
return self._gamma_v
@property
def duration(self):
"""Duration in seconds of a unit segment of data stored in the baseline detectors."""
if self._duration_set:
return self._duration
else:
raise AttributeError("Duration not yet set.")
@duration.setter
def duration(self, dur):
    """
    Sets the duration for the Baseline and interferometers.

    Parameters
    =======
    dur: ``float``
        The duration to set for the Baseline and interferometers.

    Notes
    -----
    If `duration` is passed, check that it matches the `duration`
    in the interferometers, if present.
    If not passed, check that the durations in the interferometers
    match each other, if present, and set the Baseline duration from
    the interferometer durations.
    If not passed and only one of the interferometers has the duration
    set, set the Baseline duration and duration for the other
    interferometer from that.
    Requires that either `duration` is not None or at least one of the
    interferometers has the `duration` set.
    """
    if dur is not None:
        # Explicit duration: must agree with any durations already set
        # on the interferometers.
        self._check_durations_match_baseline_ifos(dur)
        self._duration = dur
        # Propagate to interferometers that do not have a duration yet.
        if not self.interferometer_1.duration:
            self.interferometer_1.duration = dur
        if not self.interferometer_2.duration:
            self.interferometer_2.duration = dur
        self._duration_set = True
    elif self.interferometer_1.duration and self.interferometer_2.duration:
        # No duration given: inherit from the interferometers, provided
        # they agree with each other.
        self._check_ifo_durations_match()
        # NOTE: this assignment re-enters the setter with a non-None value.
        self.duration = self.interferometer_1.duration
        self._duration_set = True
    elif self.interferometer_1.duration:
        # Only interferometer 1 has a duration; share it with both.
        self.duration = self.interferometer_1.duration
        self.interferometer_2.duration = self.interferometer_1.duration
        self._duration_set = True
    elif self.interferometer_2.duration:
        # Only interferometer 2 has a duration; share it with both.
        self.duration = self.interferometer_2.duration
        self.interferometer_1.duration = self.interferometer_2.duration
        self._duration_set = True
    else:
        # Nothing to inherit from: store None and warn the user.
        warnings.warn("Neither baseline nor interferometer duration is set.")
        self._duration = dur
        self._duration_set = True
@property
def csd_segment_offset(self):
"""CSD segment offset to use for this baseline."""
if self._duration_set:
stride = self.duration * (1 - self.overlap_factor)
return int(np.ceil(self.duration / stride)) * int(self.N_average_segments_psd/2)
else:
raise ValueError("Trying to calculate CSD segment offset before setting duration. Need to set duration before attempting this.")
@property
def frequencies(self):
"""Frequency array associated to this baseline."""
if self._frequencies_set:
return self._frequencies
else:
raise AttributeError("frequencies have not yet been set.")
@frequencies.setter
def frequencies(self, freqs):
self._frequencies = freqs
self._frequencies_set = True
self.frequency_mask = None
# delete the orfs, set the calculated flag to zero
if self._tensor_orf_calculated:
delattr(self, "_tensor_orf")
self._tensor_orf_calculated = False
if self._scalar_orf_calculated:
delattr(self, "_scalar_orf")
self._scalar_orf_calculated = False
if self._vector_orf_calculated:
delattr(self, "_vector_orf")
self._vector_orf_calculated = False
@property
def point_estimate_spectrogram(self):
"""Point estimate spectrogram (in Omega units) calculated using data in this baseline."""
if self._point_estimate_spectrogram_set:
return self._point_estimate_spectrogram
else:
raise AttributeError(
"Omega point estimate spectrogram not yet set. To set it, use `set_point_estimate_sigma_spectrogram` method."
)
@point_estimate_spectrogram.setter
def point_estimate_spectrogram(self, pt_est):
self._point_estimate_spectrogram = pt_est
self._point_estimate_spectrogram_set = True
@property
def sigma_spectrogram(self):
"""Sigma spectrogram (in Omega units) calculated using data in this baseline."""
if self._sigma_spectrogram_set:
return self._sigma_spectrogram
else:
raise AttributeError(
"Omega sigma spectrogram not yet set. To set it, use `set_point_estimate_sigma_spectrogram` method."
)
@sigma_spectrogram.setter
def sigma_spectrogram(self, sig):
self._sigma_spectrogram = sig
self._sigma_spectrogram_set = True
@property
def point_estimate_spectrum(self):
"""Point estimate spectrum (in Omega units) calculated using data in this baseline."""
if self._point_estimate_spectrum_set:
return self._point_estimate_spectrum
else:
raise AttributeError(
"Omega point estimate spectrum not yet set. To set it, use `set_point_estimate_sigma_spectrum` method."
)
@point_estimate_spectrum.setter
def point_estimate_spectrum(self, pt_est):
self._point_estimate_spectrum = pt_est
self._point_estimate_spectrum_set = True
@property
def sigma_spectrum(self):
"""Sigma spectrum (in Omega units) calculated using data in this baseline."""
if self._sigma_spectrum_set:
return self._sigma_spectrum
else:
raise AttributeError(
"Omega sigma spectrum not yet set. To set it, use `set_point_estimate_sigma_spectrum` method."
)
@sigma_spectrum.setter
def sigma_spectrum(self, sig):
self._sigma_spectrum = sig
self._sigma_spectrum_set = True
@property
def point_estimate(self):
"""Point estimate (in Omega units) calculated using data in this baseline."""
if self._point_estimate_set:
return self._point_estimate
else:
raise AttributeError(
"Omega point estimate not yet set. To set it, use `set_point_estimate_sigma` method."
)
@point_estimate.setter
def point_estimate(self, pt_est):
self._point_estimate = pt_est
self._point_estimate_set = True
@property
def sigma(self):
"""Sigma (in Omega units) calculated using data in this baseline."""
if self._sigma_set:
return self._sigma
else:
raise AttributeError(
"Omega sigma not yet set. To set it, use `set_point_estimate_sigma` method."
)
@sigma.setter
def sigma(self, sig):
self._sigma = sig
self._sigma_set = True
@property
def coherence_spectrum(self):
"""Coherence spectrum calculated using data in this baseline."""
if self._coherence_spectrum_set:
return self._coherence_spectrum
else:
raise AttributeError(
"Coherence spectrum not yet set. To set it, use `set_coherence_spectrum` method."
)
@coherence_spectrum.setter
def coherence_spectrum(self, coh):
self._coherence_spectrum = coh
self._coherence_spectrum_set = True
@property
def coherence_dict(self):
"""Coherence dictionary based on data in this baseline."""
if self._coherence_spectrum_set:
return self._coherence_dict
else:
raise AttributeError(
"Coherence spectrum not yet set. To set it, use `set_coherence_spectrum` method."
)
@coherence_dict.setter
def coherence_dict(self, cohdict):
self._coherence_dict = cohdict
def _check_durations_match_baseline_ifos(self, duration):
"""
Checks whether the baseline duration matches the duration set in
the interferometers of the baseline.
Parameters
=======
duration: ``float``
Duration of the baseline.
"""
if self.interferometer_1.duration and self.interferometer_2.duration:
self._check_ifo_durations_match()
if not duration == self.interferometer_1.duration:
raise AssertionError(
"Interferometer durations do not match given Baseline duration!"
)
elif self.interferometer_1.duration:
if not duration == self.interferometer_1.duration:
raise AssertionError(
"Interferometer_1 duration does not match given Baseline duration!"
)
elif self.interferometer_2.duration:
if not duration == self.interferometer_2.duration:
raise AssertionError(
"Interferometer_2 duration does not match given Baseline duration!"
)
def _check_ifo_durations_match(self):
"""
Checks whether the duration in both interferometers agree.
"""
if not (self.interferometer_1.duration == self.interferometer_2.duration):
raise AssertionError("Interferometer durations do not match each other!")
@property
def sampling_frequency(self):
"""Sampling frequency of the data stored in this baseline. This must match the
sampling frequency stored in this baseline's interferometers."""
if hasattr(self, "_sampling_frequency"):
return self._sampling_frequency
else:
raise AttributeError("sampling frequency not set.")
@sampling_frequency.setter
def sampling_frequency(self, sampling_frequency):
    """Sets the sampling_frequency for the Baseline and interferometers

    Parameters
    =======
    sampling_frequency: ``float``, optional
        The sampling frequency to set for the Baseline and interferometers

    Warning
    -------
    If `sampling_frequency` is passed, check that it matches the `sampling_frequency`
    in the interferometers, if present.
    If not passed, check that the sampling_frequencies in the interferometers
    match each other, if present, and set the Baseline sampling_frequency from
    the sampling_frequencies.
    If not passed and only one of the interferometers has the sampling_frequency
    set, set the Baseline sampling_frequency and sampling_frequency for the other
    interferometer from that.
    Requires that either `sampling_frequency` is not None or at least one of the
    interferometers has the `sampling_frequency` set.
    """
    if sampling_frequency is not None:
        # Explicit value: must agree with any already set on the ifos.
        self.check_sampling_frequencies_match_baseline_ifos(sampling_frequency)
        self._sampling_frequency = sampling_frequency
        # Propagate to interferometers that do not have one yet.
        if not self.interferometer_1.sampling_frequency:
            self.interferometer_1.sampling_frequency = sampling_frequency
        if not self.interferometer_2.sampling_frequency:
            self.interferometer_2.sampling_frequency = sampling_frequency
        self._sampling_frequency_set = True
    elif (
        self.interferometer_1.sampling_frequency
        and self.interferometer_2.sampling_frequency
    ):
        # No value given: inherit from the interferometers, provided they
        # agree with each other.
        self._check_ifo_sampling_frequencies_match()
        self._sampling_frequency = self.interferometer_1.sampling_frequency
        self._sampling_frequency_set = True
    elif self.interferometer_1.sampling_frequency:
        # Only interferometer 1 has one; share it with both.
        # NOTE: this assignment re-enters the setter with a non-None value.
        self.sampling_frequency = self.interferometer_1.sampling_frequency
        self.interferometer_2.sampling_frequency = (
            self.interferometer_1.sampling_frequency
        )
        self._sampling_frequency_set = True
    elif self.interferometer_2.sampling_frequency:
        # Only interferometer 2 has one; share it with both.
        self._sampling_frequency = self.interferometer_2.sampling_frequency
        self.interferometer_1.sampling_frequency = (
            self.interferometer_2.sampling_frequency
        )
        self._sampling_frequency_set = True
    else:
        # Nothing to inherit from: store None and warn the user.
        warnings.warn(
            "Neither baseline nor interferometer sampling_frequency is set."
        )
        self._sampling_frequency = sampling_frequency
        self._sampling_frequency_set = True
@property
def badGPStimes(self):
"""GPS times flagged by delta sigma cut."""
if hasattr(self, "_badGPStimes"):
return self._badGPStimes
else:
raise AttributeError(
"bad GPS times are not set - need to run delta_sigma_cut first."
)
@badGPStimes.setter
def badGPStimes(self, badGPStimes):
self._badGPStimes = badGPStimes
@property
def delta_sigmas(self):
"""Values of delta sigmas for data segments in the baseline."""
if hasattr(self, "_delta_sigmas"):
return self._delta_sigmas
else:
raise AttributeError(
"delta_sigmas are not set - need to run delta_sigma_cut first."
)
@delta_sigmas.setter
def delta_sigmas(self, delta_sigmas):
self._delta_sigmas = delta_sigmas
[docs] def check_sampling_frequencies_match_baseline_ifos(self, sampling_frequency):
"""Check that the sampling frequency of the two interferometers in this Baseline match the Baseline sampling frequency.
Parameters
=======
sampling_frequency: ``float``
The sampling frequency that is being set for the Baseline.
Notes
-----
If the sampling frequency passed is `None`, the Baseline sampling frequency will be set to that of the interferometers, if these
match. If these don't match, an error will be raised. If the sampling frequency of the interferometers is also `None`, then no
sampling frequency will be set, and the user can set it at a later time.
"""
if (
self.interferometer_1.sampling_frequency
and self.interferometer_2.sampling_frequency
):
self._check_ifo_sampling_frequencies_match()
if not sampling_frequency == self.interferometer_1.sampling_frequency:
raise AssertionError(
"Interferometer sampling_frequencies do not match given Baseline sampling_frequency!"
)
elif self.interferometer_1.sampling_frequency:
if not sampling_frequency == self.interferometer_1.sampling_frequency:
raise AssertionError(
"Interferometer_1 sampling_frequency does not match given Baseline sampling_frequency!"
)
elif self.interferometer_2.sampling_frequency:
if not sampling_frequency == self.interferometer_2.sampling_frequency:
raise AssertionError(
"Interferometer_2 sampling_frequency does not match given Baseline sampling_frequency!"
)
def _check_ifo_sampling_frequencies_match(self):
"""
Check whether the sampling frequencies of the interferometers match.
"""
if not (
self.interferometer_1.sampling_frequency
== self.interferometer_2.sampling_frequency
):
raise AssertionError(
"Interferometer sampling_frequencies do not match each other!"
)
def calc_baseline_orf(self, polarization="tensor", frequencies=None):
    """
    Calculate the overlap reduction function for this baseline.
    Wraps the ORF module.

    Parameters
    =======
    polarization: ``str``, optional
        Polarization of the signal to consider (scalar, vector, tensor) for the ORF calculation.
        Default is tensor.
    frequencies: ``array_like``, optional
        Frequency array to use in the calculation of the ORF. By default, self.frequencies is used.

    Returns
    =======
    orf: ``array_like``
        Overlap reduction function for the required polarization.

    See also
    --------
    pygwb.orfs.calc_orf
        Method to compute the overlap reduction function.
    """
    # Prefer explicitly passed frequencies; fall back to the Baseline's own.
    freqs = frequencies if frequencies is not None else self.frequencies
    if freqs is None:
        raise ValueError(
            "Frequencies have not been provided for the orf calculation; user should either pass frequencies in or set them for this Baseline."
        )
    # Single call site instead of duplicating the full argument list per branch.
    return calc_orf(
        freqs,
        self.interferometer_1.vertex,
        self.interferometer_2.vertex,
        self.interferometer_1.x,
        self.interferometer_2.x,
        self.interferometer_1.y,
        self.interferometer_2.y,
        polarization,
    )
[docs] @classmethod
def from_interferometers(
cls,
interferometers,
duration=None,
calibration_epsilon=0,
):
"""
Load a Baseline from a list of interferometers.
Parameters
=======
interferometers: ``list``
List of two bilby Interferometer objects.
duration: ``float``, optional
Segment duration in seconds. Default is None.
calibration_epsilon: ``float``, optional
Calibration uncertainty for this baseline. Default is 0.
Returns
=======
Baseline: cls
Baseline class
"""
name = "".join([ifo.name for ifo in interferometers])
return cls(
name=name,
interferometer_1=interferometers[0],
interferometer_2=interferometers[1],
duration=duration,
calibration_epsilon=calibration_epsilon,
)
[docs] @classmethod
def from_parameters(
cls,
interferometer_1,
interferometer_2,
parameters,
frequencies=None,
):
"""
Load a Baseline from a Parameters object.
Parameters
=======
interferometer_1/2: ``bilby.Interferometer`` object
The two detectors spanning this baseline.
parameters: ``pygwb.parameters`` object
Parameters object containing necessary parameters for
the instantiation of the baseline, and subsequent
analyses.
frequencies: ``array_like``, optional
Frequency array to use in the instantiation of this baseline.
Default is None.
Returns
=======
Baseline: cls
Baseline class
"""
name = interferometer_1.name + interferometer_2.name
return cls(
name=name,
interferometer_1=interferometer_1,
interferometer_2=interferometer_2,
duration=parameters.segment_duration,
calibration_epsilon=parameters.calibration_epsilon,
frequencies=frequencies,
notch_list_path=parameters.notch_list_path,
coarse_grain_psd=parameters.coarse_grain_psd,
coarse_grain_csd=parameters.coarse_grain_csd,
overlap_factor_welch=parameters.overlap_factor_welch,
overlap_factor=parameters.overlap_factor,
window_fftgram_dict=parameters.window_fft_dict,
window_fftgram_dict_welch=parameters.window_fft_dict_welch,
N_average_segments_psd=parameters.N_average_segments_psd,
sampling_frequency=parameters.new_sample_rate,
)
[docs] @classmethod
def load_from_pickle(cls, filename):
"""
Load baseline object from pickle file.
Parameters
=======
filename: ``str``
Filename (inclusive of path) to load the pickled baseline from.
Returns
=======
Baseline: ``cls``
Baseline class.
"""
with open(filename, "rb") as f:
return pickle.load(f)
def save_to_pickle(self, filename, wipe=True):
    """
    Save baseline object to pickle file.

    Parameters
    =======
    filename: ``str``
        Filename (inclusive of path) to save the pickled baseline to.
    wipe: ``bool``, optional
        If True (default), set the raw ``timeseries`` attribute of both
        interferometers to None before pickling, keeping the file small.
        Spectra already derived from the timeseries are retained.
    """
    # NOTE: wiping mutates the live interferometer objects, not just the
    # pickled copy — the timeseries are gone from this Baseline afterwards.
    if wipe:
        self.interferometer_1.timeseries = None
        self.interferometer_2.timeseries = None
    with open(filename, "wb") as f:
        pickle.dump(self, f)
def set_cross_and_power_spectral_density(self, frequency_resolution):
    """
    Set the power spectral density in each interferometer
    and the cross spectral density for the baseline object when data are available.

    Parameters
    =======
    frequency_resolution: ``float``
        The frequency resolution at which the cross and power spectral densities are calculated.

    Raises
    =======
    AssertionError
        If either interferometer has no timeseries data set.

    See also
    --------
    pygwb.spectral.cross_spectral_density
    """
    try:
        self.interferometer_1.set_psd_spectrogram(
            frequency_resolution,
            coarse_grain=self.coarse_grain_psd,
            overlap_factor=self.overlap_factor,
            window_fftgram_dict=self.window_fftgram_dict_psd,
            overlap_factor_welch=self.overlap_factor_welch,
        )
    except AttributeError:
        # Bug fix: the message was missing the f-prefix, so the detector name
        # placeholder was printed literally instead of being interpolated.
        raise AssertionError(
            f"Interferometer {self.interferometer_1.name} has no timeseries data! Need to set timeseries data in the interferometer first."
        )
    try:
        self.interferometer_2.set_psd_spectrogram(
            frequency_resolution,
            coarse_grain=self.coarse_grain_psd,
            overlap_factor=self.overlap_factor,
            window_fftgram_dict=self.window_fftgram_dict_psd,
            overlap_factor_welch=self.overlap_factor_welch,
        )
    except AttributeError:
        # Same f-prefix fix as above.
        raise AssertionError(
            f"Interferometer {self.interferometer_2.name} has no timeseries data! Need to set timeseries data in the interferometer first."
        )
    # Cross spectral density between the two strain timeseries.
    self.csd = cross_spectral_density(
        self.interferometer_1.timeseries,
        self.interferometer_2.timeseries,
        self.duration,
        frequency_resolution,
        coarse_grain=self.coarse_grain_csd,
        overlap_factor=self.overlap_factor,
        zeropad=self.zeropad_csd,
        window_fftgram_dict=self.window_fftgram_dict_csd,
        overlap_factor_welch=self.overlap_factor_welch,
    )
    # TODO: make this less fragile.
    # For now, reset frequencies,
    # recalculate ORF in case frequencies have changed.
    self._tensor_orf_calculated = False
    self.frequencies = self.csd.frequencies.value
def set_average_power_spectral_densities(self):
    """If psds have been calculated, sets the average psd in each ifo."""
    try:
        # Both detectors are averaged with the same number of segments.
        for detector in (self.interferometer_1, self.interferometer_2):
            detector.set_average_psd(self.N_average_segments_psd)
    except AttributeError:
        print(
            "PSDs have not been calculated yet! Need to set_cross_and_power_spectral_density first."
        )
    # TODO: make this less fragile (frequencies/ORF are currently not reset here).
def set_average_cross_spectral_density(self):
    """If csd has been calculated, sets the average csd for the baseline."""
    try:
        # Drop `csd_segment_offset` segments from each end of the CSD
        # spectrogram before averaging.
        offset = self.csd_segment_offset
        self.average_csd = self.csd[offset : -(offset + 1) + 1]
    except AttributeError:
        print(
            "CSD has not been calculated yet! Need to set_cross_and_power_spectral_density first."
        )
    # TODO: make this less fragile.
    # For now, recalculate ORF in case frequencies have changed.
    self._tensor_orf_calculated = False
    self.frequencies = self.csd.frequencies.value
def crop_frequencies_average_psd_csd(self, flow, fhigh):
    """
    Crop frequencies of average PSDs and CSDS. Done in place. This is not completely implemented yet.

    Parameters
    =======
    flow: ``float``
        Low frequency to crop.
    fhigh: ``float``
        High frequency to crop.
    """
    # Frequency resolution; assumes self.frequencies is uniformly spaced.
    deltaF = self.frequencies[1] - self.frequencies[0]
    # reset frequencies using the same calculation as in crop_frequencies so we get
    # consistent frequency ranges
    idx0 = int(float(flow - self.frequencies[0]) // deltaF)
    idx1 = int(float(fhigh + deltaF - self.frequencies[0]) // deltaF)
    self.frequencies = self.frequencies[idx0:idx1]
    # Each derived product below is cropped only if it has been computed;
    # `fhigh + deltaF` makes the upper edge inclusive of the fhigh bin.
    if hasattr(self.interferometer_1, "average_psd"):
        self.interferometer_1.average_psd = (
            self.interferometer_1.average_psd.crop_frequencies(flow, fhigh + deltaF)
        )
    if hasattr(self.interferometer_2, "average_psd"):
        self.interferometer_2.average_psd = (
            self.interferometer_2.average_psd.crop_frequencies(flow, fhigh + deltaF)
        )
    if hasattr(self, "average_csd"):
        self.average_csd = self.average_csd.crop_frequencies(flow, fhigh + deltaF)
    if self._point_estimate_spectrogram_set:
        self.point_estimate_spectrogram = (
            self.point_estimate_spectrogram.crop_frequencies(flow, fhigh + deltaF)
        )
    if self._sigma_spectrogram_set:
        self.sigma_spectrogram = self.sigma_spectrogram.crop_frequencies(
            flow, fhigh + deltaF
        )
    if self._point_estimate_spectrum_set:
        self.point_estimate_spectrum = self.point_estimate_spectrum.crop(
            flow, fhigh + deltaF
        )
    if self._sigma_spectrum_set:
        self.sigma_spectrum = self.sigma_spectrum.crop(flow, fhigh + deltaF)
    if self._point_estimate_set:
        # The scalar point estimate/sigma must be recomputed from the
        # freshly cropped spectra.
        self.set_point_estimate_sigma()
    if self._coherence_spectrum_set:
        self.coherence_spectrum = self.coherence_spectrum.crop(flow, fhigh + deltaF)
def set_point_estimate_sigma_spectrogram(
    self, alpha=0.0, fref=25, flow=20, fhigh=1726, polarization="tensor"
):
    """
    Set point estimate and sigma spectrogram. Resulting spectrogram
    *does not include frequency weighting for alpha*.

    Parameters
    =======
    alpha: ``float``, optional
        Spectral index to use in the weighting. Default is 0.
    fref: ``float``, optional
        Reference frequency to use in the weighting calculation.
        Final result refers to this frequency. Default is 25 Hz.
    flow: ``float``, optional
        Lowest frequency to consider. Default is 20 Hz.
    fhigh: ``float``, optional
        Highest frequency to consider. Default is 1726 Hz.
    polarization: ``str``, optional
        Polarization of the signal to consider (scalar, vector, tensor) for the ORF calculation.
        Default is tensor.

    See also
    --------
    pygwb.omega_spectra.OmegaSpectrogram
    pygwb.postprocessing.calculate_point_estimate_sigma_spectra
    """
    # set CSD if not set
    # self.set_average_cross_spectral_density()
    # set PSDs if not set
    # self.set_average_power_spectral_densities()
    # Crop first so the ORF and spectra below share one frequency grid.
    self.crop_frequencies_average_psd_csd(flow, fhigh)
    # Only set the ORF polarization if not already chosen by the user.
    if not self._orf_polarization_set:
        self.orf_polarization = polarization
    # Per-segment point estimate (Y_fs) and variance (var_fs) spectrograms.
    Y_fs, var_fs = calculate_point_estimate_sigma_spectra(
        freqs=self.frequencies,
        csd=self.average_csd,
        avg_psd_1=self.interferometer_1.average_psd,
        avg_psd_2=self.interferometer_2.average_psd,
        orf=self.overlap_reduction_function,
        sample_rate=self.sampling_frequency,
        segment_duration=self.duration,
        fref=fref,
        alpha=alpha,
        overlap_factor=self.overlap_factor,
        window_fftgram_dict=self.window_fftgram_dict_csd
    )
    sigma_name = f"{self.name} sigma spectrogram alpha={alpha}"
    self.point_estimate_spectrogram = OmegaSpectrogram(
        Y_fs,
        times=self.average_csd.times,
        frequencies=self.average_csd.frequencies,
        name=self.name + f" with alpha={alpha}",
        alpha=alpha,
        fref=fref,
        h0=h0,
    )
    # Sigma spectrogram is the square root of the variance spectrogram.
    self.sigma_spectrogram = OmegaSpectrogram(
        np.sqrt(var_fs),
        times=self.average_csd.times,
        frequencies=self.average_csd.frequencies,
        name=sigma_name,
        alpha=alpha,
        fref=fref,
        h0=h0,
    )
def set_point_estimate_sigma_spectrum(
    self,
    badtimes=None,
    alpha=0.0,
    fref=25,
    flow=20,
    fhigh=1726,
    notch_list_path="",
    polarization="tensor",
    apply_dsc=True,
    apply_notches=True,
):
    """
    Set time-integrated point estimate spectrum and variance in each frequency bin.

    Parameters
    =======
    badtimes: ``array_like``, optional
        Array of times to exclude from point estimate/sigma calculation.
        If no times are passed, none will be excluded.
    alpha: ``float``, optional
        Spectral index to use in the re-weighting. Default is 0.
    fref: ``float``, optional
        Reference frequency to use in the re-weighting. Default is 25 Hz.
    flow: ``float``, optional
        Low frequency. Default is 20 Hz.
    fhigh: ``float``, optional
        High frequency. Default is 1726 Hz.
    notch_list_path: ``str``, optional
        Path to the notch list to use in the spectrum. Default is empty string.
    polarization: ``str``, optional
        Polarization of the signal to consider (scalar, vector, tensor) for the orf calculation.
        Default is tensor.
    apply_dsc: ``bool``, optional
        Apply delta sigma cut flag; if True, removes the badGPStimes from the spectra calculations.
        Default is True.
    apply_notches: ``bool``, optional
        Apply spectral notches flag; if True, remove the notches specified in the notch_list from the spectra calculations.
        Default is True.

    See also
    --------
    pygwb.postprocessing.postprocess_Y_sigma
    pygwb.omega_spectra.OmegaSpectrum
    """
    if apply_dsc:
        if not hasattr(self, "badGPStimes"):
            warnings.warn(
                "Delta sigma cut has not been calculated yet, hence no delta sigma cut will be applied... If this is a mistake, please run `calculate_delta_sigma_cut` first, then re-calculate point estimate/sigma spectra."
            )
    if self._point_estimate_spectrogram_set:
        # reweight based on alpha that has been supplied
        self.point_estimate_spectrogram.reweight(new_alpha=alpha, new_fref=fref)
        self.sigma_spectrogram.reweight(new_alpha=alpha, new_fref=fref)
    else:
        logger.info(
            "Point estimate and sigma spectrograms are not set yet. setting now..."
        )
        self.set_point_estimate_sigma_spectrogram(
            alpha=alpha,
            fref=fref,
            flow=flow,
            fhigh=fhigh,
            polarization=polarization,
        )
    deltaF = self.frequencies[1] - self.frequencies[0]
    # should be True for each bad time
    bad_times_indexes = self._get_bad_times_indexes(times=self.point_estimate_spectrogram.times.value, badtimes=badtimes, apply_dsc=apply_dsc)
    logger.info(f"{np.sum(bad_times_indexes)} bad segments removed.")
    # start time, for metadata
    epoch = self.point_estimate_spectrogram.times[0]
    if self.sampling_frequency is None:
        raise AttributeError(
            "the sampling frequency is not set! Cannot proceed with spectrum calculation."
        )
    if apply_dsc:
        # NOTE(review): this branch reads self.delta_sigmas/self.badGPStimes
        # unconditionally; if the delta sigma cut was never run this raises
        # AttributeError despite the warning above — confirm intended.
        if len(self.delta_sigmas["times"]) == len(self.badGPStimes):
            warnings.warn(
                "The delta sigma cut has flagged all times in this dataset. No point estimate/sigma values can be calculated."
            )
            # NOTE(review): plain ndarrays are assigned here, whereas the
            # normal path below assigns OmegaSpectrum objects — downstream
            # code relying on OmegaSpectrum methods should handle this.
            self.point_estimate_spectrum = np.zeros(len(self.frequencies))
            self.sigma_spectrum = np.inf * np.ones(len(self.frequencies))
            return
    # setting the frequency mask for the before/after calculation
    self.set_frequency_mask(
        notch_list_path=notch_list_path, apply_notches=apply_notches
    )
    # Combine per-segment spectrograms into a single spectrum, accounting
    # for overlapping-segment correlations and bias factors.
    point_estimate, sigma = postprocess_Y_sigma(
        self.point_estimate_spectrogram.value,
        self.sigma_spectrogram.value ** 2,
        self.duration,
        deltaF,
        self.sampling_frequency,
        frequency_mask=self.frequency_mask,
        window_fftgram_dict=self.window_fftgram_dict,
        window_fftgram_dict_welch=self.window_fftgram_dict_for_bias_factors,
        badtimes_mask=bad_times_indexes,
        overlap_factor=self.overlap_factor,
        overlap_factor_welch=self.overlap_factor_psd,
        N_avg_segs=self.N_average_segments_psd,
    )
    self.point_estimate_spectrum = OmegaSpectrum(
        point_estimate,
        frequencies=self.frequencies,
        name=self.name + " point estimate spectrum",
        epoch=epoch,
        alpha=alpha,
        fref=fref,
        h0=h0,
    )
    self.sigma_spectrum = OmegaSpectrum(
        sigma,
        frequencies=self.frequencies,
        name=self.name + " sigma spectrum",
        epoch=epoch,
        alpha=alpha,
        fref=fref,
        h0=h0,
    )
    # NOTE(review): recorded as 0 even when a non-zero alpha was used above
    # — looks suspicious, confirm whether this should be `alpha`.
    self.point_estimate_alpha = 0
def set_point_estimate_sigma(
    self,
    badtimes=None,
    alpha=0.0,
    fref=25,
    flow=20,
    fhigh=1726,
    notch_list_path="",
    polarization="tensor",
    apply_dsc=True,
    apply_notches=True,
):
    """
    Set point estimate sigma based on a set of parameters. This is estimate of omega_gw in each frequency bin.

    Parameters
    =======
    badtimes: ``array_like``, optional
        Array of times to exclude from point estimate/sigma calculation.
        If no times are passed, none will be excluded.
    alpha: ``float``, optional
        Spectral index to use in the re-weighting. Default is 0.
    fref: ``float``, optional
        Reference frequency to use in the re-weighting. Default is 25.
    flow: ``float``, optional
        Low frequency. Default is 20 Hz.
    fhigh: ``float``, optional
        High frequency. Default is 1726 Hz.
    notch_list_path: ``str``, optional
        Path to the notch list to use in the spectrum; if the notch_list isn't set in the baseline,
        user can pass it directly here. If it is not set and if none is passed no notches will be applied.
    polarization: ``str``, optional
        Polarization of the signal to consider (scalar, vector, tensor) for the orf calculation.
        Default is Tensor.
    apply_dsc: ``bool``, optional
        Apply delta sigma cut flag; if True, removes the badGPStimes from the spectra calculations.
        Default is True.
    apply_notches: ``bool``, optional
        Apply spectral notches flag; if True, remove the notches specified in the notch_list from the spectra calculations.
        Default is True.

    See also
    --------
    pygwb.postprocessing.calc_Y_sigma_from_Yf_sigmaf
    """
    if self._point_estimate_spectrum_set:
        # Spectra already exist: just reweight them in place.
        self.point_estimate_spectrum.reweight(new_alpha=alpha, new_fref=fref)
        self.sigma_spectrum.reweight(new_alpha=alpha, new_fref=fref)
        self.point_estimate_spectrogram.reweight(new_alpha=alpha, new_fref=fref)
        self.sigma_spectrogram.reweight(new_alpha=alpha, new_fref=fref)
    else:
        logger.info(
            "Point estimate and sigma spectra have not been set before. Setting it now..."
        )
        logger.debug(
            "No weighting supplied in setting of spectrum. Supplied when combining for final sigma"
        )
        self.set_point_estimate_sigma_spectrum(
            badtimes=badtimes,
            alpha=alpha,
            fref=fref,
            flow=flow,
            fhigh=fhigh,
            polarization=polarization,
            apply_dsc=apply_dsc,
            notch_list_path=notch_list_path,
        )
    self.set_frequency_mask(
        notch_list_path=notch_list_path, apply_notches=apply_notches
    )
    # Bug fix: `np.infty` was removed in NumPy 2.0; use `np.inf`.
    # A sigma spectrum starting at infinity signals that the delta sigma cut
    # flagged every segment, so no meaningful combination is possible.
    if self.sigma_spectrum[0] == np.inf:
        self.sigma = np.inf
        self.point_estimate = 0
        return
    # Combine the per-frequency estimates into the scalar point estimate.
    Y, sigma = calc_Y_sigma_from_Yf_sigmaf(
        self.point_estimate_spectrum,
        self.sigma_spectrum,
        frequency_mask=self.frequency_mask,
        alpha=alpha,
        fref=fref,
    )
    self.point_estimate = Y
    self.sigma = sigma
def reweight(self, new_alpha=None, new_fref=None):
    """Reweight all the frequency-weighted attributes of this Baseline, if these are set.

    Parameters
    =======
    new_alpha: ``float``, optional
        New alpha to weight the spectra to. Default is None.
    new_fref: ``float``, optional
        New reference frequency to refer the spectra to. Default is None.
    """
    # Delegate to set_point_estimate_sigma, which reweights every spectrum
    # and spectrogram in place when they already exist.
    self.set_point_estimate_sigma(fref=new_fref, alpha=new_alpha)
def calculate_delta_sigma_cut(
    self,
    delta_sigma_cut,
    alphas,
    fref,
    flow=20,
    fhigh=1726,
    notch_list_path="",
    polarization="tensor",
    return_naive_and_averaged_sigmas: bool = False,
):
    """
    Calculate the delta sigma cut using the naive and average psds, if set in the baseline.

    Parameters
    =======
    delta_sigma_cut: ``float``
        The cutoff to implement in the delta sigma cut.
    alphas: ``list``
        Set of spectral indices to use in the delta sigma cut calculation.
    fref: ``int``
        Reference frequency (Hz).
    flow: ``float``, optional
        Low frequency. Default is 20 Hz.
    fhigh: ``float``, optional
        High frequency. Default is 1726 Hz.
    notch_list_path: ``str``, optional
        File path of the baseline notch list.
    polarization: ``str``, optional
        Polarization of the signal to consider (scalar, vector, tensor) for the orf calculation.
        Default is tensor.
    return_naive_and_averaged_sigmas: ``bool``, optional
        Option to return naive and sliding sigmas. Default is False.

    See also
    --------
    pygwb.delta_sigma_cut.run_dsc
        Function used to run the delta sigma cut.
    """
    # Only set the ORF polarization if not already chosen by the user.
    if not self._orf_polarization_set:
        self.orf_polarization = polarization
    # Frequency resolution, taken before cropping; assumes uniform spacing.
    deltaF = self.frequencies[1] - self.frequencies[0]
    self.crop_frequencies_average_psd_csd(flow=flow, fhigh=fhigh)
    # "Naive" PSDs: per-segment estimates, trimmed to the same time span
    # as the averaged (sliding) PSDs.
    naive_psd_1 = self.interferometer_1.psd_spectrogram[
        self.csd_segment_offset:-self.csd_segment_offset
    ]
    naive_psd_2 = self.interferometer_2.psd_spectrogram[
        self.csd_segment_offset:-self.csd_segment_offset
    ]
    naive_psd_1_cropped = naive_psd_1.crop_frequencies(flow, fhigh + deltaF)
    naive_psd_2_cropped = naive_psd_2.crop_frequencies(flow, fhigh + deltaF)
    # An explicitly passed notch list overrides the one stored on the baseline.
    if notch_list_path:
        self.notch_list_path = notch_list_path
    if self.notch_list_path:
        logger.debug(
            "loading notches for delta sigma cut from " + self.notch_list_path
        )
        self.set_frequency_mask(self.notch_list_path)
    else:
        self.set_frequency_mask()
    badGPStimes, delta_sigmas = run_dsc(
        dsc=delta_sigma_cut,
        segment_duration=self.duration,
        psd1_naive=naive_psd_1_cropped,
        psd2_naive=naive_psd_2_cropped,
        psd1_slide=self.interferometer_1.average_psd,
        psd2_slide=self.interferometer_2.average_psd,
        alphas=alphas,
        sample_rate=self.sampling_frequency,
        orf=self.overlap_reduction_function,
        fref=fref,
        frequency_mask=self.frequency_mask,
        window_fftgram_dict=self.window_fftgram_dict_for_bias_factors,
        overlap_factor=self.overlap_factor_psd,
        N_average_segments_psd = self.N_average_segments_psd,
        return_naive_and_averaged_sigmas=return_naive_and_averaged_sigmas,
    )
    # Store results; these are consumed later by the point estimate/sigma
    # spectrum calculations via _get_bad_times_indexes.
    self.badGPStimes = badGPStimes
    self.delta_sigmas = delta_sigmas
def set_coherence_spectrum(self, flow=20, fhigh=1726, badtimes=None, apply_dsc=True):
    """
    Set the coherence spectrum between detectors, averaged over all data in the baseline.

    Parameters
    =======
    flow: ``float``, optional
        Low frequency. Default is 20 Hz.
    fhigh: ``float``, optional
        High frequency. Default is 1726 Hz.
    badtimes: ``array_like``, optional
        Array of times to exclude from the coherence calculation.
        Default is None.
    apply_dsc: ``bool``, optional
        Apply delta sigma cut flag; if True, removes the badGPStimes from the spectra calculations.
        Default is True.

    Notes
    -----
    The coherence calculation uses averaged naive PSD estimates as the coherence is calculated using CSD and PSD estimates of each individual segment, calculated \"on shell\".

    See also
    --------
    pygwb.coherence.calculate_coherence
    """
    # Boolean mask over the PSD spectrogram segments flagged as bad.
    bad_times_indexes = self._get_bad_times_indexes(times=self.interferometer_1.psd_spectrogram.times.value, apply_dsc=apply_dsc)
    deltaF = self.frequencies[1] - self.frequencies[0]
    # Number of segments that survive the cut; stored for later use.
    n_segs = len(self.interferometer_1.psd_spectrogram[~bad_times_indexes])
    # Average PSDs/CSD over the surviving segments, cropped to [flow, fhigh].
    psd_1_average = np.mean(self.interferometer_1.psd_spectrogram[~bad_times_indexes].crop_frequencies(flow, fhigh + deltaF), axis=0)
    psd_2_average = np.mean(self.interferometer_2.psd_spectrogram[~bad_times_indexes].crop_frequencies(flow, fhigh + deltaF), axis=0)
    csd_average = np.mean(self.csd[~bad_times_indexes].crop_frequencies(flow, fhigh + deltaF), axis=0)
    coherence = calculate_coherence(
        psd_1_average,
        psd_2_average,
        csd_average,
    )
    # Epoch metadata: start time of the (uncut) CSD spectrogram.
    epoch = self.csd.times[0]
    self.coherence_spectrum = FrequencySeries(
        coherence,
        frequencies=self.frequencies,
        name=self.name + " coherence spectrum",
        epoch=epoch,
    )
    # Keep the ingredients of the coherence around for saving/diagnostics.
    self.coherence_dict = {}
    self.coherence_dict['psd_1_average'] = psd_1_average
    self.coherence_dict['psd_2_average'] = psd_2_average
    self.coherence_dict['csd_average']= csd_average
    self.coherence_dict['n_segs']= n_segs
    self.coherence_dict['coherence']= coherence
    self.coherence_dict['frequencies']= self.frequencies
    self.coherence_dict['epoch']= epoch
def _get_bad_times_indexes(self, times, badtimes=None, apply_dsc=False):
"""
Get indices for segments with bad GPS times, to be removed from analysis.
Parameters
=======
badtimes: ``array_like``, optional
Array of times to exclude from further calculation.
Default is None.
apply_dsc: ``bool``
If True, calculates the indexes of the segments with a bad GPS time, according to the delta sigma cut. If False, returns None.
"""
if apply_dsc:
if not hasattr(self, "badGPStimes"):
warnings.warn(
"Delta sigma cut has not been calculated yet, hence no delta sigma cut will be applied... If this is a mistake, please run `calculate_delta_sigma_cut` first, then re-calculate point estimate/sigma spectra."
)
if badtimes is not None:
if hasattr(self, "badGPStimes"):
badtimes = np.append(badtimes, self.badGPStimes)
self.badGPStimes = badtimes
else:
badtimes = self.badGPStimes
else:
badtimes = np.array([])
bad_times_indexes = np.array(
[np.any(t == badtimes) for t in times]
)
return bad_times_indexes
def save_point_estimate_spectra(
    self,
    save_data_type,
    filename,
):
    """
    Save the overall point estimate Y, its error bar sigma,
    the frequency-dependent estimates and variances and the corresponding frequencies
    in the required save_data_type, which can be npz, pickle, json or hdf5.
    You can call upon this data afterwards when loading in using the ['key'] dictionary format.

    Parameters
    =======
    save_data_type: ``str``
        The required type of data file where the information will be stored.
    filename: ``str``
        The path/name of the file in which you want to save. The extension
        matching ``save_data_type`` is appended automatically.

    Raises
    =======
    ValueError
        If ``save_data_type`` is not one of the supported formats.
    """
    # Dispatch to the format-specific writer and its file extension.
    if save_data_type == "pickle":
        save = self._pickle_save
        ext = ".p"
    elif save_data_type == "npz":
        save = self._npz_save
        ext = ".npz"
    elif save_data_type == "json":
        save = self._json_save
        ext = ".json"
    elif save_data_type == "hdf5" or save_data_type == "h5":
        save = self._hdf5_save
        ext = ".h5"
    else:
        raise ValueError(
            "The provided data type is not supported, try using 'pickle', 'npz', 'json' or 'hdf5' instead."
        )
    # Bug fix: the `filename` argument was previously ignored and a literal
    # placeholder string was used as the output path.
    save(
        f"{filename}{ext}",
        self.frequencies,
        self.frequency_mask,
        self.point_estimate_spectrum,
        self.sigma_spectrum,
        self.point_estimate,
        self.sigma,
        self.point_estimate_spectrogram,
        self.sigma_spectrogram,
        self.badGPStimes,
        self.delta_sigmas,
        self.interferometer_1.gates,
        self.interferometer_1.gate_pad,
        self.interferometer_2.gates,
        self.interferometer_2.gate_pad,
    )
def save_psds_csds(
    self,
    save_data_type,
    filename,
):
    """
    Save the average and naive psds and csds and the corresponding frequencies
    in the required save_data_type, which can be npz, pickle, json or hdf5.
    One can call upon the data afterwards when loading in using the ['key'] dictionary format.

    Parameters
    =======
    save_data_type: ``str``
        The required type of data file where the information will be stored.
    filename: ``str``
        The path/name of the file in which you want to save. The extension
        matching ``save_data_type`` is appended automatically.

    Raises
    =======
    ValueError
        If ``save_data_type`` is not one of the supported formats.
    """
    # Dispatch to the format-specific writer and its file extension.
    if save_data_type == "pickle":
        save_csd = self._pickle_save_csd
        ext = ".p"
    elif save_data_type == "npz":
        save_csd = self._npz_save_csd
        ext = ".npz"
    elif save_data_type == "json":
        save_csd = self._json_save_csd
        ext = ".json"
    elif save_data_type == "hdf5" or save_data_type == "h5":
        save_csd = self._hdf5_save_csd
        ext = ".h5"
    else:
        raise ValueError(
            "The provided data type is not supported, try using 'pickle', 'npz', 'json' or 'hdf5' instead."
        )
    # Coherence products are optional; fall back to None when not computed.
    try:
        coherence = self.coherence_spectrum
        psd_1_coh = self.coherence_dict['psd_1_average']
        psd_2_coh = self.coherence_dict['psd_2_average']
        csd_coh = self.coherence_dict['csd_average']
        n_segs_coh = self.coherence_dict['n_segs']
    except AttributeError:
        coherence = None
        psd_1_coh = None
        psd_2_coh = None
        csd_coh = None
        n_segs_coh = None
    # Bug fix: the `filename` argument was previously ignored and a literal
    # placeholder string was used as the output path.
    save_csd(
        f"{filename}{ext}",
        self.csd.frequencies.value,
        self.average_csd.frequencies.value,
        self.csd,
        self.average_csd,
        self.interferometer_1.psd_spectrogram,
        self.interferometer_2.psd_spectrogram,
        self.interferometer_1.average_psd,
        self.interferometer_2.average_psd,
        coherence,
        psd_1_coh,
        psd_2_coh,
        csd_coh,
        n_segs_coh,
    )
def _npz_save(
self,
filename,
frequencies,
frequency_mask,
point_estimate_spectrum,
sigma_spectrum,
point_estimate,
sigma,
point_estimate_spectrogram,
sigma_spectrogram,
badGPStimes,
delta_sigmas,
ifo_1_gates,
ifo_1_gate_pad,
ifo_2_gates,
ifo_2_gate_pad,
):
try:
naive_sigma_values = delta_sigmas["naive_sigmas"]
slide_sigma_values = delta_sigmas["slide_sigmas"]
except KeyError:
naive_sigma_values = None
slide_sigma_values = None
np.savez(
filename,
frequencies=frequencies,
frequency_mask=frequency_mask,
point_estimate_spectrum=point_estimate_spectrum,
sigma_spectrum=sigma_spectrum,
point_estimate=point_estimate,
sigma=sigma,
point_estimate_spectrogram=point_estimate_spectrogram,
sigma_spectrogram=sigma_spectrogram,
badGPStimes=badGPStimes,
delta_sigma_alphas=delta_sigmas["alphas"],
delta_sigma_times=delta_sigmas["times"],
delta_sigma_values=delta_sigmas["values"],
naive_sigma_values=naive_sigma_values,
slide_sigma_values=slide_sigma_values,
ifo_1_gates=ifo_1_gates,
ifo_1_gate_pad=ifo_1_gate_pad,
ifo_2_gates=ifo_2_gates,
ifo_2_gate_pad=ifo_2_gate_pad,
)
def _pickle_save(
self,
filename,
frequencies,
frequency_mask,
point_estimate_spectrum,
sigma_spectrum,
point_estimate,
sigma,
point_estimate_spectrogram,
sigma_spectrogram,
badGPStimes,
delta_sigmas,
ifo_1_gates,
ifo_1_gate_pad,
ifo_2_gates,
ifo_2_gate_pad,
):
save_dictionary = {
"frequencies": frequencies,
"frequency_mask": frequency_mask,
"point_estimate_spectrum": point_estimate_spectrum,
"sigma_spectrum": sigma_spectrum,
"point_estimate": point_estimate,
"sigma": sigma,
"point_estimate_spectrogram": point_estimate_spectrogram,
"sigma_spectrogram": sigma_spectrogram,
"badGPStimes": badGPStimes,
"delta_sigmas": list(delta_sigmas.items()),
"ifo_1_gates": ifo_1_gates,
"ifo_1_gate_pad": ifo_1_gate_pad,
"ifo_2_gates": ifo_2_gates,
"ifo_2_gate_pad": ifo_2_gate_pad,
}
with open(filename, "wb") as f:
pickle.dump(save_dictionary, f)
def _json_save(
    self,
    filename,
    frequencies,
    frequency_mask,
    point_estimate_spectrum,
    sigma_spectrum,
    point_estimate,
    sigma,
    point_estimate_spectrogram,
    sigma_spectrogram,
    badGPStimes,
    delta_sigmas,
    ifo_1_gates,
    ifo_1_gate_pad,
    ifo_2_gates,
    ifo_2_gate_pad,
):
    """
    Serialize the point-estimate results to JSON.

    JSON has no complex type, so complex-valued spectra are stored as
    separate real and imaginary lists.
    """
    spectrum_values = point_estimate_spectrum.value
    spectrogram_values = point_estimate_spectrogram.value
    payload = {
        "frequencies": frequencies.tolist(),
        "frequency_mask": frequency_mask.tolist(),
        "point_estimate_spectrum_real": np.real(spectrum_values).tolist(),
        "point_estimate_spectrum_imag": np.imag(spectrum_values).tolist(),
        "sigma_spectrum": sigma_spectrum.value.tolist(),
        "point_estimate": point_estimate,
        "sigma": sigma,
        "point_estimate_spectrogram_real": np.real(spectrogram_values).tolist(),
        "point_estimate_spectrogram_imag": np.imag(spectrogram_values).tolist(),
        "point_estimate_spectrogram_times": point_estimate_spectrogram.times.value.tolist(),
        "sigma_spectrogram": sigma_spectrogram.value.tolist(),
        "sigma_spectrogram_times": sigma_spectrogram.times.value.tolist(),
        "badGPStimes": badGPStimes.tolist(),
        "delta_sigma_alphas": delta_sigmas["alphas"],
        "delta_sigma_values": delta_sigmas["values"].tolist(),
        "ifo_1_gates": ifo_1_gates,
        "ifo_1_gate_pad": ifo_1_gate_pad,
        "ifo_2_gates": ifo_2_gates,
        "ifo_2_gate_pad": ifo_2_gate_pad,
    }
    # Optional sigmas: only added when present in delta_sigmas.
    try:
        payload["naive_sigma_values"] = delta_sigmas["naive_sigmas"].tolist()
        payload["slide_sigma_values"] = delta_sigmas["slide_sigmas"].tolist()
    except KeyError:
        pass
    with open(filename, "w") as outfile:
        json.dump(payload, outfile)
def _hdf5_save(
    self,
    filename,
    frequencies,
    frequency_mask,
    point_estimate_spectrum,
    sigma_spectrum,
    point_estimate,
    sigma,
    point_estimate_spectrogram,
    sigma_spectrogram,
    badGPStimes,
    delta_sigmas,
    ifo_1_gates,
    ifo_1_gate_pad,
    ifo_2_gates,
    ifo_2_gate_pad,
    compress=False,
):
    """Write the point-estimate results to an HDF5 file."""
    compression = "gzip" if compress else None
    if compress:
        logger.info("Data will be compressed without loss of data")
    with h5py.File(filename, "w") as hf:
        for key, data in (
            ("freqs", frequencies),
            ("freqs_mask", frequency_mask),
            ("point_estimate_spectrum", point_estimate_spectrum),
            ("sigma_spectrum", sigma_spectrum),
        ):
            hf.create_dataset(key, data=data, compression=compression)
        # Scalars are stored explicitly as floats.
        hf.create_dataset(
            "point_estimate",
            data=point_estimate,
            dtype="float",
            compression=compression,
        )
        hf.create_dataset("sigma", data=sigma, dtype="float", compression=compression)
        for key, data in (
            ("point_estimate_spectrogram", point_estimate_spectrogram),
            ("sigma_spectrogram", sigma_spectrogram),
            ("badGPStimes", badGPStimes),
        ):
            hf.create_dataset(key, data=data, compression=compression)
        delta_sigmas_group = hf.create_group("delta_sigmas")
        for dataset_name, source_key in (
            ("delta_sigma_alphas", "alphas"),
            ("delta_sigma_times", "times"),
            ("delta_sigma_values", "values"),
        ):
            delta_sigmas_group.create_dataset(
                dataset_name, data=delta_sigmas[source_key], compression=compression
            )
        # Optional sigmas: written only when present in delta_sigmas.
        try:
            delta_sigmas_group.create_dataset(
                "naive_sigma_values",
                data=delta_sigmas["naive_sigmas"],
                compression=compression,
            )
            delta_sigmas_group.create_dataset(
                "slide_sigma_values",
                data=delta_sigmas["slide_sigmas"],
                compression=compression,
            )
        except KeyError:
            pass
        gates_group = hf.create_group("Gated_Times")
        gates_group.create_dataset(
            "ifo_1_gates", data=ifo_1_gates, compression=compression
        )
        gates_group.create_dataset(
            "ifo_2_gates", data=ifo_2_gates, compression=compression
        )
def _npz_save_csd(
    self,
    filename,
    freqs,
    avg_freqs,
    csd,
    avg_csd,
    psd_1,
    psd_2,
    avg_psd_1,
    avg_psd_2,
    coherence,
    psd_1_coh,
    psd_2_coh,
    csd_coh,
    n_segs_coh,
):
    """Write the PSD/CSD (and optional coherence) products to an ``.npz`` archive."""
    # Times are extracted from the spectrogram objects so they can be
    # reconstructed when loading.
    payload = dict(
        freqs=freqs,
        avg_freqs=avg_freqs,
        csd=csd,
        avg_csd=avg_csd,
        psd_1=psd_1,
        psd_2=psd_2,
        avg_psd_1=avg_psd_1,
        avg_psd_2=avg_psd_2,
        csd_times=csd.times.value,
        avg_csd_times=avg_csd.times.value,
        psd_times=psd_1.times.value,
        avg_psd_times=avg_psd_1.times.value,
        coherence=coherence,
        psd_1_coh=psd_1_coh,
        psd_2_coh=psd_2_coh,
        csd_coh=csd_coh,
        n_segs_coh=n_segs_coh,
    )
    np.savez(filename, **payload)
def _pickle_save_csd(
self,
filename,
freqs,
avg_freqs,
csd,
avg_csd,
psd_1,
psd_2,
avg_psd_1,
avg_psd_2,
coherence,
psd_1_coh,
psd_2_coh,
csd_coh,
n_segs_coh,
):
save_dictionary = {
"freqs": freqs,
"avg_freqs": avg_freqs,
"csd": csd,
"avg_csd": avg_csd,
"psd_1": psd_1,
"psd_2": psd_2,
"avg_psd_1": avg_psd_1,
"avg_psd_2": avg_psd_2,
"coherence": coherence,
"psd_1_coh": psd_1_coh,
"psd_2_coh": psd_2_coh,
"csd_coh": csd_coh,
"n_segs_coh": n_segs_coh,
}
# with open(filename, "wb") as f:
# pickle.dump(saveObject, f)
with open(filename, "wb") as f:
pickle.dump(save_dictionary, f)
def _json_save_csd(
    self,
    filename,
    freqs,
    avg_freqs,
    csd,
    avg_csd,
    psd_1,
    psd_2,
    avg_psd_1,
    avg_psd_2,
    coherence,
    psd_1_coh,
    psd_2_coh,
    csd_coh,
    n_segs_coh,
):
    """
    Serialize the PSD/CSD (and optional coherence) products to JSON.

    It seems that saving spectrograms in json does not work, hence everything is converted into a list and saved that way in the json file.
    A second issue is that json does not seem to recognise complex values, hence the csd is split up into a real and imaginary part.
    When loading in this json file, one needs to 'reconstruct' the csd as a spectrogram using these two lists and the times and frequencies.
    """
    list_freqs = freqs.tolist()
    list_avg_freqs = avg_freqs.tolist()
    # Split complex spectrograms into real/imag lists (JSON has no complex type).
    real_csd_list = np.real(csd.value).tolist()
    imag_csd_list = np.imag(csd.value).tolist()
    csd_times = csd.times.value.tolist()
    real_avg_csd_list = np.real(avg_csd.value).tolist()
    imag_avg_csd_list = np.imag(avg_csd.value).tolist()
    avg_csd_times = avg_csd.times.value.tolist()
    list_psd_1 = psd_1.value.tolist()
    psd_times = psd_1.times.value.tolist()
    list_psd_2 = psd_2.value.tolist()
    psd_2_times = psd_2.times.value.tolist()
    list_avg_psd_1 = avg_psd_1.value.tolist()
    avg_psd_times = avg_psd_1.times.value.tolist()
    list_avg_psd_2 = avg_psd_2.value.tolist()
    avg_psd_2_times = avg_psd_2.times.value.tolist()
    # Bug fix: previously `if coherence:` was used, which raises a ValueError
    # for a multi-element array-like coherence (ambiguous truth value).
    if coherence is not None:
        list_coherence = coherence.value.tolist()
        list_psd_1_coh = psd_1_coh.value.tolist()
        list_psd_2_coh = psd_2_coh.value.tolist()
        real_csd_coh_list = np.real(csd_coh.value).tolist()
        imag_csd_coh_list = np.imag(csd_coh.value).tolist()
        list_n_segs_coh = n_segs_coh
    else:
        list_coherence = None
        list_psd_1_coh = None
        list_psd_2_coh = None
        real_csd_coh_list = None
        imag_csd_coh_list = None
        list_n_segs_coh = None
    save_dictionary = {
        "frequencies": list_freqs,
        "avg_frequencies": list_avg_freqs,
        "csd_real": real_csd_list,
        "csd_imag": imag_csd_list,
        "csd_times": csd_times,
        "avg_csd_real": real_avg_csd_list,
        "avg_csd_imag": imag_avg_csd_list,
        "avg_csd_times": avg_csd_times,
        "psd_1": list_psd_1,
        "psd_1_times": psd_times,
        "psd_2": list_psd_2,
        "psd_2_times": psd_2_times,
        "avg_psd_1": list_avg_psd_1,
        "avg_psd_1_times": avg_psd_times,
        "avg_psd_2": list_avg_psd_2,
        "avg_psd_2_times": avg_psd_2_times,
        "coherence": list_coherence,
        "psd_1_coh": list_psd_1_coh,
        "psd_2_coh": list_psd_2_coh,
        "csd_coh_real": real_csd_coh_list,
        # Bug fix: the imaginary part was previously written from the REAL list.
        "csd_coh_imag": imag_csd_coh_list,
        "n_segs_coh": list_n_segs_coh,
    }
    with open(filename, "w") as outfile:
        json.dump(save_dictionary, outfile)
def _hdf5_save_csd(
    self,
    filename,
    freqs,
    avg_freqs,
    csd,
    avg_csd,
    psd_1,
    psd_2,
    avg_psd_1,
    avg_psd_2,
    coherence,
    psd_1_coh,
    psd_2_coh,
    csd_coh,
    n_segs_coh,
    compress=False,
):
    """
    Save the baseline's CSD/PSD products (and optional coherence products)
    to an HDF5 file.

    Parameters
    =======
    filename: ``str``
        Path of the HDF5 file to write.
    freqs: ``array_like``
        Frequencies of the naive (non-averaged) spectra.
    avg_freqs: ``array_like``
        Frequencies of the averaged spectra.
    csd: spectrogram-like
        Naive cross-spectral density; must expose ``times.value``.
    avg_csd: spectrogram-like
        Averaged cross-spectral density; must expose ``times.value``.
    psd_1, psd_2: spectrogram-like
        Naive power spectral densities of the two detectors.
    avg_psd_1, avg_psd_2: spectrogram-like
        Averaged power spectral densities of the two detectors.
    coherence: ``array_like`` or ``None``
        Coherence spectrum; when falsy, no coherence datasets are written.
    psd_1_coh, psd_2_coh, csd_coh: ``array_like``
        Spectra entering the coherence calculation.
    n_segs_coh: ``int``
        Number of segments used in the coherence calculation.
    compress: ``bool``, optional
        If True, store datasets with gzip compression. Defaults to False.
    """
    compression = "gzip" if compress else None
    # Context manager guarantees the file handle is closed even when a
    # write fails partway through (the previous version leaked the handle
    # on any exception between open and the final close).
    with h5py.File(filename, "w") as hf:
        hf.create_dataset("freqs", data=freqs, compression=compression)
        hf.create_dataset("avg_freqs", data=avg_freqs, compression=compression)

        csd_group = hf.create_group("csd_group")
        csd_group.create_dataset("csd", data=csd, compression=compression)
        csd_group.create_dataset(
            "csd_times", data=csd.times.value, compression=compression
        )

        avg_csd_group = hf.create_group("avg_csd_group")
        avg_csd_group.create_dataset(
            "avg_csd", data=avg_csd, compression=compression
        )
        avg_csd_group.create_dataset(
            "avg_csd_times", data=avg_csd.times.value, compression=compression
        )

        # psds_group holds the naive PSDs of both detectors, one subgroup each.
        hf.create_group("psds_group")
        psd_1_group = hf.create_group("psds_group/psd_1")
        psd_1_group.create_dataset("psd_1", data=psd_1, compression=compression)
        psd_1_group.create_dataset(
            "psd_1_times", data=psd_1.times.value, compression=compression
        )
        psd_2_group = hf.create_group("psds_group/psd_2")
        psd_2_group.create_dataset("psd_2", data=psd_2, compression=compression)
        psd_2_group.create_dataset(
            "psd_2_times", data=psd_2.times.value, compression=compression
        )

        # avg_psds_group mirrors psds_group for the averaged PSDs.
        hf.create_group("avg_psds_group")
        avg_psd_1_group = hf.create_group("avg_psds_group/avg_psd_1")
        avg_psd_1_group.create_dataset(
            "avg_psd_1", data=avg_psd_1, compression=compression
        )
        avg_psd_1_group.create_dataset(
            "avg_psd_1_times", data=avg_psd_1.times.value, compression=compression
        )
        avg_psd_2_group = hf.create_group("avg_psds_group/avg_psd_2")
        avg_psd_2_group.create_dataset(
            "avg_psd_2", data=avg_psd_2, compression=compression
        )
        avg_psd_2_group.create_dataset(
            "avg_psd_2_times", data=avg_psd_2.times.value, compression=compression
        )

        # Coherence products are optional; skipped entirely when falsy.
        if coherence:
            hf.create_dataset(
                "coherence", data=coherence, compression=compression
            )
            hf.create_dataset(
                "psd_1_coherence", data=psd_1_coh, compression=compression
            )
            hf.create_dataset(
                "psd_2_coherence", data=psd_2_coh, compression=compression
            )
            hf.create_dataset(
                "csd_coherence", data=csd_coh, compression=compression
            )
            hf.create_dataset(
                "n_segs_coherence", data=n_segs_coh, compression=compression
            )
def get_baselines(interferometers, frequencies=None):
    """
    Get set of Baseline objects given a list of interferometers.

    Parameters
    =======
    interferometers: ``list``
        List of bilby.interferometer objects.
    frequencies: ``array_like``, optional
        Frequencies to construct the baseline with. Defaults to None.
    """
    # All unordered detector pairs (k, j) with k < j, enumerated in the
    # same order as a nested j-then-k loop would produce them.
    detector_pairs = [
        (k, j) for j in range(1, len(interferometers)) for k in range(j)
    ]
    # One Baseline per pair, named after the two member detectors.
    return [
        Baseline(
            f"{interferometers[k].name} - {interferometers[j].name}",
            interferometers[k],
            interferometers[j],
            frequencies=frequencies,
        )
        for k, j in detector_pairs
    ]