flowpipe.graph module

A Graph of Nodes.

class flowpipe.graph.Graph(name=None, nodes=None)

Bases: object

A graph of Nodes.

accepts_connection(output_plug, input_plug)

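Raise an exception if the new connection would violate the integrity of the graph.

Parameters:
  • output_plug – The output plug of the proposed connection

  • input_plug – The input plug of the proposed connection

Raises: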

CycleError and ValueError

Returns:

True if the connection is accepted
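The kind of integrity check described above can be sketched with a reachability test: a new connection closes a cycle when the upstream node is already reachable from the downstream node. This is a minimal standalone illustration, not flowpipe's implementation; the function name `would_create_cycle` and the edge-list representation are assumptions made for this example.

```python
from collections import defaultdict


def would_create_cycle(edges, source, target):
    """Return True if adding source -> target would close a cycle.

    `edges` is an iterable of (upstream, downstream) node-name pairs.
    Hypothetical helper for illustration only.
    """
    downstream = defaultdict(set)
    for up, down in edges:
        downstream[up].add(down)

    # A cycle appears if `source` is already reachable from `target`.
    stack, seen = [target], set()
    while stack:
        current = stack.pop()
        if current == source:
            return True
        if current in seen:
            continue
        seen.add(current)
        stack.extend(downstream[current])
    return False


existing = [("a", "b"), ("b", "c")]
forward_is_cycle = would_create_cycle(existing, "a", "c")
backward_is_cycle = would_create_cycle(existing, "c", "a")
```

Here `forward_is_cycle` is False because connecting "a" to "c" only adds a shortcut, while `backward_is_cycle` is True because "c" already depends on "a".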

add_node(node)

Add given Node to the Graph.

Nodes on a Graph have to have unique names.

add_plug(plug, name=None)

Promote the given plug to this graph.

Parameters:
  • plug (flowpipe.plug.IPlug) – The plug to promote to this graph

  • name (str) – Optionally use the given name instead of the name of the given plug

property all_nodes

Expand the graph with all its subgraphs into a flat list of nodes.

Please note that in this expanded list, the node names are no longer guaranteed to be unique!

Returns:

All nodes, including the nodes from subgraphs

Return type:

(list of INode)
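The warning about non-unique names can be illustrated with a small standalone sketch. The class `SketchGraph` and its attributes are hypothetical stand-ins, not flowpipe's classes; the point is only that flattening nested graphs can surface the same node name more than once.

```python
class SketchGraph:
    """Hypothetical stand-in for a graph; not flowpipe's Graph class."""

    def __init__(self, name, node_names, subgraphs=()):
        self.name = name
        self.node_names = list(node_names)
        self.subgraphs = list(subgraphs)

    def all_node_names(self):
        # Own nodes first, then recurse into every subgraph.
        names = list(self.node_names)
        for sub in self.subgraphs:
            names.extend(sub.all_node_names())
        return names


inner = SketchGraph("inner", ["load", "save"])
outer = SketchGraph("outer", ["load", "process"], subgraphs=[inner])

flat = outer.all_node_names()
# Names that occur more than once in the flattened list.
duplicates = sorted({name for name in flat if flat.count(name) > 1})
```

Each graph has unique names on its own, yet "load" appears twice in `flat`, so code that looks nodes up by name in the flattened list needs to account for that.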

delete_node(node)

Disconnect all plugs and then delete the node object.

static deserialize(data)

De-serialize from the given JSON data.

evaluate(mode='linear', skip_clean=False, submission_delay=0.1, max_workers=None, data_persistence=True, evaluator=None)

Evaluate all Nodes in the graph.

Sorts the nodes in the graph into a resolution order and evaluates the nodes. Evaluation can be parallelized by utilizing the dependencies between the nodes - see the “mode” keyword for the options.

Note that no checks verify whether node execution is actually thread-safe or suitable for multiprocessing. This is assumed to be the case when the respective mode is selected.

Some keyword arguments do not affect all evaluation modes.

Parameters:
  • mode (str) – The evaluation mode. Possible modes are: linear (iterates over all nodes in a single thread), threading (evaluates each node in a new thread), and multiprocessing (evaluates each node in a new process).

  • skip_clean (bool) – Whether to skip nodes that are ‘clean’ (as tracked by the ‘is_dirty’ attribute on the node), i.e. whose inputs have not changed since their output was computed

  • submission_delay (float) – The delay in seconds between loops issuing new threads/processes if nodes are ready to process.

  • max_workers (int) – The maximum number of parallel threads to spawn. None defaults to the default of Python's ThreadPoolExecutor.

  • data_persistence (bool) – If false, the data on plugs that have connections gets cleared (set to None). This reduces the reference count of objects.

  • evaluator (flowpipe.evaluators.Evaluator) – The evaluator to use. For the basic evaluation modes, the evaluator is picked based on ‘mode’.
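The effect of skip_clean in a linear evaluation can be sketched as follows. This is a simplified standalone illustration, not flowpipe's evaluator: `SketchNode` and `evaluate_linear` are hypothetical names, and only the `is_dirty` flag mirrors the attribute mentioned above.

```python
class SketchNode:
    """Hypothetical node that tracks whether it needs recomputing."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.is_dirty = True  # new nodes have never been computed
        self.output = None

    def evaluate(self):
        self.output = self.func()
        self.is_dirty = False


def evaluate_linear(nodes, skip_clean=False):
    """Evaluate nodes in order in a single thread; return the evaluated names."""
    evaluated = []
    for node in nodes:
        if skip_clean and not node.is_dirty:
            continue  # output is still valid, nothing to do
        node.evaluate()
        evaluated.append(node.name)
    return evaluated


nodes = [SketchNode("a", lambda: 1), SketchNode("b", lambda: 2)]
first_run = evaluate_linear(nodes, skip_clean=True)

nodes[1].is_dirty = True  # pretend an input of "b" changed
second_run = evaluate_linear(nodes, skip_clean=True)
```

On the first run both nodes are dirty and get evaluated; on the second run only "b" is recomputed.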

property evaluation_matrix

Sort nodes into a 2D matrix based on their dependency.

Rows affect each other and have to be evaluated in sequence. The Nodes on each row, however, can be evaluated in parallel as they are independent of each other. The number of Nodes in each row can vary.

Returns:

Each sub list represents a row.

Return type:

(list of list of INode)
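One way to build such a matrix is to repeatedly collect every node whose upstream nodes have already been placed. The sketch below works on a plain mapping of node name to upstream names; the function `evaluation_rows` is a hypothetical helper and is not taken from flowpipe.

```python
def evaluation_rows(dependencies):
    """Group node names into rows of mutually independent nodes.

    `dependencies` maps each node name to the set of its upstream node names.
    Every upstream name is assumed to also be a key of the mapping.
    """
    remaining = {node: set(ups) for node, ups in dependencies.items()}
    rows = []
    while remaining:
        # Nodes with no unplaced upstream nodes can run in this row.
        ready = sorted(node for node, ups in remaining.items() if not ups)
        if not ready:
            raise ValueError("dependency cycle detected")
        rows.append(ready)
        for node in ready:
            del remaining[node]
        for ups in remaining.values():
            ups.difference_update(ready)
    return rows


deps = {"a": set(), "b": set(), "c": {"a", "b"}, "d": {"c"}, "e": {"a"}}
matrix = evaluation_rows(deps)
```

For this input the rows are `["a", "b"]`, `["c", "e"]` and `["d"]`: the rows have different lengths, and every node only depends on nodes from earlier rows.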

property evaluation_sequence

Sort Nodes into a sequential, flat execution order.

Returns:

A one-dimensional representation of the evaluation matrix.

Return type:

(list of INode)
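Flattening a row-based matrix into a sequence amounts to concatenating the rows in order. A minimal sketch, using plain node names in place of INode objects (the sample data is made up for illustration):

```python
from itertools import chain

# Hypothetical matrix: each inner list is one row of independent nodes.
matrix = [["a", "b"], ["c", "e"], ["d"]]

# Concatenate the rows while keeping the row order intact.
sequence = list(chain.from_iterable(matrix))
```

Because rows are kept in order, every node in `sequence` still comes after all nodes it depends on.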

static from_json(data)

De-serialize from the given JSON data.

static from_pickle(data)

De-serialize from the given pickle data.

property input_groups

Return all inputs that are actually input groups.

list_repr()

List representation of the graph showing Nodes and connections.

node_repr()

Format to visualize the Graph.

serialize(with_subgraphs=True)

Serialize the graph in its grid form.

Deprecated.

property subgraphs

All other graphs that the nodes of this graph are connected to.

Returns:

A dict in the form of {graph.name: graph}

to_json()

Serialize the graph to JSON data.

to_pickle()

Serialize the graph into a pickle.
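The two serialization routes pair up as round trips: to_json with from_json, and to_pickle with from_pickle. The sketch below shows the same round-trip idea with the standard library only; the dictionary layout is an assumption made for illustration and does not reflect flowpipe's actual serialized format.

```python
import json
import pickle

# Hypothetical description of a graph; not flowpipe's real data layout.
graph_data = {"name": "demo", "nodes": [{"name": "a"}, {"name": "b"}]}

# JSON round trip: human-readable text, limited to basic data types.
as_json = json.dumps(graph_data)
restored_from_json = json.loads(as_json)

# Pickle round trip: binary, can carry arbitrary Python objects.
as_pickle = pickle.dumps(graph_data)
restored_from_pickle = pickle.loads(as_pickle)
```

As a general rule with the standard library, pickle data should only be loaded from trusted sources, since unpickling can execute arbitrary code.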

flowpipe.graph.get_default_graph()

Retrieve the default graph.

flowpipe.graph.reset_default_graph()

Reset the default graph to an empty graph.

flowpipe.graph.set_default_graph(graph)

Set a graph as the default graph.
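The three functions above follow a common module-level default pattern. The sketch below reproduces that pattern in isolation; `DemoGraph` is a hypothetical placeholder class, and the function bodies are an assumption about how such a default could be managed, not flowpipe's source.

```python
class DemoGraph:
    """Hypothetical placeholder for a graph object."""

    def __init__(self, name="default"):
        self.name = name
        self.nodes = []


# Module-level state holding the current default.
_default_graph = DemoGraph()


def get_default_graph():
    """Retrieve the current default graph."""
    return _default_graph


def set_default_graph(graph):
    """Make the given graph the default."""
    global _default_graph
    _default_graph = graph


def reset_default_graph():
    """Replace the default with a fresh, empty graph."""
    set_default_graph(DemoGraph())


custom = DemoGraph("custom")
set_default_graph(custom)
is_custom_default = get_default_graph() is custom

reset_default_graph()
is_custom_after_reset = get_default_graph() is custom
```

After `set_default_graph(custom)` the getter returns that exact object; after `reset_default_graph()` it returns a new, empty graph instead.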