""" .. _Spectrum-api:
**Spectrum** --- Spectral line data container.
----------------------------------------------
This module defines the Spectrum class.
"""
# system imports
import numpy as np
import numpy.ma as ma
import copy
import math
# ADMIT imports
from admit.util.AdmitLogging import AdmitLogging as logging
import utils
[docs]class Spectrum(object):
""" Class for holding a spectrum. It holds entries for the spectrum,
frequency axis, channel axis, vlsr, and rest_frequency. A mask
is also available to mask bad data points. The mask is common to
all axes.
A spectrum needs to hold at least 2 channels.
Parameters
----------
spec : array like
The spectrum.
freq : array like
The frequency axis, one entry per spectral point.
chans : array like
The channel axis, one entry per spectral point.
contin : array like or float
The continuum for each channel, or for all channels if it is an
int.
noise : float
The rms noise of the spectrum.
mask : array like or bool
The mask for the spectrum, one entry per spectral point
or a single boolean value, Defaults to all good.
"""
def __init__(self, spec=None, freq=None, chans=None, contin=None, noise=None, mask=[False]):
domask = True
self._mask = None
self._freq = None
self._chans = None
self._spec = None
self._contin = None
self._noise = None
self._delta = None
if spec is not None:
self.set_spec(spec, mask)
domask = False
if freq is not None:
self.set_freq(freq)
if noise is not None:
self._noise = noise
if chans is not None:
self.set_chans(chans)
if contin is not None:
self.set_contin(contin)
if domask:
self.set_mask(mask)
self.integritycheck()
def __len__(self):
return len(self._spec)
[docs] def spec(self, csub=True, masked=True):
""" Method to get the spectrum, the mask is optionally returned
Parameters
----------
csub : bool
If True return the spectrum with the continuum subtracted,
False will return the pure spectrum
Default: True
masked : bool
If True then return the spectrum as a masked array with
the current mask applied. False will return a numpy array
with no masking done.
Default: True
Returns
-------
Array like object containing the spectrum
"""
data = copy.deepcopy(self._spec)
if csub and self._contin is not None and len(self._contin) > 0:
data -= self._contin
if masked:
return ma.masked_array(data, mask=self._mask)
return data
[docs] def freq(self, masked=True):
""" Method to get the frequency axis, the mask is optionally returned
Parameters
----------
masked : bool
If True then return the frequency axis as a masked array with
the current mask applied. False will return a numpy array
with no masking done.
Returns
-------
Array like object containing the frequency axis
"""
if masked:
return ma.masked_array(self._freq, mask=self._mask)
return self._freq
[docs] def contin(self, masked=True):
""" Method to get the continuum, the mask is optionally returned
Parameters
----------
masked : bool
If True then return the continuum as a masked array with
the current mask applied. False will return a numpy array
with no masking done.
Returns
-------
Array like object containing the continuum
"""
if masked:
return ma.masked_array(self._contin, mask=self._mask)
return self._contin
[docs] def noise(self):
""" Method to return the noise of the spectrum.
Parameters
----------
None
Returns
-------
Float of the rms noise.
"""
return self._noise
[docs] def rms(self):
"""If the noise has been set, return the noise, otherwise
compute and return the root-mean-square value of the spectrum.
"""
if self._noise == None:
return np.sqrt(np.mean(np.square(self.spec())))
else:
return self._noise
[docs] def delta(self):
if self._delta is None:
self.calcdelta()
else:
return self._delta
[docs] def getfreq(self, chan):
""" Method to get the frequency of the given channel, the channel can
contain a fraction of a channel (e.g. 1.45)
Parameters
----------
chan : float
The channel to convert
Returns
-------
Float of the frequency corresponding to the channel
"""
if chan > np.max(self._chans):
return self._freq[self._chans[np.argmax(self._chans)]]
if chan < np.min(self._chans):
return self._freq[self._chans[np.argmin(self._chans)]]
ichan = int(chan)
ch = np.where(self._chans == ichan)[0][0]
f1 = self._freq[ch]
if np.where(self._chans == np.max(self._chans))[0][0] > ch + 1:
f2 = self._freq[ch + 1]
else:
f2 = self._freq[ch - 1]
frac = chan - float(ichan)
return f1 + frac * (f2 - f1)
[docs] def getchanindex(self, chan):
"""
"""
if ma.max(self._chans) >= chan >= ma.min(self._chans):
return ma.where(self._chans == int(chan))[0][0]
else:
return -1
[docs] def getchan(self, frq):
""" Method to get the channel number given the frequency
Parameters
----------
frq : float
The frequency whose channel number is to be found
Returns
-------
int containing the channel number, will be 0 or len(self.x)-1 if
the frequency is outside of the frequency range in the class
"""
if frq <= np.min(self._freq):
return int(self._chans[np.where(self._freq == np.min(self._freq))[0][0]])
if frq >= np.max(self._freq):
return int(self._chans[np.where(self._freq == np.max(self._freq))[0][0]])
for i in range(len(self._freq) - 1):
if self._freq[i] <= frq < self._freq[i + 1] or \
self._freq[i] >= frq > self._freq[i + 1]:
return int(self._chans[i])
[docs] def chans(self, masked=True):
""" Method to get the channel axis, the mask is optionally returned
Parameters
----------
masked : bool
If True then return the channel axis as a masked array with
the current mask applied. False will return a numpy array
with no masking done.
Returns
-------
Array like object containing the channel axis
"""
if masked:
return ma.masked_array(self._chans, mask=self._mask)
return self._chans
[docs] def mask(self):
""" Method to return the mask as a numpy array
Parameters
----------
None
Returns
-------
Array like object containing the mask
"""
return self._mask
[docs] def mask_equal(self, value, axis="spec"):
""" Method to mask data equal to the given value.
Any axis can be used ("spec", "freq", "chans") as
the basis for the masking.
Parameters
----------
value : float or int
The value equal to which all data of the given axis
will be masked.
axis : str
The axis which is used to determine the flags.
Default: "spec"
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask = getattr(self, "_" + axis, None) == value
self._mask = np.logical_or(self._mask, mask)
[docs] def mask_lt(self, limit, axis="spec"):
""" Method to mask any data less than the given value.
Any axis can be used ("spec", "freq", "chans") as
the basis for the masking.
Parameters
----------
limit : float or int
The value below which all data of the given axis
will be masked.
axis : str
The axis which is used to determine the flags.
Default: "spec"
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask = getattr(self, "_" + axis, None) < limit
self._mask = np.logical_or(self._mask, mask)
[docs] def mask_gt(self, limit, axis="spec"):
""" Method to mask any data greater than the given value.
Any axis can be used ("spec", "freq", "chans") as
the basis for the masking.
Parameters
----------
limit : float or int
The value above which all data of the given axis
will be masked.
axis : str
The axis which is used to determine the flags.
Default: "spec"
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask = getattr(self, "_" + axis, None) > limit
self._mask = np.logical_or(self._mask, mask)
[docs] def mask_le(self, limit, axis="spec"):
""" Method to mask any data less than or erqual to the given value.
Any axis can be used ("spec", "freq", "chans") as
the basis for the masking.
Parameters
----------
limit : float or int
The value which all data, of the given axis,
less than or equal to, will be masked.
axis : str
The axis which is used to determine the flags.
Default: "spec"
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask = getattr(self, "_" + axis, None) <= limit
self._mask = np.logical_or(self._mask, mask)
[docs] def mask_ge(self, limit, axis="spec"):
""" Method to mask any data less than or equal to the given value.
Any axis can be used ("spec", "freq", "chans") as
the basis for the masking.
Parameters
----------
limit : float or int
The value which all data, of the given axis,
greater than or equal to, will be masked.
axis : str
The axis which is used to determine the flags.
Default: "spec"
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask = getattr(self, "_" + axis, None) >= limit
self._mask = np.logical_or(self._mask, mask)
[docs] def mask_between(self, limit1, limit2, axis="spec"):
""" Method to mask any data bewteen the the given values.
Any axis can be used ("spec", "freq", "chans") as
the basis for the masking.
Parameters
----------
limit1 : float or int
The value above which all data of the given axis
will be masked.
limit2 : float or int
The value below which all data of the given axis
will be masked.
axis : str
The axis which is used to determine the flags.
Default: "spec"
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask1 = limit1 < getattr(self, "_" + axis, None)
mask2 = getattr(self, "_" + axis, None) < limit2
mask = np.logical_and(mask1, mask2)
self._mask = np.logical_or(self._mask, mask)
[docs] def mask_outside(self, limit1, limit2, axis="spec"):
""" Method to mask any data less than or greater than the given values.
Any axis can be used ("spec", "freq", "chans") as
the basis for the masking.
Parameters
----------
limit1 : float or int
The value below which all data of the given axis
will be masked.
limit2 : float or int
The value above which all data of the given axis
will be masked.
axis : str
The axis which is used to determine the flags.
Default: "spec"
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask1 = getattr(self, "_" + axis, None) < limit1
mask2 = limit2 < getattr(self, "_" + axis, None)
mask = np.logical_and(mask1, mask2)
self._mask = np.logical_or(self._mask, mask)
[docs] def set_noise(self, noise):
""" Method to set the noise value for the spectrum.
Parameters
----------
noise : float
The noise of the spectrum
Returns
-------
None
"""
self._noise = noise
[docs] def set_mask(self, mask):
""" Method to set the mask item
Parameters
----------
mask : array like or bool
The mask to use, must be equal in dimmension to
the spectral axis, or a single boolean value which
is applied to all data.
Returns
-------
None
"""
tempmask = self._mask
if isinstance(mask, ma.masked_array):
if len(self._spec) > len(mask) > 1:
raise
elif len(mask) == 1:
self._mask = np.array([mask.data[0]] * len(self._spec))
else:
self._mask = np.array(mask.data)
elif isinstance(mask, list) or isinstance(mask, np.ndarray):
if len(self._spec) > len(mask) > 1:
raise
elif len(mask) == 1:
self._mask = np.array([mask[0]] * len(self._spec))
else:
self._mask = np.array(mask)
if self.integritycheck():
return
logging.warning(" Mask not applied")
self._mask = tempmask
[docs] def set_spec(self, spec, mask=None):
""" Method to set the spectrum, an optionally the mask.
Parameters
----------
spec : array like
The spectrum to set
mask : array like or single bool
The mask to apply to the spectrum. Default: None (no mask)
Returns
-------
None
"""
if isinstance(spec, list):
self._spec = np.array(spec)
if mask is not None:
self.set_mask(mask)
else:
self.set_mask([False])
elif isinstance(spec, ma.masked_array):
self._spec = spec.data
if isinstance(spec.mask, bool) or isinstance(spec.mask, np.bool_):
self._mask = np.array([spec.mask] * len(spec.data))
else:
self._mask = spec.mask
elif isinstance(spec, np.ndarray):
self._spec = spec
if mask is not None:
self.set_mask(mask)
else:
self.set_mask([False])
else:
raise
[docs] def set_freq(self, freq):
""" Method to set the frequency axis.
Parameters
----------
freq : array like
The frequency axis to use
Returns
-------
None
"""
if isinstance(freq, list):
self._freq = np.array(freq)
elif isinstance(freq, ma.masked_array):
self._freq = freq.data
elif isinstance(freq, np.ndarray):
self._freq = freq
else:
raise
self.calcdelta()
[docs] def set_contin(self, contin):
""" Method to set the continuum.
Parameters
----------
freq : array like
The continuum to use
Returns
-------
None
"""
if isinstance(contin, list):
self._contin = np.array(contin)
elif isinstance(contin, float) or isinstance(contin, int):
self.contin = np.array([contin] * len(self._spec))
elif isinstance(contin, ma.masked_array):
self._contin = contin.data
elif isinstance(contin, np.ndarray):
self._contin = contin
else:
raise
[docs] def set_chans(self, chans):
""" Method to set the channel axis.
Parameters
----------
chans : array like
The channel axis to use
Returns
-------
None
"""
if isinstance(chans, list):
self._chans = np.array(chans, dtype=np.int)
elif isinstance(chans, ma.masked_array):
self._chans = chans.data.astype(np.int, copy=True)
elif isinstance(chans, np.ndarray):
self._chans = chans.astype(np.int, copy=True)
else:
raise
[docs] def calcdelta(self):
if self._freq is None:
raise Exception("calcdelta: no freq set")
if len(self._freq) == 1:
raise Exception("calcdelta: frequency axis 1, continuum?")
for f in range(len(self._freq) - 1):
if not self._mask[f] and not self._mask[f + 1]:
self._delta = abs(self._freq[f] - self._freq[f + 1])
return
raise Exception("calcdelta")
[docs] def centerfreq(self):
cen = int(len(self._freq) / 2)
for i in range(cen - 1):
if not self._mask[cen + i]:
return self._freq[cen + i]
if not self._mask[cen - i]:
return self._freq[cen - i]
raise
[docs] def fix_invalid(self, mask_value=0.0):
""" Method to replace invalid spectral data with a specific value
and mask it. Invalid values are: NaN, Inf, -Inf
Parameters
----------
mask_value : float
The value to replace the invalid data with.
Default: 0.0
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask = np.isfinite(self._spec)
for i in range(len(mask)):
if not mask[i]:
self._spec[i] = mask_value
self._mask = np.logical_or(self._mask, np.logical_not(mask))
[docs] def mask_invalid(self):
""" Method to mask all invalid spectral data.
Invalid values are: NaN, Inf, -Inf
Parameters
----------
None
Returns
-------
None
"""
if not self.integritycheck():
logging.warning(" Masking operation not performed.")
return
mask = np.isfinite(self._spec)
self._mask = np.logical_or(self._mask, np.logical_not(mask))
[docs] def integritycheck(self):
""" Method to check that all axes are the same length. Axes
that have no data are ignored.
Parameters
----------
None
Returns
-------
Bool, True if all match, False if there is an inconsistency
"""
if self._spec is not None:
ls = len(self._spec)
else:
ls = 0
if self._freq is not None:
lf = len(self._freq)
else:
lf = ls
if self._chans is not None:
lc = len(self._chans)
else:
lc = ls
if self._mask is not None:
lm = len(self._mask)
else:
lm = ls
if self._contin is not None:
lco = len(self._contin)
else:
lco = ls
if ls == lf == lc == lm == lco:
return True
logging.warning("Inconsistent axes: spectrum: %i, freq: %i, chans: %i, mask: %i, contin: %i" % (ls, lf, lc, lm, lco))
return False
def _sanitizechanrange(self,chanrange,chupper):
if chanrange == None:
chanrange = [ 0, chupper ]
else:
if chanrange[0] < 0 or chanrange[0] > chupper or chanrange[1] < 0 or chanrange[1] > chupper:
msg = "Bad input channel range %s. Available range is [0,%d]" % (chanrange,chupper)
raise Exception, msg
return chanrange
[docs] def momenti(self,chanrange=None,p=1):
"""Intensity-weighted moment
Note this moment does poorly for very narrow line segments where
some channels may be negative."
Parameters
----------
chanrange: range of channels over which to compute moment
[startchan, endchan]
p: the moment to compute (the power of the frequency in the sum)
Returns:
The computed moment
"""
# get the masked array
s = self.spec()
chupper = len(s)-1
chanrange = self._sanitizechanrange(chanrange,chupper)
sum_s = ma.sum(s[chanrange[0]:chanrange[1]+1])
sum_sf = 0
mean = 0
if p > 1:
mean = self.moment(chanrange,p-1)
for i in range(chanrange[0],chanrange[1]+1):
sum_sf += s[i]*math.pow((self._freq[i]-mean),p)
return sum_sf/sum_s
[docs] def momenta(self,chanrange=None,p=1):
"""abs(intensity)-weighted moment
Does somewhat better than signed intensity-weighted moment.
Parameters
----------
chanrange: range of channels over which to compute moment
[startchan, endchan]
p: the moment to compute (the power of the frequency in the sum)
Returns:
The computed moment
"""
# get the masked array
s = self.spec()
chupper = len(s)-1
chanrange = self._sanitizechanrange(chanrange,chupper)
sum_s = ma.sum(ma.abs(s[chanrange[0]:chanrange[1]+1]))
sum_sf = 0
mean = 0
if p > 1:
mean = self.moment(chanrange,p-1)
for i in range(chanrange[0],chanrange[1]+1):
sum_sf += ma.abs(s[i])*math.pow((self._freq[i]-mean),p)
return sum_sf/sum_s
[docs] def moment(self,chanrange=None,p=1):
"""Compute the power-weighted moment of a spectrum
If a mask exists, this function operates on the masked spectrum.
f_mean = Sum(spec[i]**2*(f[i]-mom{p-1})^p])/Sum(spec[i]**2)
where spec[i] is the intensity at channel i (spec[i]**2 is he power)
and f[i] is the frequency at channel i, p is the moment power,
and mom{p-1} is the p-1-th moment [for p >1].
Parameters
----------
chanrange: range of channels over which to compute moment
[startchan, endchan]
p: the moment to compute (the power of the frequency in the sum)
Returns:
The computed moment
"""
# get the masked array
s = self.spec()
chupper = len(s)-1
chanrange = self._sanitizechanrange(chanrange,chupper)
sum_s = ma.sum(s[chanrange[0]:chanrange[1]+1]*s[chanrange[0]:chanrange[1]+1])
sum_sf = 0
mean = 0
if p > 1:
mean = self.moment(chanrange,p-1)
for i in range(chanrange[0],chanrange[1]+1):
sum_sf += s[i]*s[i]*math.pow((self._freq[i]-mean),p)
return sum_sf/sum_s
[docs] def meanfrequency(self,chanrange=None):
"""Compute the power-weighted mean frequency of this spectrum,
aka the first moment.
If a channel range is given, the mean frequency is computed
over that channel range, otherwise over the entire spectrum.
If a mask exists, this function operates on the masked spectrum.
Parameters
----------
chanrange: range of channels over which to compute frequency
[startchan, endchan]
Returns
-------
Power-weighted mean frequency as computed by
f_mean = Sum(spec[i]**2*f[i]])/Sum(spec[i]**2)
where spec[i] is the intensity at channel i and f[i] is
the frequency at channel i.
"""
return self.moment(chanrange,1)
[docs] def meanvelocity(self,chanrange=None):
"""Compute the power-weighted mean velocity of this spectrum.
derived from the mean frequency and channel width:
mean_v = C * delta()/meanfrequency(chanrange)
"""
mf = self.meanfrequency(chanrange)
return self.moment(chanrange,1) * utils.freqtovel(mf,self.delta())
[docs] def freqdispersion(self,chanrange=None):
"""Compute the frequency dispersion of the spectrum in the given
channel range.
If a mask exists, this function operates on the masked spectrum.
Parameters
----------
chanrange: range of channels over which to compute dispersion
[startchan, endchan]
Returns
-------
Power-weighted frequency dispersion df as computed by
df = sqrt(mom2)
where mom2 is the 2nd moment as computed by the moment() method.
"""
#MWP: I don't understand why I need an extra factor of sqrt(2) here
# when using power spectrum weighting in moment()!
return math.sqrt(2*self.moment(chanrange,2))
[docs] def veldispersion(self,chanrange=None):
"""Compute the velocity dispersion (km/s) of the spectrum in the given
channel range.
If a mask exists, this function operates on the masked spectrum.
Parameters
----------
chanrange: range of channels over which to compute dispersion
[startchan, endchan]
Returns
-------
Power-weighted velocity dv as computed by
dv = C * df/mf
where df is the frequency dispersion computered by
the frequencydispersion() method,
mf is the mean frequency from the meanfrequency() method,
and C is the speed of light.
"""
mf = self.meanfrequency(chanrange)
return utils.freqtovel(mf,self.freqdispersion(chanrange))
[docs] def fwhm(self,chanrange=None):
"""Compute the full-width at half-maximum velocity (km/s).
If a mask exists, this function operates on the masked spectrum.
Parameters
----------
chanrange: range of channels over which to compute dispersion
[startchan, endchan]
Returns
-------
fwhm computed by
fwhm = dv * sqrt(8*ln(2))
where dv is the velocity dispersion computered by
the veldispersion() method.
"""
return self.veldispersion(chanrange)*math.sqrt(8*math.log(2))
[docs] def peak(self,chanrange=None):
"""Return the peak intensity in the given channel range
If a mask exists, this function operates on the masked spectrum.
Parameters
----------
chanrange: range of channels over which to compute dispersion
[startchan, endchan]
Returns
----------
Maximum of the absolute value of the spectrum in the channel range
max(abs(spectrum[startchan:endchan]))
"""
s = self.spec()
chupper = len(s)-1
chanrange = self._sanitizechanrange(chanrange,chupper)
# Handle one-channel ranges.
if (chanrange[0] == chanrange[1]):
return s[chanrange[0]]
return ma.max(ma.abs(s[chanrange[0]:chanrange[1]]))
[docs] def invert(self):
"""Multiply this spectrum and continuum by -1
Parameters
----------
None
Returns
----------
None
"""
if self._spec is not None:
self._spec *= -1.
if self._contin is not None:
self._contin *= -1.