Source code for admit.at.PrincipalComponent_AT

"""**PrincipalComponent_AT** --- Calculates principal components of maps.
   ----------------------------------------------------------------------

   This module defines the PrincipalComponent_AT class.
"""
import sys, os
import numpy as np
import matplotlib.mlab as mlab

import admit
import admit.util.bdp_types as bt

[docs]class PrincipalComponent_AT(admit.Task): """ Calculates the principal components of a set of input maps. Given **N** single-plane (i.e., 2-D) images, principal component analysis solves the **N**-dimensional eigenvalue problem determining the principal directions in **N**-space of the image data, ranked according to their contribution to the total variance of the sample points (each sample point being the **N**-vector of image values corresponding to a single pixel position). This decomposition can be used to reduce the dimensionality of the data by identifying components contributing little independent information to the data set. This task also calculates the covariance matrix for input images. Mathematically, a principal component analysis can be applied to an arbitrary set of images. In practice, the input images should be related in some way; otherwise, the eigenvalues will be of similar magnitude and (as expected) no reduction in dataset dimensionality possible. For astronomical images, the inputs will typically be maps of the same source (at the same pixel scale) in different wavelength bands. In ADMIT, the inputs will often be moment maps produced by `Moment_AT <Moment_AT.html>`_. **Keywords** **clipvals**: list of float List of threshold values below which each input map value will be clipped (set to zero). The length of **clipvals** must match the number of input maps; the default empty list implies all zeroes. **covarmin**: float Minimum pairwise covariance for summary output (only); default: 0.90. **Input BDPs** **Image_BDP**: count: `varies` Input images for principal component analysis; e.g., moment maps produced by `Moment_AT <Moment_AT.html>`_ or `CubeSum_AT <CubeSum_AT.html>`_. **Output BDPs** **Table_BDP**: count: 3 First table contains the input mean and standard deviation plus output eigenimage filename and variance fractions. Second table is the calculated projection matrix transforming (mean-subtracted and variance-normalized) input data to principal component space. Third table is the covariance matrix of the input images. **Image_BDP**: count: `varies` (equal to input count) Calculated principal component images (in order of eigenvalue size). Parameters ---------- keyval : dict, optional Dictionary of keyword:value pairs. Attributes ---------- _version : str Version ID string. Notes ----- All input maps must have the same dimensions. Currently only 2-D maps are supported. """ def __init__(self, **keyval): keys = {"clipvals" : [], "covarmin" : 0.90} admit.Task.__init__(self, keys, keyval) self._version = "1.0.1" self.set_bdp_in ([(admit.Image_BDP, 0, bt.REQUIRED)]) self.set_bdp_out([(admit.Table_BDP, 3), (admit.Image_BDP, 0)])
[docs] def summary(self): """ Summary data dictionary. PrincipalComponent_AT adds the following to ADMIT summary: .. table:: :class: borderless +----------+----------+-----------------------------------+ | Key | type | Description | +==========+==========+===================================+ | pca | table | Statistics and covariance data. | +----------+----------+-----------------------------------+ Parameters ---------- None Returns ------- dict Dictionary of SummaryEntry """ if hasattr(self,"_summary"): return self._summary else: return {}
[docs] def userdata(self): """ User data dictionary. Parameters ---------- None Returns ------- dict The user data dictionary from the AT, for merging into the ADMIT user data object. """ if hasattr(self,"_userdata"): return self._userdata else: return {}
[docs] def run(self): """ Task run method. Computes the principal component decomposition of the input images and populates the output eigenimages and projection matrix. Parameters ---------- None Returns ------- None """ self._summary = {} # BDP output uses the alias name if provided, else a flow-unique one. stem = self._alias if not stem: stem = "pca%d" % (self.id()) inum = 0 data = [] icols = [] for ibdp in self._bdp_in: # Convert input CASA images to numpy arrays. istem = ibdp.getimagefile(bt.CASA) ifile = ibdp.baseDir() + istem icols.append(os.path.splitext(istem)[0]) if os.path.dirname(icols[-1]): icols[-1] = os.path.dirname(icols[-1]) # Typical line cube case. img = admit.casautil.getdata(ifile, zeromask=True).data data.append(img) admit.logging.info("%s shape=%s min=%g max=%g" % (icols[-1], str(img.shape), np.amin(img), np.amax(img))) assert len(data[0].shape) == 2, "Only 2-D input images supported" assert data[0].shape == data[inum].shape, "Input shapes must match" inum += 1 # At least two inputs required for meaningful PCA! assert inum >= 2, "At least two input images required" # Each 2-D input image is a plane in a single multi-color image. # Each color multiplet (one per pixel) is an observation. # For PCA we collate the input images into a vector of observations. shape = data[0].shape npix = shape[0] * shape[1] clip = self.getkey('clipvals') if not clip: clip = [0 for i in range(inum)] assert len(clip) >= inum, "Too few clipvals provided" # Clip input values and stack into a vector of observations. pca_data = [] for i in range(inum): nd = data[i] nd[nd < clip[i]] = 0.0 pca_data.append(np.reshape(nd, (npix,1))) pca_in = np.hstack(pca_data) pca = mlab.PCA(pca_in) # Input statistics and output variance fractions. #print "fracs:", pca.fracs #print "mean:", pca.mu #print "sdev:", pca.sigma obdp = admit.Table_BDP(stem+"_stats") obdp.table.setData(np.vstack([pca.mu, pca.sigma,pca.fracs]).T) obdp.table.columns = ["Input mean", "Input deviation", "Eigenimage variance fraction"] obdp.table.description = "PCA Image Statistics" self.addoutput(obdp) # Pre-format columns for summary output. # This is required when mixing strings and numbers in a table. # (NumPy will output the array as all strings.) table1 = admit.Table() table1.setData(np.vstack([[i for i in range(inum)], icols, ["%.3e" % x for x in pca.mu], ["%.3e" % x for x in pca.sigma], ["%s_eigen/%d.im" % (stem, i) for i in range(inum)], ["%.4f" % x for x in pca.fracs]]).T) table1.columns = ["Index", "Input", "Input mean", "Input deviation", "Eigenimage", "Eigenimage variance fraction"] table1.description = "PCA Image Statistics" # Projection matrix (eigenvectors). #print "projection:", pca.Wt obdp = admit.Table_BDP(stem + "_proj") obdp.table.setData(pca.Wt) obdp.table.columns = icols obdp.table.description = \ "PCA Projection Matrix (normalized input to output)" self.addoutput(obdp) # Covariance matrix. covar = np.cov(pca.a, rowvar=0, bias=1) #print "covariance:", covar obdp = admit.Table_BDP(stem + "_covar") obdp.table.setData(covar) obdp.table.columns = icols obdp.table.description = "PCA Covariance Matrix" self.addoutput(obdp) # Collate projected observations into eigenimages and save output. os.mkdir(self.baseDir()+stem+"_eigen") pca_out = np.hsplit(pca.Y, inum) odata = [] for i in range(inum): ofile = "%s_eigen/%d" % (stem, i) img = np.reshape(pca_out[i], shape) odata.append(img) #print ofile, "shape, min, max:", img.shape, np.amin(img), np.amax(img) aplot = admit.util.APlot(figno=inum, abspath=self.baseDir(), ptype=admit.util.PlotControl.PNG) aplot.map1(np.rot90(img), title=ofile, figname=ofile) aplot.final() # Currently the output eigenimages are stored as PNG files only. admit.casautil.putdata_raw(self.baseDir()+ofile+".im", img, ifile) oimg = admit.Image() oimg.addimage(admit.imagedescriptor(ofile+".im", format=bt.CASA)) oimg.addimage(admit.imagedescriptor(ofile+".png", format=bt.PNG)) obdp = admit.Image_BDP(ofile) obdp.addimage(oimg) self.addoutput(obdp) # As a cross-check, reconstruct input images and compute differences. for k in range(inum): ximg = pca.Wt[0][k]*odata[0] for l in range(1,inum): ximg += pca.Wt[l][k]*odata[l] ximg = pca.mu[k] + pca.sigma[k]*ximg admit.logging.regression("PCA: %s residual: " % icols[k] + str(np.linalg.norm(ximg - data[k]))) # Collect large covariance values for summary. cvmin = self.getkey('covarmin') cvsum = [] cvmax = 0.0 for i in range(inum): for j in range(i+1, inum): if abs(covar[i][j]) >= cvmax: cvmax = abs(covar[i][j]) if abs(covar[i][j]) >= cvmin: cvsum.append([icols[i], icols[j], "%.4f" % (covar[i][j])]) admit.logging.regression("PCA: Covariances > %.4f: %s (max: %.4f)" % (cvmin,str(cvsum),cvmax)) table2 = admit.Table() table2.columns = ["Input1", "Input2", "Covariance"] table2.setData(cvsum) table2.description = "PCA High Covariance Summary" keys = "covarmin=%.4f clipvals=%s" % (cvmin, str(clip)) self._summary["pca"] = admit.SummaryEntry([table1.serialize(), table2.serialize() ], "PrincipalComponent_AT", self.id(True), keys)