Source code for admit.at.PVCorr_AT

""" .. _PVCorr-at-api:

   **PVCorr_AT** --- Determines position-velocity correlations in a map.
   ---------------------------------------------------------------------

   Defines the PVCorr_AT class.
"""
import sys, os

from admit.AT import AT
from admit.Summary import SummaryEntry
import admit.util.bdp_types as bt
from admit.util.Image import Image
from admit.util.Table import Table
import admit.util.utils as utils
import admit.util.casautil as casautil
from admit.bdp.CubeStats_BDP import CubeStats_BDP
from admit.bdp.PVCorr_BDP import PVCorr_BDP
from admit.bdp.Image_BDP import Image_BDP
from admit.util import APlot
from admit.util import stats
from admit.util.AdmitLogging import AdmitLogging as logging

import numpy as np
import numpy.ma as ma
try:
    import scipy
    import scipy.signal
except ImportError:
    print "WARNING: No scipy; PVCorr task cannot function."
try:
    import casa
except ImportError:
    print "WARNING: No CASA; PVCorr task cannot function."

#
# Some discussion/code on https://github.com/keflavich/image_registration
# about sub-pixel registration, although we probably don't need this 
# accuracy in PVCorr.

class PVCorr_AT(AT):
    """PV correlation in a PVSlice map.

       PVCorr_AT computes a cross-correlation of a feature in a PVSlice
       with the whole PVSlice, looking for repeated patterns to detect
       spectral lines. Much like the output from CubeStats_AT and
       CubeSpectrum_AT, this table can then be given to LineID_AT to
       attempt a line identification.

       See also :ref:`PVCorr-AT-Design` for the design document.

       **Keywords**

          **numsigma**: float
            Minimum intensity, in terms of sigma, above which a selected
            portion of the spectrum will be used for cross-correlation.
            Default: 3.0.

          **range**: integer list
            If given, it has to be a list of two channel numbers, the first
            and last channel (0-based) of the range to use for the
            cross-correlation.
            Default: [].

          **nchan**: integer
            The number of channels (in case range= was not used) that
            defines the line. The line is centered on the strongest point
            in the input PV map. If 0 is given, it will watershed down
            from the strongest line.
            Default: 0.

       **Input BDPs**

          **PVSlice_BDP**: count: 1
            Input PV slice, normally from a `PVSlice_AT <PVSlice_AT.html>`_.

          **CubeStats_BDP**: count: 1
            Input cube statistics from which the RMS is taken, received
            from a `CubeStats_AT <CubeStats_AT.html>`_.

       **Output BDPs**

          **PVCorr_BDP**: count: 1
            Output table.

    """
    def __init__(self,**keyval):
        keys = {"numsigma" : 3.0,    # N-sigma
                "range"    : [],     # optional channel range
                "nchan"    : 0,      # number of channels around the channel where the peak is
               }
        AT.__init__(self,keys,keyval)
        self._version = "1.1.0"
        self.set_bdp_in([(Image_BDP,1,bt.REQUIRED),
                         # @todo optional 2nd PVSlice can be used to draw the template from
                         (CubeStats_BDP,1,bt.REQUIRED)])
        self.set_bdp_out([(PVCorr_BDP,1)])
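    # A minimal usage sketch (hypothetical recipe fragment, not from this
    # file): PVCorr_AT consumes the PVSlice_BDP of a PVSlice_AT and the
    # CubeStats_BDP of a CubeStats_AT. The keyword values and the slice=
    # endpoints below are made up for illustration.
    #
    #   import admit
    #   p  = admit.Project("cube.admit")
    #   t0 = p.addtask(admit.Ingest_AT(file="cube.fits"))
    #   t1 = p.addtask(admit.CubeStats_AT(), [t0])
    #   t2 = p.addtask(admit.PVSlice_AT(slice=[10,10,60,60], width=5), [t0])
    #   t3 = p.addtask(admit.PVCorr_AT(numsigma=3.0, nchan=0), [t2, t1])
    #   p.run()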
    def summary(self):
        """Returns the summary dictionary from the AT, for merging
           into the ADMIT Summary object.

           PVCorr_AT adds the following to ADMIT summary:

           .. table::
              :class: borderless

              +----------+----------+-----------------------------------+
              |   Key    | type     |    Description                    |
              +==========+==========+===================================+
              | pvcorr   | list     |   correlation diagram             |
              +----------+----------+-----------------------------------+

           Parameters
           ----------
           None

           Returns
           -------
           dict
               Dictionary of SummaryEntry
        """
        if hasattr(self,"_summary"):
            return self._summary
        else:
            return {}
    def run(self):
        dt = utils.Dtime("PVCorr")
        self._summary = {}

        numsigma = self.getkey("numsigma")
        mode = 1                                            # PV corr mode (1,2,3)
        normalize = True
        # normalize = False

        b1 = self._bdp_in[0]                                # PVSlice_BDP
        fin = b1.getimagefile(bt.CASA)                      # CASA image
        data = casautil.getdata_raw(self.dir(fin))          # grab the data as a numpy array
        self.myplot = APlot(ptype=self._plot_type,pmode=self._plot_mode,abspath=self.dir())
        #print 'DATA[0,0]:',data[0,0]
        #print 'pv shape: ',data.shape
        npos = data.shape[0]
        nvel = data.shape[1]
        dt.tag("getdata")

        b2 = self._bdp_in[1]                                # CubeStats_BDP
        sigma = b2.sigma                                    # global sigma in the cube
        cutoff = numsigma * sigma
        freq = b2.table.getColumnByName("frequency")

        chans = self.getkey("range")                        # range of channels, if used
        if len(chans) > 0:
            if len(chans) != 2:
                logging.fatal("range=%s" % chans)
                raise Exception,"range= needs two values, left and right (inclusive) channel"
            ch0 = chans[0]
            ch1 = chans[1]
        else:
            nchan = self.getkey("nchan")
            imstat0 = casa.imstat(self.dir(fin))            # @todo can use data[] now
            xmaxpos = imstat0['maxpos'][0]
            ymaxpos = imstat0['maxpos'][1]
            logging.info("MAXPOS-VEL %s %g" % (str(imstat0['maxpos']),imstat0['max'][0]))
            if nchan > 0:
                # expand around it, later ch0,ch1 will be checked for running off the edge
                ch0 = ymaxpos - nchan/2
                ch1 = ymaxpos + nchan/2
            else:
                # watershed down to find ch0 and ch1 ?
                # this doesn't work well in crowded areas
                ch0 = ymaxpos
                ch1 = ymaxpos
                spmax = data.max(axis=0)
                k = spmax.argmax()
                n = len(spmax)
                logging.debug('spmax %s %d %g' % (str(spmax.shape),k,spmax[k]))
                # find lower cutoff
                for i in range(n):
                    ch0 = ymaxpos - i
                    if ch0 < 0: break
                    if spmax[ch0] < cutoff: break
                ch0 = ch0 + 1
                # find higher cutoff
                for i in range(n):
                    ch1 = ymaxpos + i
                    if ch1 == n: break
                    if spmax[ch1] < cutoff: break
                ch1 = ch1 - 1
        dt.tag("imstat")

        bdp_name = self.mkext(fin,"pvc")                    # output PVCorr_BDP
        b3 = PVCorr_BDP(bdp_name)
        self.addoutput(b3)

        if ch0<0 or ch1>=nvel:
            # this probably only happens to small cubes (problematic for PVCorr)
            # or when the strongest line is really close to the edge of the band
            # (which is probably ok)
            if ch0<0 and ch1>=nvel:
                logging.warning("Serious issues with the size of this cube")
            if ch0<0:
                logging.warning("Resetting ch0 edge to 0")
                ch0 = 0
            if ch1>=nvel:
                ch1 = nvel-1
                logging.warning("Resetting ch1 edge to the maximum")

        if ch0 > ch1:
            logging.warning("Sanity swapping ch0,1 due to likely noisy data")
            ch0,ch1 = ch1,ch0

        if mode == 1:
            out,rms = mode1(data, ch0, ch1, cutoff, normalize)
            corr = out
        elif mode == 2:
            out,rms = mode2(data, ch0, ch1, cutoff)         # slower 2D version
            corr = out[npos/2,:]                            # center cut, but could also try feature detection
        elif mode == 3:
            out,rms = self.mode3(data, ch0, ch1, cutoff)    # Doug's faster 2D version
            # get the peak of each column
            corr = np.amax(out,axis=0)
        # print "PVCORR SHAPE ",corr.shape," mode", mode
        if len(corr) > 0:
            # print "SHAPE out:",out.shape,corr.shape,npos/2
            ch = range(len(corr))
            if len(corr) != len(freq):
                logging.fatal("ch (%d) and freq (%d) do not have same size" % (len(corr),len(freq)))
                raise Exception,"ch and freq do not have same dimension"
            dt.tag("mode")
            labels = ["channel",  "frequency",  "pvcorr"]
            units  = ["number",   "GHz",        "N/A"]
            data   = (ch, freq, corr)
            table = Table(columns=labels,units=units,data=np.column_stack(data))
        else:
            # still construct a table, but with no rows
            labels = ["channel",  "frequency",  "pvcorr"]
            units  = ["number",   "GHz",        "N/A"]
            table = Table(columns=labels,units=units)
        b3.setkey("table",table)
        b3.setkey("sigma",float(rms))
        dt.tag("table")

        if len(corr) > 0:
            table.exportTable(self.dir("testPVCorr.tab"),cols=['frequency','pvcorr'])
            test_single(ch,freq,corr)

            logging.regression("PVC: %f %f" % (corr.min(),corr.max()))

            title = 'PVCorr mode=%d [%d,%d] %g' % (mode,ch0,ch1,cutoff)
            x = ch
            xlab = 'Channel'
            y = [corr]
            ylab = 'PV Correlation'
            p1 = "%s_%d" % (bdp_name,0)
            segp = []
            segp.append( [0,len(ch),0.0,0.0] )
            segp.append( [0,len(ch),3.0*rms,3.0*rms] )
            # @todo: in principle we know, given the noise and size of the box, what the sigma in pvcorr should be
            self.myplot.plotter(x,y,title,figname=p1,xlab=xlab,ylab=ylab,segments=segp, thumbnail=True)

            #out1 = np.rot90 (data.reshape((nvel,npos)) )
            if mode > 1:
                self.myplot.map1(data=out,title="testing PVCorr_AT: mode%d"%mode,figname='testPVCorr', thumbnail=True)

            taskargs = "numsigma=%.1f range=[%d,%d]" % (numsigma,ch0,ch1)
            caption = "Position-velocity correlation plot"
            thumbname = self.myplot.getThumbnail(figno=self.myplot.figno,relative=True)
            figname   = self.myplot.getFigure(figno=self.myplot.figno,relative=True)
            image = Image(images={bt.PNG: figname}, thumbnail=thumbname, thumbnailtype=bt.PNG, description=caption)
            b3.image.addimage(image, "pvcorr")

            self._summary["pvcorr"] = SummaryEntry([figname,thumbname,caption,fin],"PVCorr_AT",self.id(True),taskargs)
        else:
            self._summary["pvcorr"] = None
            logging.warning("No summary")
            logging.regression("PVC: -1")

        dt.tag("done")
        dt.end()
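    # Hedged sketch of inspecting the result after run(): the PVCorr_BDP
    # (b3 above) holds a Table whose "pvcorr" column is what LineID_AT
    # consumes downstream; it can be read back with the same Table API
    # that run() uses on b2.table, e.g.
    #
    #   pvc  = b3.table.getColumnByName("pvcorr")       # correlation values
    #   freq = b3.table.getColumnByName("frequency")    # GHz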
    def mode3(self, data, v0, v1, dmin=0.0):
        """ v0..v1 (both inclusive) are channel selections
            threshold on dmin
            @todo   the frequency axis is not properly calibrated here
            @todo   a full 2D is slow, we only need the 1D version
        """
        print "PVCorr mode3: v0,1=",v0,v1
        smin = data.min()
        #s = data[v0:v1+1,:]
        s = data[:,v0:v1+1]
        if dmin==0.0:
            logging.warning("Using all data in crosscorr")
            f = s
        else:
            f = np.where(s>dmin,s,0)
        # find out where the zeros are
        temp = np.amax(f,axis=1)
        nz = np.nonzero(temp)
        # trim the kernel in the y direction, removing rows that are all 0.0
        # (+1 so the last nonzero row is kept)
        f = f[nz[0][0]:nz[0][-1]+1,:]
        f0 = np.where(s>smin,1,0)
        f1 = np.where(s>dmin,1,0)
        fmax = f.max()
        print "PVCorr mode3:",f1.sum(),'/',f0.sum(),'min/max',smin,fmax
        out = scipy.signal.correlate2d(data,f,mode='same')
        self.myplot.map1(data=f,title="PVCorr 2D Kernel",figname='PVCorrKernel', thumbnail=True)
        print 'PVCorr min/max:',out.min(),out.max()
        n1,m1,s1,n2,m2,s2 = stats.mystats(out.flatten())
        print "PVCorr stats", n1,m1,s1,n2,m2,s2
        rms_est = s2/np.sqrt(f1.sum())
        return out,rms_est
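# Tiny self-contained illustration (not part of the task) of the kernel
# trimming in mode3: template rows whose maximum is 0 are located with
# np.nonzero and sliced away, the +1 keeping the last nonzero row.
#
#   import numpy as np
#   f  = np.array([[0.,0.],[1.,2.],[0.,3.],[0.,0.]])
#   nz = np.nonzero(np.amax(f, axis=1))
#   f  = f[nz[0][0]:nz[0][-1]+1,:]                # keeps rows 1..2 only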
def test_single(x1,x2,y,cutoff=None):
    """ test as if there is only one line via a simple moment analysis
    """
    ymax = y.max()
    if cutoff is None:
        cutoff = 0.10 * ymax
    ym = ma.masked_less(y,cutoff)
    y1 = ym * x1
    y2 = ym * x2
    logging.info("Average: %d %d %d %d" % (ym.count(), y1.count(), y2.count(), len(y)))
    logging.info("Average intensity weighted channel: %g %g %g %g" % (y1.sum()/ym.sum(),y2.sum()/ym.sum(),cutoff,ymax))
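# Worked example (not part of the module) of the masked moment estimate in
# test_single: values below the cutoff are masked out, and the rest give an
# intensity-weighted mean channel.
#
#   import numpy as np, numpy.ma as ma
#   y  = np.array([0.2, 1.0, 4.0, 1.0, 0.1])
#   x1 = np.arange(len(y))                    # channel numbers
#   ym = ma.masked_less(y, 0.10*y.max())      # cutoff = 0.10 * ymax = 0.4
#   print (ym*x1).sum()/ym.sum()              # -> 2.0, the weighted channel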
def mode1(data, v0, v1, dmin=0.0, normalize=False):
    """ faster 1D version of mode2

        This works by forcing mode='valid' and making the X (position)
        dimension of the template the same as the PV diagram. We then pad
        the ends of the PV with zeros. This introduces some complexity in
        how to return the correct shape.

        v0..v1 (both inclusive) are channel selections
        threshold on dmin
        @todo   the frequency axis may not be properly calibrated here
        @todo   plot the template for debug, like in mode3
    """
    logging.info("PVCorr mode1: v0,1= %d %d dmin= %g normalize=%s" % (v0,v1,dmin,str(normalize)))
    smin = data.min()
    nx = data.shape[0]
    ny = data.shape[1]
    nv = v1-v0+1
    pad = np.zeros(nx*nv).reshape(nx,nv)
    # ValueError: negative dimensions are not allowed (EGNoG)
    pdata = np.concatenate((pad,data,pad),axis=1)
    s = data[:,v0:v1+1]                                     # the stencil where the line is
    if dmin==0.0:
        logging.warning("Using all data in crosscorr")
        f = s
    else:
        f = np.where(s>dmin,s,0.0)
        if normalize:
            f = np.where(f != 0.0, 1.0, 0.0)
    f0 = np.where(s>smin,1,0)
    f1 = np.where(s>dmin,1,0)
    if s.max() < dmin:
        logging.warning("Datamax=%g, no data above dmin=%g; data too noisy?" % (s.max(),dmin))
        return np.arange(0),0.0
    fmax = f.max()
    fsum = f.sum()
    ssum = s.sum()
    fssum = (f*s).sum()
    ffsum = (f*f).sum()
    logging.info("PVCorr mode1: %d/%d min/max %g %g sums: %g %g %g %g" %
                 (f1.sum(),f0.sum(),smin,fmax,fsum,ssum,ffsum,fssum))
    if normalize:
        out = scipy.signal.correlate2d(pdata,f,mode='valid')/fssum
    else:
        out = scipy.signal.correlate2d(pdata,f,mode='valid')/ffsum
    logging.info('PVCorr min/max: %g %g' % (out.min(),out.max()))
    n1,m1,s1,n2,m2,s2 = stats.mystats(out.flatten())
    logging.info("PVCorr stats %d %g %g %d %g %g" % (n1,m1,s1,n2,m2,s2))
    rms_est = s2
    logging.info("RMS est: %g" % rms_est)
    # CAVEAT:
    # cutting it this way will agree with peaks in mode2, but between
    # ny=odd and ny=even there is a half-channel frequency offset;
    # alternatively, for one of them an interpolation is needed.
    # The idea is that half a channel should not influence LineID.
    corr = out[0,nv/2+1:nv/2+ny+1]
    return corr,rms_est
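# Self-contained sketch (not from the module) of the shape bookkeeping in
# mode1: padding the PV diagram with nv zero channels on each side and
# correlating with a full-height (nx x nv) template in mode='valid' yields
# a single row of ny+nv+1 values, from which ny channels are cut.
#
#   import numpy as np, scipy.signal
#   nx, ny, nv = 4, 10, 3
#   data  = np.random.rand(nx, ny)            # stand-in PV diagram
#   f     = data[:, 4:4+nv]                   # template around a "line"
#   pad   = np.zeros((nx, nv))
#   pdata = np.concatenate((pad, data, pad), axis=1)
#   out   = scipy.signal.correlate2d(pdata, f, mode='valid')
#   print out.shape                           # -> (1, ny+nv+1) = (1, 14)
#   corr  = out[0, nv/2+1:nv/2+ny+1]          # ny correlation values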
def mode2(data, v0, v1, dmin=0.0):
    """ v0..v1 (both inclusive) are channel selections
        threshold on dmin
        for an odd number of channels, the center line in mode2 will be
        the same as in mode1
        @todo   the frequency axis is not properly calibrated here
        @todo   a full 2D is slow, we only need the 1D version
    """
    print "PVCorr mode2: v0,1=",v0,v1,"dmin=",dmin
    smin = data.min()
    s = data[:,v0:v1+1]
    if dmin==0.0:
        logging.warning("Using all data in crosscorr")
        f = s
    else:
        f = np.where(s>dmin,s,0)
        print "PVCorr dmin:",dmin
    f0 = np.where(s>smin,1,0)
    f1 = np.where(s>dmin,1,0)
    fmax = f.max()
    ffsum = (f*f).sum()
    print "PVCorr mode2:",f1.sum(),'/',f0.sum(),'min/max',smin,fmax
    out = scipy.signal.correlate2d(data,f,mode='same')/ffsum
    print 'PVCorr min/max:',out.min(),out.max()
    n1,m1,s1,n2,m2,s2 = stats.mystats(out.flatten())
    print "PVCorr stats", n1,m1,s1,n2,m2,s2
    rms_est = s2/np.sqrt(f1.sum())
    return out,rms_est