""" .. _AT-base-api:
**Task** --- ADMIT task (AT) base.
----------------------------------
This module defines the ADMIT task (AT) base class.
"""
# system level imports
import os.path
import copy
import xml.etree.cElementTree as et
import errno
# ADMIT imports
from admit.bdp.BDP import BDP
from admit.xmlio.DtdReader import DtdReader
import admit.util.bdp_types as bt
import admit.xmlio.XmlWriter as XmlWriter
import admit.util.PlotControl as PlotControl
from admit.util.AdmitLogging import AdmitLogging as logging
[docs]class AT(object):
""" Base class for Admit Task (AT) objects.
The ADMIT task base class implements generic interface features common
to concrete ATs. Tasks are the basic execution unit in ADMIT and are
responsible for operating on `BDPs`_ to perform calculations or
otherwise transform them into new data products. Each task includes a
unique set of keywords controlling its behavior.
Parameters
----------
keys : dictionary of str keywords with default values, optional.
Task keyword settings; default keyword values vary by task.
Currently the following keywords, defined by the base class, are
activated for each task:
**alias** : str
By default ATs will use an alias by appending the file basename
they normally use with "-alias". For example, if an AT would
normally transform "foo.im" to "foo.pv", with "alias=test" you
would get "foo-test.pv". The notable exception is Ingest_AT,
where the alias replaces the (potentially long) cube file
basename, i.e., "verylongsourcename.fits" becomes "alias.im".
Must be unique for each concrete type of AT (will be appended with
a unique suffix otherwise). Alias is a system keyword,
inaccessible (via getkey()) after the task is instantiated.
**taskid** : int
Forces a pre-determined task ID number when instantiating the task.
Normally left defaulted (-1, meaning auto-select), but useful in
special circumstances. This is a system keyword, inaccessible (via
getkey()) after the task is instantiated.
**variflow** : bool
Whether the flow emanating from an AT with variadic outputs should
be dynamically adjusted according to the actual number of outputs.
Normally left defaulted (`True`), but may be set to `False` if this
will be managed separately or if the number of outputs is
pre-determined for a particular instance. This is a system keyword,
inaccessible (via getkey()) after the task is instantiated.
keyvals : dictionary of new values for the keywords, optional.
These keyword value pairs can be fed in from the command line
instantiation of the class.
Attributes
----------
_alias : str
Task alias name. Appears in various filenames, HTML output, flow
diagrams, etc.
_taskid : int
Task ID number (defaults to -1, i.e. auto-select; normally set by FlowManager).
_baseDir : str
Project base directory (the string "NONE" until it has been set).
_bdp_in : list of BDPs_
BDP input cache.
_bdp_in_map : list of ints
BDP IDs corresponding to those listed in _bdp_in.
_bdp_out : list of BDPs_
BDP output cache.
_bdp_out_map : list of ints
BDP IDs corresponding to those listed in _bdp_out.
_keys : dict
Task keyword dictionary.
_link : int
Link count (for multiflows).
_type : str
Concrete task type.
_stale : boolean
Whether the AT needs to be run (e.g., a keyword changed).
_enabled : boolean
Whether the AT is enabled; if set to False, the flow manager
will not execute the local run method.
_running : boolean
Whether the AT is currently running (within the execute() method).
Useful for crash recovery diagnostics.
_plot_mode : int
Plot mode, one of util.PlotControl plot mode (e.g., PlotControl.INTERACTIVE). Default: PlotControl.NOPLOT
_plot_type : int
Plotting format, one of util.PlotControl plot type (e.g., PlotControl.PNG). Default: PlotControl.NONE
_valid_bdp_in : List
List of tuples indicating what types and how many BDPs_ are
expected as input.
_valid_bdp_out : List
List of tuples indicating what types and how many BDPs_ are
expected as output.
_needToSave : boolean
Whether this AT and underlying BDPs need to be saved to disk.
_variflow : bool
Whether sub-flows attached to the instance should be automatically
cloned in the case of variadic output.
See Also
--------
isAutoAlias
Notes
-----
As a general rule the attributes of an AT should not be directly
manipulated by the code in an AT---all are taken care of internally
or accessed via methods (e.g., setkey).
.. _BDPs: ../admit.bdp/BDP.html
"""
def __init__(self, keys={}, keyvals={}):
# (System) keywords and default values defined for all ATs.
self._keys = {"alias": '', "taskid": -1, "variflow": True}
self._stale = True # Does this AT need to be run
self._enabled = True # is this AT able to be run
self._running = False # is this AT now running
self._plot_mode = PlotControl.NOPLOT # no plotting
self._plot_type = PlotControl.NONE # no plotting
self._type = self.__class__.__name__ # what type of AT is this
self._bdp_out = [] # BDP's that are produced by this AT
self._bdp_out_map = [] # list of uid's of _bdp_out
self._bdp_out_zero = [] # list of optional types
self._bdp_in = [] # BDP's that are used as inputs for this AT
self._bdp_in_map = [] # list of uid's of _bdp_in
self._valid_bdp_in = [] # the listing of valid input BDP types
self._valid_bdp_out = [] # the listing of valid output BDP types
self._link = 0 # link counter
self._loglevel = logging.getEffectiveLevel()
self._loggername = ""
# Set the initial values for the keywords.
self._keys.update(keys)
# Task alias.
if keyvals.has_key('alias'):
self._keys['alias'] = keyvals.pop('alias')
self._alias = self._keys.pop('alias')
# Task ID.
if keyvals.has_key('taskid'):
self._keys['taskid'] = keyvals.pop('taskid')
self._taskid = self._keys.pop('taskid')
# Variflow support.
if keyvals.has_key('variflow'):
self._keys['variflow'] = keyvals.pop('variflow')
self._variflow = self._keys.pop('variflow')
# Set any values for the keywords that were given at instantiation.
self.setkey(keyvals)
self._version = "0.0.0" # the version data
self._needToSave = False # do we need to save this AT to disk
# it's ok to leave this off (since we don't need to write it to the XML files)
# but upon reading the parser doesn't grab it from Admit, so that needs to be fixed
self._baseDir = "NONE" # addtask() will set this
def __len__(self):
"""Return the current number of registered BDP_OUTs.
Parameters
----------
None
Returns
-------
int
Length of the _bdp_out list.
"""
return len(self._bdp_out)
def __contains__(self, index):
"""BDP outputs membership operator.
Parameters
----------
index : int
BDP output index.
Returns
-------
bool
Membership result.
"""
return self._bdp_out.__contains__(index)
def __iter__(self):
"""BDP outputs iterator.
Returns
-------
iterator
BDP iterator.
"""
return iter(self._bdp_out)
def __getitem__(self, index):
"""Return an indexed _bdp_OUT: AT._bdp_out[index].
Parameters
----------
index : int
Index of the BDP to return.
Returns
-------
BDP
The BDP located at index.
"""
if index >= len(self._bdp_out):
msg = "AT::%d has bdp len %d, %d %d" % (self._taskid, len(self._bdp_in), len(self._bdp_out), index)
raise Exception(msg)
return self._bdp_out[index]
def __setitem__(self, index, bdp):
"""Sets (replaces) BDP output.
Parameters
----------
index : int
BDP output index.
bdp : BDP
BDP reference.
Returns
-------
None
"""
self._bdp_out[index] = bdp
def __str__(self):
print bt.format.BOLD + bt.color.GREEN + "\nAT :" + bt.format.END + bt.format.BOLD + self._type + bt.format.END
for i, j in self.__dict__.iteritems():
if isinstance(j, BDP):
print str(j)
continue
print bt.format.BOLD + i + ": " + bt.format.END + str(j)
for bdp in self._bdp_out:
print bdp
return "\n"
[docs] def isAutoAlias(self, withEmpty=True, compat=None):
"""
Whether the task alias appears to be auto-generated.
The form of auto-generated aliases is a string ending in '@' followed
by a decimal integer. Users should avoid defining aliases which look
like ADMIT-generated aliases as they may be reset automatically, but
are not actively prevented from doing so.
Parameters
----------
withEmpty : bool, optional
Whether an empty alias is considered auto-generated.
compat : str, optional
An alias name to test for compatibility. If not ``None``, the alias
stem (with its '@N' removed) must match the stem for `compat` for
this method to return ``True``.
Returns
-------
bool
Whether the task alias was auto-generated (or, optionally, empty)
and compatible with `compat` (if specified).
Notes
-----
It does not affect the result whether `compat` itself is an
auto-generated alias; e.g., 'foo' and 'foo@1' are equivalent arguments.
"""
if compat:
at = compat.rfind('@')
if at != -1 and compat[at+1:].isdigit(): compat = compat[:at]
alias = self._alias
if withEmpty and not alias and not compat: return True
at = alias.rfind('@')
return at != -1 and alias[at+1:].isdigit() and \
(compat is None or alias[:at] == compat)
[docs] def setloggername(self, name):
""" Method to set the name of the logger for this AT instance
Parameters
----------
name : str
The name of the logger for this AT
Returns
-------
None
"""
self._loggername = name
[docs] def getloggername(self):
""" Method to get the name of the logger for this AT instance
Parameters
----------
None
Returns
-------
String containing the name of the logger.
"""
return self._loggername
[docs] def setlogginglevel(self, level):
""" Method to set the logging level
Parameters
----------
level : int
The logging level to use
Returns
-------
None
"""
if not isinstance(level, int):
logging.error("Only integers can be given for log levels")
return
self._loglevel = level
[docs] def get(self, attrib):
""" Method to get the given attributes value
Parameters
----------
attrib : str
The name of the attribute to get
Returns
-------
varies, depends on the type of the attribute
The value of the attribute, or None is it does not exist
"""
return getattr(self, attrib, None)
[docs] def set(self, item, val):
""" Method to set protected attributes, rather than direct access
Parameters
----------
item : str
The name of the variable to set
val : varies
The value to set the attribute to
Returns
-------
None
"""
if not hasattr(self, item):
raise Exception("%s is not a valid key for %s." % (item, self.get("_type")))
if type(getattr(self, item, None)) != type(val):
raise Exception("You cannot change the data type of an AT keyword. Type for %s is %s" % (item, str(type(getattr(self, item)))))
setattr(self, item, val)
[docs] def getlogginglevel(self):
""" Method to get the current logging level of the AT
Parameters
----------
None
Returns
-------
int, the current logging level
"""
return self._loglevel
[docs] def geteffectivelevel(self):
""" Method to get the effective logging level of the logging subsystem
Parameters
----------
None
Returns
-------
int, the effective logging level
"""
return logging.getEffectiveLevel()
[docs] def seteffectivelevel(self, level):
""" Method to set the effective logging level of the logging subsystem
Parameters
----------
level : int
The logging level to use
Returns
-------
None
"""
logging.setLevel(level)
[docs] def len2(self):
"""Returns the length of _bdp_in and _bdp_out in a tuple.
If you just want the _bdp_out, the intrinsic len(my_at)
Thus my_at.len2()[1] is the same as len(my_at).
Returns
-------
Tuple
Tuple contains 2 values, the length of _bdp_in and _bdp_out
"""
return (len(self._bdp_in), len(self._bdp_out))
[docs] def baseDir(self, path=None):
""" Get/set project base directory.
Unless empty, the base directory is guaranteed to end in os.sep.
Parameters
----------
path : str, optional
New project base directory (ignored if None).
Returns
-------
Updated project base directory.
"""
if path is not None:
if path and path[-1] != os.sep:
path += os.sep
self._baseDir = path
return self._baseDir
[docs] def reset(self, a):
""" Performs an *in-place* shallow copy.
Parameters
----------
a : ADMIT Task
Task reference to copy from.
Returns
-------
None
"""
self.__dict__ = a.__dict__.copy()
[docs] def enabled(self, state=None):
"""Returns current task enabled setting, with optional reset.
Parameters
----------
state : bool, optional
New value of enabled flag (default `None` keeps current value).
Returns
-------
bool
Whether task is enabled (prior to applying `state`, if provided).
"""
enabled = self._enabled
if state is not None:
self._enabled = True if state else False
return enabled
[docs] def running(self, state=None):
"""Returns current task execution flag, with optional reset.
Parameters
----------
state : bool, optional
New value of enabled flag (default `None` keeps current value).
Returns
-------
bool
Whether task is currently being executed (prior to applying
`state`, if provided).
"""
running = self._running
if state is not None:
self._running = True if state else False
return running
[docs] def markUpToDate(self):
# why can't we call this setStale(False) so that we can grep a little easier
"""Resets _stale to indicate that the AT does not need to be run.
Parameters
----------
None
Returns
-------
None
"""
self._stale = False
for i,bdp in zip(range(len(self._bdp_out)),self._bdp_out):
if bdp is not None:
logging.info("BDP_OUT[%d] = %s %s" % (i,str(bdp._type),bdp.xmlFile))
else:
# probably never should happen?
logging.info("BDP_OUT[%d] not connected" % i)
[docs] def markChanged(self):
# why can't we call this setStale(True) so that we can grep a little easier
"""Mark an AT that it's state was changed, so it would need to be
rerun. The FlowManager will take care of any dependants that
will need to be marked as well.
Use "markUpToDate" if you want the opposite function.
Parameters
----------
None
Returns
-------
None
"""
self._stale = True
[docs] def isstale(self):
"""Returns whether the AT is out of date.
Parameters
----------
None
Returns
-------
boolean
True if the AT is out of date, else False.
"""
return self._stale
[docs] def id(self, strip=False):
"""Returns task ID number.
Parameters
----------
strip : bool, optional
Whether to strip project ID (if any) from returned value
(default: False).
Returns
-------
int
Task ID number.
Notes
-----
Raises an exception if no ID assigned (freshly constructed tasks do
not have an ID, it is set when the task is added to a project).
"""
if self._taskid < 0:
raise Exception("Task %s - '%s' has no task ID; "
"it must be added to a project first." %
(self._type, str(self._alias)))
return self._taskid if not strip else self._taskid & 0xffffffff
[docs] def link(self):
"""Increments the task link count.
Returns
-------
None
"""
self._link += 1
[docs] def unlink(self):
"""Decrements the task link count.
Returns
-------
None
"""
if self._link != 0:
self._link -= 1
[docs] def setProject(self, pid):
"""
Adds a project ID to task ID.
Multiflows involve adding tasks belonging to other projects into a
single flow, the multiflow. To prevent task ID collisions, tasks
in multiflows embed a project number into the task ID.
This method performs the actual embedding operation. The project ID
is inserted beginning at bit 32 and replaces any previous project ID.
Since Python integers are automatically multi-precision there is never
danger of truncation of the project ID.
Parameters
----------
pid : int
Project ID number.
Returns
-------
None
See Also
--------
getProject
"""
self._taskid = (pid << 32) + (self._taskid & 0xffffffff)
[docs] def getProject(self):
"""
Retrieves project ID associated with the task.
Multiflows involve adding tasks belonging to other projects into a
single flow, the multiflow. To prevent task ID collisions, tasks
in multiflows embed a project number into the task ID.
This method returns the project ID attached to the task. This will be
zero unless setProject() has been called---normally only by the
ProjectManager when a project is added for use in a multiflow.
Returns
-------
int
Task project ID.
See Also
--------
setProject
Notes
-----
Project IDs are local to each multiflow. Hence the same task, linked in
two independent multiflows, may have a different project ID in each.
This has no effect on the task in its parent flow (for which this
method will return zero).
"""
return 0 if self._taskid < 0 else self._taskid >> 32
[docs] def show(self):
"""Return the AT type.
Parameters
----------
None
Returns
-------
string
The type of the AT.
"""
return self._type
[docs] def dir(self, filename=None):
"""Absolute directory reference of the ADMIT project.
Returns the absolute directory name of this ADMIT project
or a derefenced filename address. Normally you will need
such an absolute file or directory name when non-ADMIT tools
are needed for I/O. Also note that this project
directory is guarenteed to end with the operating dependent
directory separator (usually '/'). This way filenames can be constructed
as follows: **Admit.dir() + 'foobar.dat'** or more conveniently
**Admit.dir('foobar.dat')**
Parameters
----------
filename : string, optional
Filename to be appended to the absolute directory name.
This can include additional subdirectories.
Returns
-------
string
The directory name within which all ADMIT files reside.
"""
if self._baseDir == "NONE":
raise Exception("AT._baseDir was not initialized")
if filename is None:
return self._baseDir
# note that self._baseDir is guaranteed to end in os.sep (unless "")
return self._baseDir + filename
[docs] def mkext(self, filename, ext, alias=""):
"""Return a new filename with a new extension with optional ADMIT alias.
This will either append an extension to a (file)name
without an extension, or replace one. If an alias was set by
this task, the alias will replace the alias found in
the basename dash.
You can also provide a local alias, instead of using the task
alias.
Examples:
("x" , "z" ) -> "x.z"
("x.y", "z" ) -> "x.z"
("x.y", "z", "a") -> "x-a.z"
"""
# caveat: does not work if the directory part has a . and the file does not
# e.g. "a.dir/b/c" would give "a.ext" and that's very wrong
loc = filename.rfind('.')
if loc < 0:
base = filename
else:
base = filename[:loc]
# done
if not alias:
alias = self._alias
if not alias:
return "%s.%s" % (base, ext)
else:
# we have a new alias ; first check if base had an old alias
loc1 = base.rfind('.')
loc2 = base.rfind('-')
if loc1 < 0:
# no dot
if loc2 < 0:
# no old alias
return "%s-%s.%s" % (base, alias, ext)
else:
# replace alias
return "%s-%s.%s" % (base[:loc2], alias, ext)
else:
# dot
if loc2 < loc1:
# dash was too early, no replace needed
return "%s-%s.%s" % (base, alias, ext)
else:
# replace alias
return "%s-%s.%s" % (base[:loc2], alias, ext)
# should never come here
raise Exception,"mkext: no code path here"
[docs] def mkdir(self, dirname):
"""Make a directory in the ADMIT hierarchy.
Checks whether the directory already exists.
It also allows an absolute path, in the classical UNIX sense, but
this is normally not needed.
Parameters
----------
dirname : str
Directory name.
Notes
-----
.. todo:: this routine is identical to the one in Admit()
"""
if dirname[0] == os.sep:
# it's already an absolute path
dname = dirname
else:
# make it relative to the admit
dname = os.path.abspath(self._baseDir + dirname)
if not os.path.exists(dname):
try:
os.makedirs(dname)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(dname):
pass
else: raise
# print "AT.mkdir: ", dname
[docs] def summary(self):
"""Returns the summary dictionary from the AT, for merging
into the ADMIT Summary object. Derived classes should
override this if they provide summary data.
"""
# print "***********AT BASE CLASS summary()***********"
return {}
[docs] def userdata(self):
"""Returns the user dictionary from the AT, for merging
into the ADMIT userdata object. Derived classes should
override this if they provide user data.
"""
return {}
[docs] def freeAlias(self, aliases, alias = None):
"""
Deletes alias reservation, if present.
Parameters
----------
aliases : 2-tuple of dict of str to set of str
Reserved alias registry, keyed by alias base name (index 0) or AT
type (index 1). For index 0, the value is the set of aliases
created with that base name; for index 1, it is the set of default
aliases created for ATs of that type. In both cases, the duplicate
aliases are appended with '@N' for some integer N.
alias : str, optional
Alias name to release; default ``None`` means use the current AT's
alias.
Returns
-------
None
"""
if alias is None: alias = self._alias
if alias is not None:
# Strip auto-generated suffix (if any) from name.
at = alias.rfind('@')
stem = alias[:at] if at != -1 and alias[at+1:].isdigit() else alias
index, key = (0, stem) if stem != '' else (1, self._type)
if key in aliases[index]:
aliases[index][key].discard(alias)
if not aliases[index][key]:
del aliases[index][key]
[docs] def setAlias(self, aliases, alias=None, auto=False):
"""
Sets and registers the task alias, guaranteed unique among
registered aliases.
Checks whether the input `alias` is already in use. If so, raises
an exception unless the name appears auto-generated (or is empty), in
which case the aliases is updated to ensure uniqueness. The latter are
names appended with a string of the form '@N', where N is some decimal
integer value. Resets the task alias name to the final result.
Normally users should *not* call this function directly; user-defined
aliases should be set via the AT constructor argument instead.
Parameters
----------
aliases : 2-tuple of dict of str to set of str
Reserved alias registry, keyed by alias base name (index 0) or AT
type (index 1). For index 0, the value is the set of aliases
created with that base name; for index 1, it is the set of default
aliases created for ATs of that type. In both cases, the duplicate
aliases are appended with '@N' for some integer N.
alias : str, optional
The (suggested) task alias name; default ``None`` uses
the current `_alias` value.
auto : bool, optional
Whether to automatically append a unique '@N' suffix if the alias
is already in use; otherwise, an exception will be raised (unless
the alias already ends in '@N', which implies `auto` = ``True``).
Returns
-------
str
The assigned alias name.
Notes
-----
Performs an iterative search for the first available name following the
(estimated) final assigned alias for the AT of the given type.
Users may specify an alias of the form 'foo@0' to indicate that a
sequence of aliases 'foo@0', 'foo@1', ... be auto-generated as
necessary. This can be useful in scripts where tasks are generated in
loops and a predictable series of aliases is desired.
"""
if alias is None:
# In principle None could be treated separately from '', but isn't.
alias = '' if self._alias is None else self._alias
# Strip auto-generated suffix (if any) from name.
at = alias.rfind('@')
if at != -1 and alias[at+1:].isdigit():
stem = alias[:at]
count0 = int(alias[at+1:])
else:
stem = alias
count0 = -1
# Update the appropriate alias set.
index, key = (0, stem) if stem != '' else (1, self._type)
if key in aliases[index]:
if alias and alias == stem and not auto:
raise Exception("Explicit alias '%s' is already in use, "
"must be unique." % alias)
else:
# Find unused alias.
used = aliases[index][key]
count = len(used)
while alias in used:
alias = stem+'@'+str(count)
count += 1
used.add(alias)
else:
if not count0: alias = stem+'@0'
aliases[index][key] = set([alias])
self._alias = alias
return alias
[docs] def setkey(self, name="", value="", isinit=False):
"""
Set keyword value.
Two styles are possible:
1. name = {key:val} e.g. **setkey({"a":1})**
2. name = "key", value = val e.g. **setkey("a", 1)**
This method checks the type of the keyword value, as it must
remain the same. Also new keywords cannot be added.
Parameters
----------
name : dictionary or string, optional
Dictionary of keyword value pairs, or a string containing
a single key name; defaults to empty string.
value : any, optional
The (string) keyword value; defaults to empty string.
isinit : bool
Whether keyword is being set for the first time.
Returns
-------
None
"""
change = False
# if we are given a dictionary then go through each entry
if isinstance(name, dict):
# check that the keys are valid first and that the data type is not changing
for key in name:
if not self.haskey(key):
raise Exception("%s is not a valid key for %s." % (key, self._type))
if type(self._keys[key]) != type(name[key]):
if isinstance(self._keys[key], float) and isinstance(name[key], int):
name[key] = float(name[key])
else:
raise Exception("You cannot change the data type of an AT keyword. Type for %s is %s (expected %s)" % (key, str(type(name[key])), str(type(self._keys[key]))))
if name[key] != self.getkey(key):
self._keys[key] = name[key]
change = True
# if we were given a string
elif not name == "":
#2
# check that the key is valid and not changing data type
if not self.haskey(name):
print "valid keys: ", self._keys
raise Exception("%s is not a valid key for %s." % (name, self._type))
if type(self._keys[name]) != type(value):
if isinstance(self._keys[name], float) and isinstance(value, int):
value = float(value)
else:
raise Exception("You cannot change the data type of an AT keyword. Type for %s is %s (expected %s)" % (name, str(type(value)), str(type(self._keys[name]))))
if value != self.getkey(name):
self._keys[name] = value
change = True
else:
raise Exception("Invalid name parameter given, it must be a string or a dictionary of keys:values.")
if change and not isinit:
if isinstance(name,str):
logging.info("Setting '%s' = %s for %s" %
(name, str(value), self._type))
else:
logging.info("Setting %s for %s" % (str(name), self._type))
self.markChanged()
[docs] def getkey(self, key):
"""Retrieval value for a key. If the key had not been set yet,
a default value can be returned, else None is returned.
Notice no type checking has been done here, the caller is
responsible for this.
Parameters
----------
key : string
The key whose value is to be returned.
Returns
-------
The value of the key.
See also
--------
haskey
"""
if self.haskey(key):
return self._keys[key]
raise Exception("Key %s does not exist in this AT (%s)" % (key, self._type))
[docs] def haskey(self, key):
"""Query if a key exists for an AT.
Parameters
----------
key : string
The key to check for existence.
Returns
-------
Boolean
True if the key exists, False otherwise.
"""
return key in self._keys
[docs] def checktype(self, item):
"""Check the type of an object to see if it is a BDP.
Parameters
----------
item : Any
The item to check.
Returns
-------
Boolean
True if the item is a BDP, False otherwise.
"""
if isinstance(item, BDP):
return
raise Exception("Item is not a BDP")
[docs] def addoutput(self, item, slot=-1):
""" Add a BDP output product to an AT. The BDP type will be validated
before it is added to the output list.
Parameters
----------
item : BDP
The BDP to be added to the output list.
slot : int, optional
The slot in the BDP to add it to. The BDP type must match the expected type for the slot.
Default: -1 (i.e. insert into the first available slot)
Returns
-------
None
"""
# Set base directory.
item.baseDir(self.baseDir())
# check that it is a valid bdp output type for this AT
self.checktype(item)
found = False
for bdp in self._valid_bdp_out:
if isinstance(item, bdp[0]):
found = True
break
if not found:
raise Exception("Given BDP is not of a type specified in _valid_bdp_out.")
found = False
# If a slot was specified
if slot > -1:
# check if the slot is in the list
if slot >= len(self._bdp_out):
raise Exception("Specified slot does not exist")
# if the slot is an optional BDP then check the type and replace
if slot >= len(self._bdp_out_order_list):
if isinstance(item, self._bdp_out[slot]):
self.addoutputbdp(item, slot)
return
else:
raise Exception("You annot replace one BDP type with another in _bdp_out.")
# if it is a required output type check the type and replace
else:
if isinstance(item, self._bdp_out_order_list[slot]):
self.addoutputbdp(item, slot)
return
raise Exception("Specified slot is not of the expected BDP type.")
# if no slot was specified then find where it belongs
for i in range(len(self._bdp_out_order_list)):
# if the slot is empty then insert the BDP
if isinstance(item, self._bdp_out_order_list[i]):
found = True
if self._bdp_out[i] is None:
self.addoutputbdp(item, i)
return
mark = -1
# if it is an optional one find where it belongs
for i in range(len(self._bdp_out_zero)):
if isinstance(item, self._bdp_out_zero[i]):
mark = i
if mark == -1:
raise Exception("No available slots for given BDP")
# if it is the first one then just add it to the end
if len(self._bdp_out) == self._bdp_out_length:
self.addoutputbdp(item)
# otherwise find its appropriate slot
else:
slot = len(self._bdp_out_order_list)+1
for i in range(len(self._bdp_out_order_list), len(self._bdp_out)):
for j in range(mark+1):
if isinstance(self._bdp_out[i], self._bdp_out_zero[j]):
slot = i + 1
if slot >= len(self._bdp_out):
self.addoutputbdp(item)
else:
self.addoutputbdp(item, slot, True)
# @todo isn't this normally called via FM.add() ?
[docs] def addoutputbdp(self, item, slot=-1, insert=False):
""" Add a BDP to the _bdp_out list.
Parameters
----------
item : BDP
The BDP to add.
slot : int, optional
Where the BDP should be added to the list;
default : -1
insert : Boolean, optional
Whether to insert into the middle of the list (not replace);
defaults to False.
Returns
-------
None
Notes
-----
This method should not be called directly, use addoutput instead.
"""
item.setkey("_taskid", self.id(True))
if slot < 0:
self._bdp_out.append(item)
self._bdp_out_map.append(item.get("_uid"))
else:
if insert:
self._bdp_out.insert(slot, item)
self._bdp_out_map.insert(slot, item.get("_uid"))
else:
self._bdp_out[slot] = item
self._bdp_out_map[slot] = item.get("_uid")
if len(self._bdp_out) != len(self._bdp_out_map):
raise Exception("Mismatch in _bdp_out size.")
[docs] def deloutput(self, slot):
""" Delete a specific BDP in the _bdp_out list.
Parameters
----------
slot : int
The slot to remove the BDP from
Returns
-------
None
"""
if slot < len(self._bdp_out_order_list):
self._bdp_out[slot].delete()
self._bdp_out[slot] = None
self._bdp_out_map[slot] = -1
else:
self._bdp_out[slot].delete()
del self._bdp_out[slot]
del self._bdp_out_map[slot]
[docs] def clearoutput(self, delete=True):
""" Clear the output BDP list.
Parameters
----------
delete : bool
If True then delete the output bdps.
Default: True
Returns
-------
None
"""
if delete:
for bdp in self._bdp_out:
if bdp is not None:
bdp.delete()
del bdp
self._bdp_out = [None] * self._bdp_out_length
self._bdp_out_map = [-1] * self._bdp_out_length
[docs] def set_bdp_out(self, bout=[]):
""" Validate the _valid_bdp_out list and digest it into the appropriate
attributes.
Parameters
----------
bout : list
List containing tuples of the valid BDP outputs and their
counts; defaults to empty list.
Returns
-------
None
"""
self._bdp_out_order_list = [] # ordered list of input types
self._bdp_out_zero = [] # reset list of optional types
self._bdp_out = [] # reset the _bdp_out list
self._bdp_out_map = [] # reset the mapping
# error check the input
if not isinstance(bout, list):
raise Exception("_valid_bdp_out is not in the proper form. See the dcoumentation for details.")
haveopt = False
# evaluate each entry
for b in bout:
if not isinstance(b, tuple):
raise Exception("Entries in _valid_bdp_out are not all tuples as they must be.")
if not issubclass(b[0], BDP):
raise Exception("Invalid data type given in _valid_bdp_out, must be of type BDP.")
# take care of the optional one (only one allowed)
if b[1] == 0:
if haveopt:
raise Exception("Only one type of optional output is allowed.")
self._bdp_out_zero.append(b[0])
haveopt = True
# process the required ones
else:
# check the order
if haveopt:
raise Exception("Optional outputs must be last.")
# add an entry into the appropriate maps
for _ in range(0, b[1]):
self._bdp_out.append(None)
self._bdp_out_order_list.append(b[0])
self._bdp_out_map.append(-1)
self._bdp_out_length = len(self._bdp_out)
self._valid_bdp_out = bout
def set_bdp_in(self, bdpin=None):
    """ Validate the _valid_bdp_in list and digest it into the appropriate
        attributes.

        Parameters
        ----------
        bdpin : list
            List containing tuples of the valid BDP inputs. Each tuple is
            read as (BDP type, count, bt.REQUIRED or bt.OPTIONAL); a count
            of 0 marks an open ended input. Defaults to an empty list.

        Returns
        -------
        None

        Raises
        ------
        Exception
            If `bdpin` is not a list, an entry is not a tuple, an entry's
            type is not a BDP subclass, or the entries are out of order
            (a counted entry after an open ended one, or a required entry
            after an optional one).
    """
    # The list is stored on the instance below, so a mutable default in the
    # signature would be shared by every task relying on it; build a fresh
    # one per call instead.
    if bdpin is None:
        bdpin = []
    self._bdp_in_order_list = []    # ordered list of input types
    self._bdp_in_order_type = []    # ordered list of type (optional/required)
    self._bdp_in_zero = []          # list of open ended types
    self._bdp_in = []               # reset the _bdp_in list
    self._bdp_in_map = []           # reset the mapping
    # error check the input
    if not isinstance(bdpin, list):
        raise Exception("_valid_bdp_in is not in the proper form. See the dcoumentation for details.")
    haveopt = False
    havezero = False
    # evaluate each entry
    for b in bdpin:
        if not isinstance(b, tuple):
            # message previously named _valid_bdp_out, which is the wrong list
            raise Exception("Entries in _valid_bdp_in are not all tuples as they must be.")
        if not issubclass(b[0], BDP):
            raise Exception("Invalid data type given in _valid_bdp_in, must be of type BDP.")
        # take care of the optional ones
        if b[2] == bt.OPTIONAL:
            haveopt = True
        # process the non-open ended ones
        if b[1] != 0:
            if havezero:
                raise Exception("Improper order in _valid_bdp_in see documentation.")
            if b[2] == bt.REQUIRED and haveopt:
                raise Exception("Improper order in _valid_bdp_in see documentation.")
            # add one empty slot (and matching type/map entries) per count
            for _ in range(b[1]):
                self._bdp_in.append(None)
                self._bdp_in_order_list.append(b[0])
                self._bdp_in_order_type.append(b[2])
                self._bdp_in_map.append(-1)
        # take care of open ended ones
        else:
            havezero = True
            self._bdp_in_zero.append(b[0])
    self._bdp_in_length = len(self._bdp_in)
    self._valid_bdp_in = bdpin
def getVersion(self):
    """ Report the version of this AT.

        Parameters
        ----------
        None

        Returns
        -------
        String
            The version of the AT, as stored in the _version attribute.
    """
    version = self._version
    return version
def getdtd(self, fl):
    """ Write this task's dtd data to an open file.

        The dtd is looked up through DtdReader under the name
        "<task type>.dtd" and written out entry by entry.

        Parameters
        ----------
        fl : file handle
            Open output file handle.

        Returns
        -------
        None
    """
    dtdname = self._type + ".dtd"
    reader = DtdReader(dtdname)
    for entry in reader.getDtd():
        fl.write(entry)
def html(self, inheader):
    """ Method to represent the current AT in HTML format.

        Parameters
        ----------
        inheader : str
            Base header info for the html output. It is used as a format
            string and filled with eleven values (task CSS class, task ID,
            status icons, task type, ..., the generated form body, task
            ID). If None, the template is read from etc/form_at.html under
            the ADMIT root directory.

        Returns
        -------
        An html representation of the AT, or a short html error message
        if the default template file could not be read.
    """
    SPANXVAL = '<div class="span%s">%s</div><!-- spanx -->'
    ENDROW = '</div><!-- row-fluid --> \n'
    STARTROW = '<div class="row-fluid">'
    # NOTE: TABLEHEAD contains a literal '%' and must never be used as a
    # format string; it is only ever appended verbatim.
    TABLEHEAD = '<table width="80%" class="table table-admit table-bordered table-striped"><tbody><tr><th>Index</th><th>Type</th><th>File name</th></tr>'
    BDPLINK = '<a href="http://admit.astro.umd.edu/admit/module/admit.bdp/%s.html">%s</a>'
    TABLEROW = "<tr><td>%d</td><td>%s</td><td>%s</td></tr>"

    if inheader is None:
        # utils is not imported at file level, so bring it in here; without
        # this the default-template path failed with a NameError.
        import admit.util.utils as utils
        admitfile = os.sep.join((utils.admit_root(), "etc", "form_at.html"))
        try:
            with open(admitfile, "r") as template:
                header = template.read()
        except (IOError, OSError):
            return "<h4> ***** failed to open %s ***** </h4>" % admitfile
    else:
        header = inheader

    def bdp_table(title, bdps):
        """ Return the html fragments for one titled BDP listing. """
        pieces = ['<h5><u>%s</u></h5>' % title]
        if len(bdps) == 0:
            # no table was opened, so there is none to close
            pieces.append("None")
            return pieces
        pieces.append(TABLEHEAD)
        for index, bdp in enumerate(bdps):
            if bdp is None:
                pieces.append(TABLEROW % (index, "None", "None"))
            else:
                link = BDPLINK % (bdp._type, bdp._type)
                pieces.append(TABLEROW % (index, link, bdp.xmlFile))
        pieces.append('</tbody></table>')
        return pieces

    parts = ['<!-- ##### BEGIN FORM DATA FOR %s ##### -->\n' % self._type]
    parts.extend(bdp_table("Input BDPs", self._bdp_in))
    parts.extend(bdp_table("Output BDPs", self._bdp_out))
    parts.append('<h5><u>Keywords</u></h5>')
    # squirrel away the task ID and task name for reference. the callback
    # method will use these.
    parts.append('\n<!-- ********** Task ID and Task Name ********** -->\n <input class="input-admitform" type="hidden" value="%d" name="task[][taskid]"><br>\n' % self._taskid)
    parts.append('<input class="input-admitform" type="hidden" value="%s" name="task[][taskname]"><br>\n<!-- *********** Task Keywords *********** -->' % self._type)
    for k in self._keys:
        value = self._keys[k]
        # Non-string values go through repr(): plain %s can truncate
        # floating point numbers, which AT.setkey() later interprets as a
        # changed value, re-running a task that should not be re-run.
        # Strings are deliberately NOT repr()'d, because that would send
        # the quotes to the web form and force users to type quotes on
        # string inputs (see the parsing in Admit.py _onpost()).
        # mwp 2016-aug-23
        shown = value if isinstance(value, str) else repr(value)
        label = '<label class="label-admitform">%s</label>' % k
        field = '<input class="input-admitform" type="text" value="%s" name="task[][%s]"><br>\n' % (shown, k)
        parts.append(STARTROW)
        parts.append(SPANXVAL % ('2', label))
        parts.append(SPANXVAL % ('6', field))
        parts.append(ENDROW)
    parts.append('<!-- ##### END FORM DATA FOR %s ##### -->\n' % self._type)
    h = "".join(parts)

    # a task still flagged as running while being rendered is treated as crashed
    if self.running():
        taskclass = "crashed-admittask"
    elif self.enabled():
        taskclass = "stale-admittask-form" if self.isstale() else "label-admittask-form"
    else:
        taskclass = "disabled-admittask-form"
    retval = header % (taskclass, self._taskid, self.statusicons(),
                       self._type, self._taskid, self._taskid,
                       self._type, self._type, self._type, h, self._taskid)
    return retval
#@todo move this to an htmlutils module
def statusicons(self):
    """Return html icons representing the enabled/stale status of this task.

        An enabled, up-to-date task gets a single "ok" icon. Otherwise the
        result is the concatenation of a warning icon (stale), a ban icon
        (disabled) and a fire icon (still flagged as running), as
        applicable.
    """
    is_stale = self._stale
    is_enabled = self._enabled
    if is_enabled and not is_stale:
        return '<i class="icon-ok"></i>'
    icons = []
    if is_stale:
        icons.append('<i class="icon-warning-sign"></i>')
    if not is_enabled:
        icons.append('<i class="icon-ban-circle"></i>')
    # If running() is true at the time statusicons() is called, the AT
    # must have crashed.
    if self.running():
        icons.append('<i class="icon-fire"></i>')
    return ''.join(icons)
def write(self, node):
    """ Serialize the AT under the given node and save its BDPs.

        A child element named after the task type is added to `node`, the
        task is written into it via XmlWriter using the layout read from
        "<task type>.dtd", and save() is called for the output BDPs.

        Parameters
        ----------
        node : elementtree node
            The node to attach to (usually supplied by flowmanager)

        Returns
        -------
        node : elementtree node
            The modified node
        dtd : Text string of the dtd
    """
    element = et.SubElement(node, self._type)
    element.set("type", bt.AT)
    reader = DtdReader(self._type + ".dtd")
    field_order = reader.getOrder()
    dtd_text = reader.getDtd()
    field_types = reader.getTypes()
    field_keys = reader.getKeys()
    # hand the task itself to the writer
    XmlWriter.XmlWriter(self, field_order, field_types, element, field_keys)
    # the output BDPs get written as well
    self.save()
    return node, dtd_text
def save(self):
    """ Write out the output BDPs of this AT, if a save is pending.

        Nothing happens unless the _needToSave flag is set. Each non-empty
        output BDP is written to the path given by dir() for its xmlFile,
        after which the flag is cleared.

        Parameters
        ----------
        None

        Returns
        -------
        None
    """
    if self._needToSave:
        for product in self._bdp_out:
            if product is None:
                continue
            target = self.dir(product.xmlFile)
            product.write(target)
        self._needToSave = False
def validatekeys(self):
    """ Method to error check all input keys.

        If an error is found an error message should be issued, not an
        exception. This allows all values to be error checked before an
        exception is raised. The method should return a bool where True
        means all key values are ok, and False otherwise. An exception
        will be raised by validatekeyvalues, which calls this method, if
        False is returned. The generic form of this method does not check
        anything, but should be overridden in individual AT's that need to
        check key values.

        Parameters
        ----------
        None

        Returns
        -------
        bool, True if all key values are ok, False otherwise.
    """
    # the base class has nothing to check
    keys_ok = True
    return keys_ok
def dryrun(self):
    """ Method to do a dry run of the AT, generally just checks input
        values for errors.

        Parameters
        ----------
        None

        Returns
        -------
        bool, True if all is ok, False otherwise (validatekeys() reported
        a failure or raised an exception).
    """
    try:
        result = self.validatekeys()
    except Exception:
        return False
    # validatekeys() signals bad keys by returning False; that result used
    # to be discarded here. None is still accepted so that an override
    # which forgets to return a value keeps passing as before.
    return True if result is None else bool(result)
def run(self):
    """Runs the task.

        Running a task transforms its input BDPs into output BDPs according
        to its function. Following this call the output BDPs accurately
        reflect the current state of the task (keywords and BDP inputs).
        Each concrete AT must implement its own **run()** method; this base
        version does nothing.

        Returns
        -------
        None

        See Also
        --------
        execute
    """
    return None
def execute(self, args=None):
    """ Executes the task.

        Execution performs the following actions:

          0. Mark task running.
          1. Update BDP inputs (if `args` is given).
          2. Validate task keywords.
          3. Validate task BDP inputs.
          4. Call task `run()`_.
          5. Mark task up to date.
          6. Mark persistent BDP outputs out of date.
          7. Disable running attribute.

        Parameters
        ----------
        args : list of BDPs, optional
            List of BDP inputs used by `run()`_ method; default is to use
            the most recently supplied inputs.

        Returns
        -------
        None

        Raises
        ------
        Exception
            If the BDP inputs do not validate. Exceptions raised by the
            task's own methods propagate unchanged.

        See Also
        --------
        run

        Notes
        -----
        If `args` is not given, the previously cached BDP inputs
        are reused; hence execution (input validation) will fail
        if required arguments are absent.

        When execution fails, the effective logging level is restored but
        the running attribute is left set; statusicons() relies on that to
        flag a crashed task.

        .. _run(): #admit.AT.AT.run
    """
    self._running = True
    logging.heading("Executing %s - '%s' (V%s)" %
                    (self._type, self._alias, self._version))
    logging.reportKeywords(self._keys)
    temploglevel = self.geteffectivelevel()
    self.seteffectivelevel(self._loglevel)
    try:
        if args:  # self._bdp_in = args
            self.clearinput()
            for a in args:
                self.addinput(a)
        self.validatekeys()
        validated, details = self.validateinput(True)
        if not validated:
            raise Exception("Inputs not validated: %s" % details)
        # @todo review this if need, currently we clear (delete) all BDPs prior to running
        self.clearoutput()
        self.run()
        self.markUpToDate()
        self._needToSave = True
    finally:
        # put the previous logging level back even if the task failed, so a
        # crash does not leave this task's level in effect
        self.seteffectivelevel(temploglevel)
    # only reached on success (see Notes)
    self._running = False
def checkfiles(self):
    """ Check if the files from all the BDP_out's in an AT exist.

        Every file reported by the output BDPs is looked up on disk; for
        each one that is missing the task is marked changed and a warning
        is logged.

        Parameters
        ----------
        None

        Returns
        -------
        list or None
            The files reported by all output BDPs (found or not), or None
            when the check is skipped (linked, disabled or stale task).

        Raises
        ------
        Exception
            If an output BDP slot is empty.

        Notes
        -----
        In multiflows, BDPs for ATs linked from other projects
        (distinguished by a non-zero value of getProject()) are
        *not* (re)checked for existence; this occurs when the parent
        project is read.
    """
    # Skip ATs linked into multiflows.
    if self.getProject():
        return None
    # only enabled, up-to-date tasks are checked
    if not (self._enabled and not self._stale):
        return None
    files = []
    for product in self._bdp_out:
        if product is None:
            raise Exception("Missing BDP output files(s) from %s" % (self._type))
        files.extend(product.getfiles())
    for name in files:
        if name is None:
            continue
        if os.path.exists(self.dir(name)):
            continue
        self.markChanged()
        logging.warning("AT.checkfiles():: File not found: " + name)
    return files
def delete(self):
    """Method to delete the AT and underlying BDPs.

        Calls delete() on every non-empty output BDP. It is recommended
        that any AT that stores images inside of lists, dictionaries,
        tuples, etc. override this method with a customized version.

        Parameters
        ----------
        None

        Returns
        -------
        None
    """
    # was `self_bdp_out` (missing dot), which raised NameError on every call
    for bdp in self._bdp_out:
        if bdp is not None:
            bdp.delete()
def copy(self):
    """ Creates an independent duplicate of the task.

        The task ID of the duplicate will be unset and the alias marked
        to allow auto-updating.

        Parameters
        ----------
        None

        Returns
        -------
        Reference to the new (cloned) task.

        Notes
        -----
        The current implementation performs a deepcopy() and updates a few
        members: a non-automatic alias gets '@1' appended, the task ID is
        set to None, inputs and outputs are cleared (outputs without
        deleting them) and the clone is marked changed.
    """
    from copy import deepcopy
    clone = deepcopy(self)
    if not clone.isAutoAlias():
        clone._alias = clone._alias + '@1'
    clone._taskid = None
    clone.clearinput()
    # False: the BDPs are shared history of the original, do not delete them
    clone.clearoutput(False)
    clone.markChanged()
    return clone
def isequal(self, at):
    """Method to determine if two ATs are the same. Useful for testing.

        The task types, every instance attribute except _keys, _bdp_in and
        _bdp_out, and every keyword value are compared. The first mismatch
        is logged and ends the comparison.

        Parameters
        ----------
        at : AT
            The AT to compare against

        Returns
        -------
        Boolean
            Whether or not the ATs are equal. Any error raised while
            comparing (e.g. an attribute missing on `at`) also yields
            False.
    """
    try:
        if at.get("_type") != self.get("_type"):
            logging.info("AT types are not the same: " + at.get("_type") + " vs " + self.get("_type"))
            return False
        for name in self.__dict__:
            # keywords are compared separately below; BDP lists are skipped
            if name in ("_keys", "_bdp_out", "_bdp_in"):
                continue
            # `not (a == b)` stands in for the former cmp(a, b) != 0:
            # cmp() does not exist on Python 3, where the resulting
            # NameError was swallowed and every comparison came back False.
            if not (getattr(self, name) == getattr(at, name)):
                logging.info("Attribute %s does not match" % (name))
                return False
        for key in self._keys:
            if not (self.getkey(key) == at.getkey(key)):
                logging.info("Keyword %s does not match" % (key))
                return False
    except Exception:
        # best effort: anything that cannot be compared counts as unequal
        return False
    return True
def merge(self, at, aliases=None):
    """
    Merges attributes from another task.

    The purpose of this method is to facilitate re-running of flows by
    transferring select attributes from one task to a compatible
    replacement task. To maintain transparency, attributes are overwritten
    only where necessary to preserve flow integrity; in particular, items
    such as keyword values and the task ID are never modified.

    Parameters
    ----------
    at : ADMIT task
        Task to merge with `self`.

    aliases : 2-tuple of dict of str to set of str
        Reserved alias registry, keyed by alias base name (index 0) or AT
        type (index 1). For index 0, the value is the set of aliases
        created with that base name; for index 1, it is the set of default
        aliases created for ATs of that type. In both cases, the duplicate
        aliases are appended with '@N' for some integer N.
        Not used by this method.

    Returns
    -------
    None

    Raises
    ------
    Exception
        If the two tasks differ in type or in project.

    Notes
    -----
    It is the caller's responsibility to set the alias as desired. Doing so
    locally (here) may result in unnecessary reassignments, causing tasks
    which are otherwise unaltered to be marked stale.

    The task enabled/disabled status is always copied from the input task.
    This is necessary for variflow maintenance but applied uniformly for
    simplicity.
    """
    if self._type != at._type:
        raise Exception("Task type mismatch: %s / %s" %
                        (self._type, at._type))
    if self.getProject() != at.getProject():
        raise Exception("Task project mismatch: %d / %d" %
                        (self.getProject(), at.getProject()))

    # Outputs may be stale, but they're the latest available.
    self.clearoutput(False)
    if at:
        for product in at:
            if product is None:
                at.markChanged()
                logging.warning("Null output BDP encountered merging "
                                "%s - '%s'; task will be marked stale" %
                                (at._type, at._alias))
                continue
            self.addoutput(product)
        self._needToSave = True
        self.save()

    # The stale flag is inherited only when keywords, variflow setting and
    # alias all agree; otherwise this task has to be re-run.
    unchanged = (self._keys == at._keys and
                 self._variflow == at._variflow and
                 (self.isAutoAlias(compat=at._alias) or
                  self._alias == at._alias))
    self._stale = at._stale if unchanged else True

    self.enabled(at.enabled())
    self._summary = at.summary()
    for name in self._summary:
        self._summary[name].setTaskID(self.id(True))

    # Correct double counting of linked tasks.
    if self.getProject():
        self.unlink()
def newId(self, tid):
    """
    Assigns the task a new ID number.

    Related attributes are updated accordingly: when the ID actually
    changes, each output BDP receives the new ID through its "_taskid"
    key and the BDPs are saved again.

    Parameters
    ----------
    tid : int
        New task ID.

    Returns
    -------
    int
        Assigned ID number.
    """
    if self._taskid == tid:
        # nothing to update
        return tid
    self._taskid = tid
    if self:
        for product in self:
            if product is None:
                continue
            product.setkey("_taskid", tid)
        self._needToSave = True
        self.save()
    return tid
def bestMatch(self, at1, at2):
    """
    Determines the better match of two tasks to the current one.

    Type is most important, then alias, then keywords (most matching
    keywords, followed by most matching values).

    Parameters
    ----------
    at1 : AT
        First task.

    at2 : AT
        Second task.

    Returns
    -------
    int
        ID number of best matching task (in the case of a tie, the first
        task ID is returned).
    """
    def rate(task):
        """ Score how closely `task` resembles this task. """
        points = 0
        if task._type == self._type:
            points += 1000000000
        # an identical alias and an auto-alias compatible one each count
        if task._alias == self._alias:
            points += 100000000
        if self.isAutoAlias(compat=task._alias):
            points += 100000000
        for key in self._keys:
            if key not in task._keys:
                continue
            points += 10        # keyword present in both
            if self._keys[key] == task._keys[key]:
                points += 1     # ... and with the same value
        return points

    first = rate(at1)
    second = rate(at2)
    # ties go to the first task
    winner = at2 if second > first else at1
    return winner.id()