Source code for admit.util.segmentfinder.ASAPSegmentFinder

""" .. _asapsegment:

    ASAPSegmentFinder --- Finds segments of emission within a spectrum.
    -------------------------------------------------------------------

    This module defines the class for spectral line segment detection.
"""
#*******************************************************************************
# ALMA - Atacama Large Millimeter Array
# Copyright (c) ATC - Astronomy Technology Center - Royal Observatory Edinburgh, 2011
# (in the framework of the ALMA collaboration).
# All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful, 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
#*******************************************************************************

import os
import numpy as np
import copy
import math
import numpy.ma as ma

# Fake linefinder for Sphinx.
try:
    import taskinit
    import asap
    from asap.asaplinefind import linefinder
except:
    print "WARNING: No ASAP; ASAPLineFinder cannot function."
    class linefinder(object):
        def __init__(self):
            pass
    class tb(object):
        def __init__(self):
            pass
        def create(self):
            return


[docs]class ASAPSegmentFinder(linefinder): """ The AdmitLineFinder class inherits from the asap linefinder class. It takes a spectrum (list, CubeSpectrum_BDP, or CubeStats_BDP) and turns it into a scantable. The scantable is then search for spectral lines, with the given parameters. Parameters ---------- name : string Name of the temporary scantable file. Default: "". spec : various Input spectrum, can be a list, CubeSpectrum_BDP, or CubeStats_BDP. Default: None. Attributes ---------- nchan : int Number of spectral channels. name : string Name of the temporary scantable. spectrum : list Spectrum to be analyzed. lines_merged : list Merged list of overlapping lines. tb : casa table Casa table that is converted to a scantable. """ def __init__(self, **keyval): linefinder.__init__(self) self.name = "hlinefinder.tmp.asap" self.spec = None self.vals = ["threshold", "min_nchan", "avg_limit", "box_size", "noise_box", "noise_stat"] for v in ["name", "spec"]: try: setattr(self, v, keyval[v]) except KeyError: pass self.nchan = 0 if self.spec is not None: self.nchan = len(self.spec) self.scantab = None self.spectrum = [] self.freq = [] self.lines_merged = [] self.tb = taskinit.tbtool() self.tb.create() # create dummy scantable self.init() if self.spec is not None: self.set_spectrum() self.set_options(keyval) def __del__(self): """ Destructor """ del self.tb
[docs] def init(self): """ Create dummy scantable to work with linefinder. Parameters ---------- None Returns ------- None Notes ----- Should not be directly called. """ # remove old table if it exists if os.path.exists(self.name): os.system('\\rm -rf %s' % (self.name)) # set ip a scantable s = asap._asap.Scantable(False) s._save(self.name) del s # set up a casa table self.tb.open(self.name, nomodify=False) self.tb.addrows(1) if self.nchan != 0: self.tb.putcell('SPECTRA', 0, np.zeros(self.nchan, float)) self.tb.putcell('FLAGTRA', 0, np.zeros(self.nchan, int)) self.tb.close() # make sure dummy scantable is loaded on memory storageorg = asap.rcParams['scantable.storage'] asap.rcParams['scantable.storage'] = 'memory' self.scantab = asap.scantable(self.name, False) os.system('\\rm -rf %s' % (self.name)) asap.rcParams['scantable.storage'] = storageorg
[docs] def set_spectrum(self): """ Set the spectrum you want to search for lines. If the input spectrum was passed to the constructor there is no need to call this routine. Parameters ---------- None Returns ------- None """ if isinstance(self.spec, list): self.spectrum = self.spec self.mask = None #elif inspect.isclass(spec) and issubclass(spec, BDPB): elif isinstance(self.spec, np.array): self.spectrum = self.spec.tolist() self.mask = None elif isinstance(self.spec, ma.array): self.spectrum = self.spec.data.tolist() self.mask = self.spec.mask.tolist() mask = [] # initialize some things if self.nchan != 0: #print 'call init() again' self.nchan = len(self.spectrum) self.init() self.scantab._setspectrum(self.spectrum) elif self.nchan != len(self.spectrum): #print 'nchan differ, call init() again' self.nchan = len(self.spectrum) self.init() self.scantab._setspectrum(self.spectrum) del self.finder self.finder = asap._asap.linefinder() self.set_options() else: #print 'set spectrum' self.scantab._setspectrum(self.spectrum) # set the scantable linefinder.set_scan(self, self.scantab)
[docs] def merge_lines(self, lines, frac=0.25): """ Merge lines if those are close enough. Parameters ---------- lines: list line list detected by linefinder algorithm frac: float Criterion for merge as a fraction of line width for narrower line. Default: 0.25 Returns ------- The new number of lines """ nlines = len(lines) / 2 #print 'nlines = ', nlines if nlines == 0 or nlines == 1: return lines self.lines_merged = [] merge = [] for i in xrange(nlines - 1): width = min((lines[2 * i + 1] - lines[2 * i]), (lines[2 * i + 3] - lines[2 * i + 2])) sep = lines[2 * i + 2] - lines[2 * i + 1] if sep < width * frac: merge.append(True) else: merge.append(False) #print 'merge = ', merge if merge.count(True) == 0: self.lines_merged = lines return nlines else: self.lines_merged.append(lines[0]) for i in xrange(len(merge)): if merge[i]: continue else: self.lines_merged.append(lines[2 * i + 1]) self.lines_merged.append(lines[2 * i + 2]) self.lines_merged.append(lines[2 * nlines - 1]) return len(self.lines_merged)
[docs] def get_merged_ranges(self): """ Get a list of merged line ranges. Parameters ---------- None Returns ------- List of merged lines """ return self.lines_merged
[docs] def set_options(self, **keyval): """ Set the options for the linefinding algorithm. Parameters ---------- threshold : float A single channel S/N ratio above which the channel is considered to be a detection. Default is sqrt(3), which together with min_nchan=3 gives a 3-sigma criterion. Default: sqrt(3) min_chan : int A minimal number of consequtive channels, which should satisfy the threshold criterion to be a detection. Default: 3 avg_limit : int A number of consequtive channels not greater than this parameter can be averaged to search for broad lines. Default: 8 box_size : float A running mean box size specified as a fraction of the total spectrum length. Default: 0.2 noise_box : string/float Area of the spectrum used to estimate noise stats. Both string values and numbers are allowed. Allowed string values: 'all' use all the spectrum, 'box' noise box is the same as running mean/median box Numeric values are defined as a fraction from the spectrum size. Values should be positive. (noise_box == box_size has the same effect as noise_box = 'box'). Default: 'all' noise_stat : string Statistics used to estimate noise, allowed values: 'mean80' use the 80% of the lowest deviations in the noise box, 'median' median of deviations in the noise box Default: 'mean80' Returns ------- None """ items = {} self.threshold = math.sqrt(3) for v in self.vals: try: items[v] = keyval[v] if v == "threshold": self.threshold = keyval[v] except KeyError: pass linefinder.set_options(self, **items)
[docs] def makepairs(self, inp): """ Method to turn a list into a list of pairs Parameters ---------- inp : list The list to convert Returns ------- List of pairs. """ data = [] if len(inp) % 2 != 0: raise Exception("Input list must have even number of inputs") for i in range(0, len(inp), 2): data.append([inp[i], inp[i + 1]]) return data
[docs] def find(self, **keyval): """ Method to do the actual searching Parameters ---------- merge : Boolean Whether or not to merge overlapping lines Deafult: False Returns ------- tuple containing the frequency ranges, channel ranges, and rms, and mean (forced 0.0) """ self.mask = [] self.merge = False vals = ["mask", "merge"] for v in vals: try: setattr(self, v, keyval[v]) except KeyError: pass spec = copy.deepcopy(self.spectrum) #print "SP ", len(spec) nlines = self.find_lines(mask=self.mask) #print "LINES ", nlines if nlines == 0: if self.freq == []: return [], 0.0, 100000.0, 0.0 return [], 0.0, 100000.0, 0.0 temp = self.get_ranges(False) if self.merge: temp = self.merge_lines(temp) ranges = self.makepairs(temp) # remove line channels from the spectrum for r in range(len(ranges) - 1, -1, -1): del spec[ranges[r][0]:ranges[r][1] + 1] newspec = np.array(spec) rms = np.sqrt(np.mean(np.square(newspec))) return ranges, rms * self.threshold, rms, 0.0