flowpipe.graph module

A Graph of Nodes.

class flowpipe.graph.Graph(name: str | None = None, nodes: list[INode] | None = None)

Bases: object

A graph of Nodes.

accepts_connection(output_plug: OutputPlug, input_plug: InputPlug) bool

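Raise an exception if the new connection would violate the integrity of the graph.

Parameters:
  • output_plug (OutputPlug) – The output plug of the proposed connection

  • input_plug (InputPlug) – The input plug of the proposed connection

Raises:

CycleError and ValueError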

Returns:

True if the connection is accepted
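The kind of cycle check described here can be illustrated without flowpipe. The following is a minimal sketch, assuming the graph is a plain dict that maps each node name to the names of its downstream nodes. The function `would_create_cycle` and the `edges` layout are hypothetical and are not part of flowpipe's API or its actual implementation.

```python
def would_create_cycle(edges, source, target):
    """Return True if adding the edge source -> target would close a cycle.

    `edges` maps a node name to the set of node names it feeds into.
    A new edge closes a cycle exactly when `source` is already reachable
    from `target` (or when both are the same node).
    """
    stack = [target]
    seen = set()
    while stack:
        current = stack.pop()
        if current == source:
            return True
        if current in seen:
            continue
        seen.add(current)
        stack.extend(edges.get(current, ()))
    return False


# Hypothetical three-node chain: a -> b -> c
edges = {"a": {"b"}, "b": {"c"}, "c": set()}
print(would_create_cycle(edges, "c", "a"))  # c -> a would close the loop
print(would_create_cycle(edges, "a", "c"))  # a -> c only adds a shortcut
```

A graph-level check of this kind would raise an error instead of returning True, which matches the "raise or return True" contract documented above.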

add_node(node: INode) None

Add given Node to the Graph.

Nodes on a Graph have to have unique names.

add_plug(plug: IPlug, name: str | None = None) None

Promote the given plug to this graph.

Parameters:
  • plug (IPlug) – The plug to promote to this graph

  • name (str) – Optionally use the given name instead of the name of the given plug

property all_nodes: list[INode]

Expand the graph with all its subgraphs into a flat list of nodes.

Please note that in this expanded list, the node names are no longer guaranteed to be unique!

Returns:

All nodes, including the nodes from subgraphs

Return type:

(list of INode)

delete_node(node: INode) None

Disconnect all plugs and then delete the node object.

static deserialize(data: dict[str, Any]) Graph

De-serialize from the given json data.

evaluate(*, mode: EvalMode | None = 'linear', skip_clean: bool = False, submission_delay: float = 0.1, max_workers: int | None = None, data_persistence: bool = True, evaluator: Evaluator | None = None, on_node_event: NodeEventCallback | None = None) None

Evaluate all Nodes in the graph.

Sorts the nodes in the graph into a resolution order and evaluates the nodes. Evaluation can be parallelized by utilizing the dependencies between the nodes - see the “mode” keyword for the options.

Note that no checks verify whether node execution is actually thread-safe or suitable for multiprocessing. This is assumed to be the case when the respective mode is selected.

Some keyword arguments do not affect all evaluation modes.

Parameters:
  • mode (str) – The evaluation mode. Possible modes are:

      • linear: Iterates over all nodes in a single thread

      • threading: Evaluates each node in a new thread

      • multiprocessing: Evaluates each node in a new process

  • skip_clean (bool) – Whether to skip nodes that are ‘clean’ (as tracked by the ‘is_dirty’ attribute on the node), i.e. whose inputs have not changed since their output was computed

  • submission_delay (float) – The delay in seconds between loops issuing new threads/processes if nodes are ready to process.

  • max_workers (int) – The maximum number of parallel threads to spawn. None defaults to your Python's ThreadPoolExecutor default.

  • data_persistence (bool) – If false, the data on plugs that have connections gets cleared (set to None). This reduces the reference count of objects.

  • evaluator (flowpipe.evaluators.Evaluator) – The evaluator to use. For the basic evaluation modes, the evaluator is picked based on ‘mode’.

  • on_node_event (callable) – Optional callback invoked for node lifecycle events: started, finished, failed.
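As a rough illustration of what parallel evaluation with a bounded worker pool looks like, the sketch below runs rows of independent callables with the standard library's ThreadPoolExecutor. It is a simplification under stated assumptions: `evaluate_rows` and the row layout are hypothetical stand-ins, plain callables replace nodes, and flowpipe's own evaluators may schedule work differently (for example, by submitting nodes as soon as they are ready).

```python
from concurrent.futures import ThreadPoolExecutor


def evaluate_rows(rows, max_workers=None):
    """Run each row of callables in parallel, one row after the other.

    `rows` is a list of lists of zero-argument callables. Callables within
    a row are assumed to be independent and thread-safe; nothing checks this.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for row in rows:
            futures = [pool.submit(task) for task in row]
            # Wait for the whole row before starting the next one.
            results.append([future.result() for future in futures])
    return results


# Hypothetical workload: two independent tasks, then one dependent task.
rows = [
    [lambda: 1 + 1, lambda: 2 * 3],
    [lambda: "done"],
]
print(evaluate_rows(rows, max_workers=2))
```

Passing `max_workers=None` leaves the pool size to ThreadPoolExecutor, which mirrors the documented default behavior of the max_workers keyword.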

property evaluation_matrix: list[list[INode]]

Sort nodes into a 2D matrix based on their dependency.

Rows depend on each other and have to be evaluated in sequence. The Nodes within each row, however, can be evaluated in parallel as they are independent of each other. The number of Nodes in each row can vary.

Returns:

Each sub list represents a row.

Return type:

(list of list of INode)
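The row-by-row grouping can be sketched with plain data structures. This is a minimal illustration, assuming dependencies are given as a dict that maps each node name to the set of names it depends on. `build_matrix` and the node names are hypothetical, and the approach shown (repeatedly peeling off nodes with no unresolved dependencies) is one common way to layer a dependency graph, not necessarily how flowpipe computes its matrix.

```python
def build_matrix(upstream):
    """Group node names into rows so every node sits below all its inputs.

    `upstream` maps each node name to the set of node names it depends on.
    """
    remaining = {name: set(deps) for name, deps in upstream.items()}
    matrix = []
    while remaining:
        # Nodes with no unresolved dependencies form the next row.
        row = sorted(name for name, deps in remaining.items() if not deps)
        if not row:
            raise ValueError("dependency cycle detected")
        matrix.append(row)
        for name in row:
            del remaining[name]
        for deps in remaining.values():
            deps.difference_update(row)
    return matrix


# Hypothetical dependencies: process needs load and config, report needs process.
upstream = {
    "load": set(),
    "config": set(),
    "process": {"load", "config"},
    "report": {"process"},
}
print(build_matrix(upstream))
```

In this example the first row holds the two independent nodes, so its length differs from the rows that follow, which is the varying row size mentioned above.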

property evaluation_sequence: list[INode]

Sort Nodes into a sequential, flat execution order.

Returns:

A one-dimensional representation of the evaluation matrix.

Return type:

(list of INode)
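The relationship between a row-based matrix and a flat sequence can be shown in a few lines. This sketch assumes strings stand in for INode objects and that flattening row by row preserves a valid execution order. The variable names are illustrative only.

```python
from itertools import chain

# Hypothetical matrix: each inner list is one row of independent nodes.
matrix = [["config", "load"], ["process"], ["report"]]

# Concatenating the rows in order yields a flat execution sequence.
sequence = list(chain.from_iterable(matrix))
print(sequence)
```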

static from_json(data: dict[str, Any]) Graph

De-serialize from the given json data.

static from_pickle(data: bytes) Graph

De-serialize from the given pickle data.

property input_groups: dict[str, InputPlugGroup]

Return all inputs that are actually input groups.

list_repr() str

List representation of the graph showing Nodes and connections.

node_repr() str

Format to visualize the Graph.

serialize(with_subgraphs: bool = True) dict[str, Any]

Serialize the graph in its grid form.

Deprecated.

property subgraphs: dict[str, Graph]

All other graphs that the nodes of this graph are connected to.

Returns:

A dict in the form of {graph.name: graph}

to_json() dict[str, Any]

Serialize the graph into JSON-compatible data.

to_pickle() bytes

Serialize the graph into a pickle.

flowpipe.graph.get_default_graph() Graph

Retrieve the default graph.

flowpipe.graph.reset_default_graph() None

Reset the default graph to an empty graph.

flowpipe.graph.set_default_graph(graph: Graph) None

Set a graph as the default graph.