Flow: Project task flow manager.
This module defines the FlowManager class.
-
class admit.FlowManager.FlowManager(connmap=None, bdpmap=None, depsmap=None, varimap=None, tasklevs=None, tasks=None)
Manages the flow of data products between ADMIT tasks.
The flow manager maintains the tree of ADMIT tasks (ATs) constituting a data flow. Tasks communicate through basic data products (BDPs) that flow between tasks, which combine and transform them into new BDPs according to their function. This class implements the bookkeeping to track task connections and inter-dependencies. A central concept is the task connection 4-tuple, (si, sp, di, dp), connecting a source task (ID si) output BDP (port sp) to a destination task (ID di) input BDP (port dp). The primary concern of the flow manager is to organize and maintain this connection information to permit efficient execution and modification of data flows.
The flow manager includes limited functionality supporting self-modifying variadic flows (variflows). In a variflow, at least one AT produces a variable number of outputs, whether from instance to instance or between runs of the same instance. This presents a problem for defining the sub-flow emanating from a variadic AT, as every concrete flow must assume a fixed number of outputs from each AT to define the connection map. To support variflows, the flow manager will automatically clone or prune (disable) sub-flows attached to the variadic output ports of an AT, each time the AT is executed, according to the actual number of BDPs present. When additional parallel sub-flows are needed, the sub-flow(s) attached to the first variadic output port are cloned and connected to the new, extra outputs. (If the port already has any ATs attached to it, the existing sub-flow(s) are enabled but no cloning is performed.) If fewer sub-flows are needed, the excess tasks are simply disabled (i.e., not removed).
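The clone-or-prune rule for variflows can be pictured with a small standalone sketch. This is not the FlowManager implementation: the function name plan_variflow and its arguments are hypothetical, and the sketch assumes the sub-flow on the first variadic port serves as the template for cloning.

```python
def plan_variflow(num_outputs, attached):
    """Decide what to do with each variadic output port (toy model).

    num_outputs -- number of BDPs the variadic AT actually produced
    attached    -- attached[k] is True if port k already has sub-flow tasks
    Returns three lists of port indices: (enable, clone, disable).
    """
    # Assumption: only the first variadic port can act as the clone template.
    has_template = bool(attached) and attached[0]
    enable, clone, disable = [], [], []
    for port in range(max(num_outputs, len(attached))):
        has_tasks = port < len(attached) and attached[port]
        if port < num_outputs:
            if has_tasks:
                # Existing sub-flow is enabled; no cloning is performed.
                enable.append(port)
            elif has_template:
                # Extra output: clone the template sub-flow onto this port.
                clone.append(port)
        elif has_tasks:
            # Excess sub-flow: disabled, not removed.
            disable.append(port)
    return enable, clone, disable
```

For instance, plan_variflow(3, [True, True]) asks for one clone on port 2, while plan_variflow(1, [True, True, True]) disables the sub-flows on ports 1 and 2.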
Parameters: connmap : triple-nested dictionary of 4-tuples, optional
Initial value of the _connmap attribute (defaults to an empty dictionary).
bdpmap : dictionary of list of 2-tuples, optional
Initial value of the _bdpmap attribute (defaults to an empty dictionary).
depsmap : dictionary of sets, optional
Initial value of the _depsmap attribute (defaults to an empty dictionary).
varimap : double-nested dictionary of list of sets, optional
Initial value of the _varimap attribute (defaults to an empty dictionary).
tasklevs : dictionary of int, optional
Initial value of the _tasklevs attribute (defaults to an empty dictionary).
tasks : dictionary of task references, optional
Initial value of the _tasks attribute (defaults to an empty dictionary).
Notes
All internal state is kept up to date by add() and remove() so that it is valid at all times. Hence _connmap, _depsmap, etc. should never be modified by any other methods, either inside or outside of FlowManager (the leading underscore emphasizes this intent).
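The bookkeeping structures listed under Attributes can be illustrated with a toy model that derives them from a list of connection 4-tuples. This is only a sketch under stated assumptions: build_maps is a hypothetical helper, root tasks are assumed to sit at level 0, and the flow is assumed to be acyclic. The real class maintains these maps incrementally through add() and remove().

```python
def build_maps(connections):
    """Derive toy connmap, bdpmap, depsmap and tasklevs from 4-tuples."""
    connmap, ports_by_task, task_ids = {}, {}, set()
    for si, sp, di, dp in connections:
        # connmap[si][di][dp] holds the full connection 4-tuple.
        connmap.setdefault(si, {}).setdefault(di, {})[dp] = (si, sp, di, dp)
        ports_by_task.setdefault(di, {})[dp] = (si, sp)
        task_ids.update((si, di))

    # bdpmap[di] is a list of (si, sp), ordered by input port number.
    bdpmap = {di: [ports[dp] for dp in sorted(ports)]
              for di, ports in ports_by_task.items()}

    # Assumption: a task's level is one more than its deepest source.
    tasklevs = {}

    def level(tid):
        if tid not in tasklevs:
            sources = [level(si) for si, _sp in bdpmap.get(tid, [])]
            tasklevs[tid] = 1 + max(sources, default=-1)
        return tasklevs[tid]

    depsmap = {}
    for tid in task_ids:
        depsmap.setdefault(level(tid), set()).add(tid)
    return connmap, bdpmap, depsmap, tasklevs
```

Keeping depsmap and tasklevs as inverses of each other means a task's level can be looked up directly, while execution can still iterate level by level.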
Attributes
_connmap (triple-nested dictionary of 4-tuples) Organizes connection 4-tuples in a 3-level dictionary hierarchy; _connmap[si][di][dp] is a connection 4-tuple (si, sp, di, dp) connecting source task ID si, BDP output port sp, to destination task di, input dp.
_bdpmap (dictionary of list of 2-tuples) Relates BDP outputs from source tasks with the BDP inputs to connected destination tasks; _bdpmap[di] is a list of 2-tuples (si, sp), one for each BDP input port, describing the source of the corresponding BDP.
_depsmap (dictionary of sets) Maintains the task dependency levels needed by the run() method; _depsmap[level] is the set of task IDs residing at that dependency level. Lower level tasks must execute prior to higher level tasks to satisfy dependency requirements; tasks at the same level may be executed in any order (even concurrently).
_varimap (double-nested dictionary of list of sets) Tracks variadic sub-flows and their controlling (variadic) root task; _varimap[si][sp] is a list of sub-flow task IDs, each stored as a set (one per sub-flow), emanating from task ID si, (variadic) output port sp.
_tasklevs (dictionary of int) Maintains task dependency information in a complementary (inverse) manner to _depsmap; _tasklevs[id] is the dependency level for task id.
_tasks (dictionary of task references) Holds references to all ADMIT tasks in the flow, keyed by task id; _tasks[id] is an ADMIT task reference.
_aliases (2-tuple of dictionary of task aliases) Reserved alias registry, keyed by alias base name (index 0) or AT type (index 1). For index 0, the value is an integer count of aliases created with that base name; for index 1, it is the number of default aliases created for ATs of that type. In both cases, the duplicate aliases are appended with ‘@N’ for some integer N. Not present in XML.
_taskid (int) Task ID counter, used to ensure unique task IDs within the flow. Not present in XML.
Methods
add(a[, stuples, dtuples]) Appends or inserts an AT into the task flow.
clone(id[, flow, idmap]) Clones the flow emanating from a given root task (included).
connectInputs() Connects input BDPs to all tasks in the flow.
diagram(dotfile) Generates dot (graphviz) diagram for the current flow.
downstream(id[, leaf]) Determines the ATs downstream of a task (includes itself).
dryrun() Performs a dry run.
find(isMatch) Finds ATs in the flow matching a criterion.
findTwin(tid, flow0, twins) Attempts to find corresponding task in another flow.
inFlow(a[, stuples]) Determines whether a compatible task exists in the flow.
mergeTasks(summary, flow0, summary0, twins, ...) Merges task settings from another flow.
remove(id[, keepRoot, delFiles]) Removes an AT and its downstream tasks.
replace(id, a[, stuples]) Replaces one task with another, removing the original.
run([dryrun]) Executes the flow, but only tasks that are out of date.
sameLineage(tid, tid0, flow0, twins, match) Determines whether two tasks in different flows have identical ancestry.
script(py[, proj, tcube]) Generates a Python script recreating the current flow.
show() Displays formatted internal FlowManager state.
showsetkey([outfile]) Display keysettings for all tasks, meant to be used to write a template file for rerunning an admit
stale(id[, direct]) Sets the stale flag of an AT and its downstream ATs.
verify() Verifies the internal state of the FlowManager.
-
add(a, stuples=None, dtuples=None)
Appends or inserts an AT into the task flow.
Appending a task creates a new leaf, that is, a task whose BDP outputs (if any) are not connected to any tasks; in this case, dtuples can be omitted. Insertion implies that one or more of the task's outputs feeds back into the flow, in which case dtuples will be non-empty.
Parameters: a : AT
ADMIT task to append/insert into the flow.
stuples : list of 2-tuples, optional
List of source connection 2-tuples, one per BDP input port. For example, [(si0,sp0), (si1,sp1)] if the task requires two BDP inputs, the first (input port 0) from task si0, BDP output sp0 and the second (input port 1) from task si1, BDP output sp1. Defaults to None (no upstream tasks).
dtuples : list of 4-tuples, optional
List of destination connection 4-tuples (si, sp, di, dp). The number of connections varies but will be zero for leaf (i.e., appended) tasks. The source task ID in these tuples must match the input task ID a.id, as it is the source for all dtuples connections. For example, [(1, 0, 2, 1), (1, 0, 3, 2)] if this task (id #1) disseminates its BDP output (port 0) to two downstream task inputs (#2 port 1 and #3 port 2). Defaults to None (no downstream tasks).
Returns: int
Input task ID on success, else -1 (error detected).
Notes
Usually all but the root task(s) will have a non-empty stuples list.
As a convenience, in the stuples and dtuples arguments the task alias may be used in place of the integer task ID. Also, for the stuples argument (only), a source port of zero may be omitted (implied); i.e., (si, 0) may be replaced with si (or the alias name).
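The stuples shorthand described in these notes can be sketched as a small normalization step. The helper below is hypothetical and is not part of FlowManager; in particular, the aliases argument (a plain alias-to-ID dictionary) is an assumption made for illustration only.

```python
def normalize_stuples(stuples, aliases):
    """Expand stuples shorthand into explicit (task_id, port) 2-tuples.

    stuples -- list whose items are (si, sp) tuples, bare task IDs,
               or bare alias strings; None means no upstream tasks
    aliases -- hypothetical dict mapping alias name -> integer task ID
    """
    explicit = []
    for item in stuples or []:
        # A bare task ID or alias implies port 0.
        si, sp = item if isinstance(item, tuple) else (item, 0)
        if isinstance(si, str):
            si = aliases[si]  # raises KeyError for an unknown alias
        explicit.append((si, sp))
    return explicit
```

With aliases = {"cube": 7}, the mixed list [1, ("cube", 2), "cube"] expands to fully explicit 2-tuples with integer task IDs.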
-
clone(id, flow=None, idmap=None)
Clones the flow emanating from a given root task (included).
Creates an independent, parallel sub-flow duplicating the action of the original. If flow is None, the sub-flow emanating from task id (inclusive) is duplicated. Otherwise, the sub-flow defined by flow = (fm, tid), where fm is a FlowManager instance and task tid (contained in fm) is the root of the sub-flow, is duplicated and spliced into the current flow, using task id as its root instead—i.e., task tid is identified with id and the tasks following tid are cloned and appended to id. In the latter case, task id is not duplicated; the new sub-flow is merely appended to it.
Parameters: id : int
Task ID of the existing sub-flow root task.
flow : 2-tuple, optional
Tuple (fm, tid) describing an external sub-flow; fm is a FlowManager object and tid the ID of a task within it.
idmap : dict of int key-values, optional
Dictionary mapping root task IDs in flow to corresponding tasks in self.
Returns: int
Task ID of the new (possibly cloned) root task.
Notes
If an external flow is provided that is not autonomous (i.e., one or more of its ATs receives inputs from ATs outside the sub-flow), the idmap argument is required and must map task IDs from flow to self for all ATs outside the sub-flow. This informs the method how to identify parent tasks between flows.
-
connectInputs()
Connects input BDPs to all tasks in the flow.
Connections are made regardless of whether tasks are out of date or disabled. Missing inputs are ignored. This method also updates the stale flag on each task.
Parameters: None
Returns: None
-
diagram(dotfile)
Generates dot (graphviz) diagram for the current flow.
Parameters: dotfile : str
Output dot file name.
Returns: None
-
downstream(id, leaf=None)
Determines the ATs downstream of a task (includes itself).
The downstream tasks constitute the sub-flow emanating from the specified root task (also considered part of the sub-flow).
Parameters: id : int
Root task ID.
leaf : set of int, optional
Initial set of leaf task IDs.
Returns: set
Set of AT task IDs (including the root, id) on success, else an empty list if recursion is detected.
Notes
Recursion through task id is detected. This is used by add() to prevent the creation of cyclic flows.
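A downstream search with recursion detection might look like the sketch below. It operates on a plain list of connection 4-tuples rather than on FlowManager state, the function and variable names are hypothetical, and it returns an empty set (rather than whatever the real method returns) when a path leads back to the root.

```python
def downstream(connections, root):
    """Return the task IDs in the sub-flow rooted at `root` (inclusive).

    connections -- iterable of 4-tuples (si, sp, di, dp)
    Returns an empty set if any path leads back to `root`.
    """
    # Index destinations by source task for quick lookup.
    children = {}
    for si, _sp, di, _dp in connections:
        children.setdefault(si, set()).add(di)

    seen, stack = {root}, [root]
    while stack:
        for di in children.get(stack.pop(), ()):
            if di == root:
                return set()  # recursion through the root detected
            if di not in seen:
                seen.add(di)
                stack.append(di)
    return seen
```

A check of this kind, run before a connection is committed, is one way an insertion routine could refuse to create a cyclic flow.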
-
dryrun()
Performs a dry run.
Dry runs provide a summary of which tasks are in the flow, which are stale (out-of-date), and the order in which they will be executed in a live run.
Returns: None
-
find(isMatch)
Finds ATs in the flow matching a criterion.
Applies the function isMatch to all ATs in the flow, in proper dependency order, accumulating matching ATs in a list (the return value). Downstream ATs are guaranteed to follow their predecessors in this list. Often isMatch may be conveniently expressed as a lambda function.
Parameters: isMatch : bool functor(AT)
Function object taking one AT as input and returning a Boolean.
Returns: list of ATs
ATs testing True using isMatch.
Notes
Tasks at the same dependency level are scanned in an unspecified order. (Such tasks are all independent of each other.)
Examples
To find all ATs with ID less than 100 in flow fm:
>>> fm.find(lambda at: at.id() < 100)
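The ordering guarantee can be pictured with a toy version that walks dependency levels from lowest to highest. This is a sketch only: the function takes the level map and task table as explicit arguments (hypothetical names), and tasks are stood in for by plain strings.

```python
def find(depsmap, tasks, is_match):
    """Collect matching tasks in dependency order (toy model).

    depsmap  -- dict mapping dependency level -> set of task IDs
    tasks    -- dict mapping task ID -> task object
    is_match -- predicate taking one task and returning a bool
    """
    matches = []
    for level in sorted(depsmap):      # lower levels first
        for tid in depsmap[level]:     # order within a level is unspecified
            if is_match(tasks[tid]):
                matches.append(tasks[tid])
    return matches
```

Because every task sits at a higher level than all of its sources, scanning levels in ascending order is enough to place downstream tasks after their predecessors.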
-
findTwin(tid, flow0, twins)
Attempts to find corresponding task in another flow.
For tasks to correspond they must be of identical types with identical lineage. The latter means that the two tasks have identical parents (the same number of inputs connected to the same ports on tasks of identical type), grandparents, etc.
Parameters: tid : int
Task ID in current flow.
flow0 : FlowManager
Flow in which to search for matching task.
twins : dict of ints
Mapping of previously matched task IDs in flow0 (keys) to corresponding IDs in the current flow (values).
Returns: int or None
Valid flow0 task ID on success, else None.
-
inFlow(a, stuples=None)
Determines whether a compatible task exists in the flow.
A task is compatible with another if:
1. It has the identical concrete type.
2. It has the same task ID.
3. It has identical source connections (if supplied).
Parameters: a : ADMIT task
Task reference.
stuples : list of 2-tuples, optional
List of source connection 2-tuples, one per BDP input port (default None bypasses the connection check).
Returns: True if a compatible task exists in the flow, else False.
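The three compatibility conditions can be expressed as a short predicate. The sketch below is an illustration, not the library code: tasks are modeled as a dictionary from task ID to a type name, source connections as a dictionary from task ID to a list of (si, sp) tuples, and all names are hypothetical.

```python
def in_flow(task_types, bdpmap, task_id, task_type, stuples=None):
    """Check the three compatibility conditions on a toy flow.

    task_types -- dict mapping task ID -> concrete type name
    bdpmap     -- dict mapping task ID -> list of (si, sp) sources
    stuples    -- sources to compare, or None to skip that check
    """
    # Conditions 1 and 2: same task ID holding the same concrete type.
    if task_types.get(task_id) != task_type:
        return False
    # Condition 3 applies only when source connections are supplied.
    if stuples is None:
        return True
    return bdpmap.get(task_id, []) == list(stuples)
```

Passing stuples=None bypasses the connection comparison, which mirrors the default described for the stuples parameter.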
-
mergeTasks(summary, flow0, summary0, twins, final)
Merges task settings from another flow.
Compares the tasks in flow0 to self; tasks of the same type, at the same dependency level and with equivalent BDP input trees, are identified and relevant settings transferred into the current flow. Any task in the current flow which cannot be matched with a flow0 task according to these criteria is passed unaltered. Each task in flow0 may be merged with at most one task in the current flow.
Parameters: summary : SummaryData
Summary data for current flow.
flow0 : FlowManager
Flow from which to merge task stale settings.
summary0 : SummaryData
Original summary data for flow0.
twins : dict of ints
Task IDs of all known pairs of identified (merged) tasks.
final : bool
Whether this will be the last call to this method for the current flow. If so, any unattached variflow branches will be merged.
Returns: None
Notes
This is a low-level method normally called only by Admit.mergeFlow().
In support of incremental flow construction, it is permissible to merge a flow multiple times; individual tasks will be merged at most once.
-
remove(id, keepRoot=False, delFiles=True)
Removes an AT and its downstream tasks.
Deletes an entire sub-flow starting from the specified root task.
Parameters: id : int
Task ID of root AT to be removed.
keepRoot : bool, optional
Whether to preserve the root AT, id, and remove only its descendants.
delFiles : bool, optional
Whether to delete BDP data when removing tasks.
Returns: None
Notes
This method does not remove any variflow-related metadata as this would disrupt the flow management performed by variflows themselves. User removal of tasks within variadic flows is only supported via editing and re-running of script files.
-
replace(id, a, stuples=None)
Replaces one task with another, removing the original.
The replacement task must have the same output signature (i.e., produce the same types/number of output BDPs), since otherwise the existing task could not be removed, but stuples may be specified if the inputs differ.
Parameters: id : int
Task ID of AT to be removed.
a : AT
Task to insert into the flow.
stuples : list of 2-tuples, optional
Source connection 2-tuples (si, sp) for a. The special default value None indicates that the existing task’s sources should be reused verbatim.
Returns: None
-
run(dryrun=False)
Executes the flow, but only tasks that are out of date.
Runs all stale, enabled tasks in the correct order, accounting for their inter-dependencies. This will reduce to a no-op for tasks whose outputs are up-to-date (not dependent on stale tasks). The (global) project summary is updated on-the-fly as tasks are executed and must contain a valid Summary object on entry.
Parameters: dryrun : bool, optional
Whether to perform a dry run (else a live run); defaults to False.
Returns: None
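The execution rule (stale, enabled tasks only, lowest dependency level first) can be sketched on a toy task table. Nothing here is taken from the ADMIT source: each task is a plain dictionary with hypothetical keys "enabled", "stale" and "run", and the project summary update is left out.

```python
def run_flow(depsmap, tasks, dryrun=False):
    """Run stale, enabled tasks level by level (toy model).

    depsmap -- dict mapping dependency level -> set of task IDs
    tasks   -- dict mapping task ID -> {"enabled", "stale", "run"}
    Returns the list of task IDs selected for execution, in order.
    """
    selected = []
    for level in sorted(depsmap):
        # Sorting within a level is only for reproducible output here.
        for tid in sorted(depsmap[level]):
            task = tasks[tid]
            if task["enabled"] and task["stale"]:
                selected.append(tid)
                if not dryrun:
                    task["run"]()
                    task["stale"] = False  # outputs are now up to date
    return selected
```

In a dry run the same selection is reported but no task is executed and no stale flag is cleared, so a subsequent live run picks up the same tasks.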
-
sameLineage(tid, tid0, flow0, twins, match)
Determines whether two tasks in different flows have identical ancestry.
Ancestry is determined by the type and number of parents, the associated port connections, and likewise for all grandparents, etc.
Parameters: tid : int
Task ID in current flow.
tid0 : int
Task ID in flow0.
flow0 : FlowManager
Flow in which to search for matching task.
twins : dict of ints
Mapping of previously matched task IDs in flow0 (keys) to corresponding IDs in the current flow (values).
match : int (or None)
Current best-match task ID in flow0 known to have lineage compatible with tid.
Returns: bool
Whether tid0 task has lineage compatible with tid and is the better-matching task (compared to match).
Notes
- Fix stale setting when parent keywords have changed between flows, even though each task is individually up to date.
-
script(py, proj='p', tcube=None)
Generates a Python script recreating the current flow.
Parameters: py : file
Open, writable Python file object.
proj : str, optional
Project variable name; defaults to ‘p’.
tcube : int, optional
Task ID of Ingest_AT to set input file=cubefile; defaults to None.
Returns: None
Notes
This is a low-level method normally called only by Admit.script().
-
show()
Displays formatted internal FlowManager state.
Pretty-prints the current contents of the instance to the screen (standard output).
Returns: None
-
showsetkey(outfile=None)
Display keysettings for all tasks, meant to be used to write a template file for rerunning an admit
-
stale(id, direct=True)
Sets the stale flag of an AT and its downstream ATs.
Stale tasks will be re-run upon execution of the flow, updating their output BDPs in the process; tasks not so marked are skipped to minimize running time.
Parameters: id : int
Root task ID.
direct : bool, optional
Whether to mark only direct descendants stale (else the entire sub-flow); defaults to True.
Returns: None
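One reading of the direct flag is sketched below on a plain list of connection 4-tuples. This is an interpretation of the parameter description, not the actual method: the function name, the stale dictionary, and the choice to always mark the root itself are assumptions.

```python
def mark_stale(connections, stale, root, direct=True):
    """Mark `root` and its descendants stale (toy model).

    connections -- iterable of 4-tuples (si, sp, di, dp)
    stale       -- dict mapping task ID -> bool, updated in place
    direct      -- True: root plus its immediate children only;
                   False: the entire sub-flow below root
    Returns the set of task IDs that were marked.
    """
    children = {}
    for si, _sp, di, _dp in connections:
        children.setdefault(si, set()).add(di)

    marked, frontier = {root}, [root]
    while frontier:
        for di in children.get(frontier.pop(), ()):
            if di not in marked:
                marked.add(di)
                if not direct:
                    frontier.append(di)  # keep descending the sub-flow
    for tid in marked:
        stale[tid] = True
    return marked
```

Marking fewer tasks keeps re-execution cheap, at the cost of relying on a later step to propagate staleness further down the flow when inputs actually change.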
-