Version Build Status Codacy_Badge_Grade Coverage
Coverage Report
FileStmtsMissCover
TOTAL9400100%
License: MIT PyPI - Python Version Documentation Status Black Flowpipe Logo

Flow-based Programming

A lightweight framework for flow-based programming in python.

+-------------------+          +---------------------+
|   Invite People   |          |   Birthday Party    |
|-------------------|          |---------------------|
o amount<4>         |   +----->o attendees<>         |
|            people o---+ +--->o cake<>              |
+-------------------+     |    +---------------------+
                          |
+-------------------+     |
|    Bake a cake    |     |
+-------------------+     |
o type<"Chocolate"> |     |
|              cake o-----+
+-------------------+

Benefits:

  • Visualize code

  • Re-usability

  • Streamlined code design

  • Built-in concurrency

  • Represent workflows one to one in the code

Quick Example

Consider this simple example on how to represent the construction of a house with Flowpipe:

from flowpipe import Graph, INode, Node, InputPlug, OutputPlug


class HireWorkers(INode):
    """Example of a node built by subclassing the INode interface.

    Plugs are declared in ``__init__``; ``compute`` receives the values
    of any connected upstream plugs as keyword arguments.
    """

    def __init__(self, amount=None, **kwargs):
        super(HireWorkers, self).__init__(**kwargs)
        InputPlug('amount', self, amount)
        OutputPlug('workers', self)

    def compute(self, amount):
        available = ['John', 'Jane', 'Mike', 'Michelle']
        print('{0} workers are hired to build the house.'.format(amount))
        # Emit one sub-plug value per hired worker: workers.0, workers.1, ...
        hired = {}
        for index in range(amount):
            hired['workers.{0}'.format(index)] = available[index]
        return hired


@Node(outputs=['workers'])
def Build(workers, section):
    """Example of a node created via the Node decorator.

    The inputs to the function are turned into InputPlugs, outputs are
    defined in the decorator itself. The wrapped function is used as the
    compute method.
    """
    crew = ', '.join(workers.values())
    print('{0} are building the {1}'.format(crew, section))
    # Pass the workers straight through to the downstream nodes.
    passed_on = {}
    for key, worker in workers.items():
        passed_on['workers.{0}'.format(key)] = worker
    return passed_on


@Node()
def Party(attendees):
    """Announce the party; the last attendee is called out separately."""
    guests = list(attendees.values())
    print('{0} and {1} are having a great party!'.format(
        ', '.join(guests[:-1]), guests[-1]))


# Create a graph with the necessary nodes
graph = Graph(name='How to build a house')
workers = HireWorkers(graph=graph, amount=4)
build_walls = Build(graph=graph, name='Build Walls', section='walls')
build_roof = Build(graph=graph, name='Build Roof', section='roof')
party = Party(graph=graph, name='Housewarming Party')

# Wire up the connections between the nodes.
# Referencing a sub-plug like ['workers']['0'] creates it on first access.
workers.outputs['workers']['0'].connect(build_walls.inputs['workers']['0'])
workers.outputs['workers']['1'].connect(build_walls.inputs['workers']['1'])
workers.outputs['workers']['2'].connect(build_roof.inputs['workers']['0'])
workers.outputs['workers']['3'].connect(build_roof.inputs['workers']['1'])
# The '>>' operator is an alternative way to connect an output to an input.
build_walls.outputs['workers']['0'] >> party.inputs['attendees']['0']
build_walls.outputs['workers']['1'] >> party.inputs['attendees']['2']
build_roof.outputs['workers']['0'] >> party.inputs['attendees']['1']
build_roof.outputs['workers']['1'] >> party.inputs['attendees']['3']
# Unconnected input plugs can simply be assigned a value directly.
party.inputs['attendees']['4'].value = 'Homeowner'

Visualize the code as a graph or as a listing:

print(graph.name)  # -> 'How to build a house'
print(graph)  # box-diagram visualization of the nodes and connections
print(graph.list_repr())  # indented listing of nodes, plugs and connections

Output:

How to build a house
+------------------------+          +------------------------+          +---------------------------+
|      HireWorkers       |          |       Build Roof       |          |    Housewarming Party     |
|------------------------|          |------------------------|          |---------------------------|
o amount<4>              |          o section<"roof">        |          % attendees                 |
|                workers %          % workers                |     +--->o  attendees.0<>            |
|             workers.0  o-----+--->o  workers.0<>           |     |--->o  attendees.1<>            |
|             workers.1  o-----|--->o  workers.1<>           |     |--->o  attendees.2<>            |
|             workers.2  o-----|    |                workers %     |--->o  attendees.3<>            |
|             workers.3  o-----|    |             workers.0  o-----|    o  attendees.4<"Homeowner>  |
+------------------------+     |    |             workers.1  o-----|    +---------------------------+
                               |    +------------------------+     |
                               |    +------------------------+     |
                               |    |      Build Walls       |     |
                               |    |------------------------|     |
                               |    o section<"walls">       |     |
                               |    % workers                |     |
                               +--->o  workers.0<>           |     |
                               +--->o  workers.1<>           |     |
                                    |                workers %     |
                                    |             workers.0  o-----+
                                    |             workers.1  o-----+
                                    +------------------------+

How to build a house
 HireWorkers
  [i] amount: 4
  [o] workers
   [o] workers.0 >> Build Walls.workers.0
   [o] workers.1 >> Build Walls.workers.1
   [o] workers.2 >> Build Roof.workers.0
   [o] workers.3 >> Build Roof.workers.1
 Build Roof
  [i] section: "roof"
  [i] workers
   [i] workers.0 << HireWorkers.workers.2
   [i] workers.1 << HireWorkers.workers.3
  [o] workers
   [o] workers.0 >> Housewarming Party.attendees.1
   [o] workers.1 >> Housewarming Party.attendees.3
 Build Walls
  [i] section: "walls"
  [i] workers
   [i] workers.0 << HireWorkers.workers.0
   [i] workers.1 << HireWorkers.workers.1
  [o] workers
   [o] workers.0 >> Housewarming Party.attendees.0
   [o] workers.1 >> Housewarming Party.attendees.2
 Housewarming Party
  [i] attendees
   [i] attendees.0 << Build Walls.workers.0
   [i] attendees.1 << Build Roof.workers.0
   [i] attendees.2 << Build Walls.workers.1
   [i] attendees.3 << Build Roof.workers.1
   [i] attendees.4: "Homeowner"

Now build the house:

graph.evaluate(mode='threading')  # Options are linear, threading and multiprocessing

Output:

4 workers are hired to build the house.
Michelle, Mike are building the roof
Jane, John are building the walls
Mike, John, Michelle, Jane and Homeowner are having a great party!

(Note: for more elaborate evaluation schemes, see Evaluators)

We now know how to throw a party, so let’s invite some people and re-use these skills for a birthday:

# A fresh graph that re-uses the Party node definition from above.
graph = Graph(name='How to throw a birthday party')

@Node(outputs=['people'])
def InvitePeople(amount):
    """Provide the invitees both as individual sub-plugs and as one dict."""
    people = ['John', 'Jane', 'Mike', 'Michelle']
    invited = {}
    for i in range(amount):
        invited['people.{0}'.format(i)] = people[i]
    # The parent plug itself carries all invitees in a single dict.
    invited['people'] = {people[i]: people[i] for i in range(amount)}
    return invited

invite = InvitePeople(graph=graph, amount=4)
birthday_party = Party(graph=graph, name='Birthday Party')
# Connect the parent plugs directly instead of the individual sub-plugs.
invite.outputs['people'] >> birthday_party.inputs['attendees']

print(graph.name)
print(graph)
graph.evaluate()

Output:

How to throw a birthday party
+-------------------+          +---------------------+
|   InvitePeople    |          |   Birthday Party    |
|-------------------|          |---------------------|
o amount<4>         |     +--->o attendees<>         |
|            people o-----+    +---------------------+
+-------------------+

Jane, Michelle, Mike and John are having a great party!

More Examples

There are more examples for common use cases of flowpipe:

The code for these examples: house_and_birthday.py!

Another simple example: world_clock.py!

How to make use of nested subgraphs: nested_graphs.py!

Using the command pattern with flowpipe successfully: workflow_design_pattern.py!

Use flowpipe on a remote cluster of machines, commonly referred to as a “render farm” in the VFX/Animation industry: vfx_render_farm_conversion.py!

An example graph showcasing a common workflow encountered in the VFX/Animation industry: vfx_rendering.py!

VFX Pipeline

If you are working in the VFX/Animation industry, please check out this extensive guide on how to use flowpipe in a vfx pipeline!

Evaluators

If your nodes just need sequential, threaded or multiprocessing evaluation, the Graph.evaluate() method will serve you just fine. If you want to take more control over the way your Graph is being evaluated, Evaluators are for you. This can also be used to add, e.g. logging or tracing to node evaluation.

Evaluators allow you to take control of node evaluation order, or their scheduling. See flowpipe/evaluator.py to see the Graph.evaluate() method’s evaluation schemes.

To use a custom evaluator, subclass flowpipe.evaluator.Evaluator, and provide at least an _evaluate_nodes(self, nodes) method. This method should take a list of nodes and call their respective node.evaluate() methods (along with any other task you want to do for each node being evaluated). To use a custom evaluator, create it and call its Evaluator.evaluate() method with the Graph to evaluate as an argument:

from flowpipe.evaluators import LinearEvaluator

# assuming you created a graph to evaluate above, called `graph`
lin_eval = LinearEvaluator()
lin_eval.evaluate(graph)

Code Documentation

API Reference

Submodules

flowpipe.errors module

Exceptions raised by flowpipe.

exception flowpipe.errors.CycleError

Bases: Exception

Raised when an action would result in a cycle in a graph.

flowpipe.event module

Events are emitted during node evaluation.

They can be used to observe the evaluation process.

class flowpipe.event.Event(name)

Bases: object

Very simple implementation of an event system.

The event simply calls the registered functions with the given arguments. Please note that the integrity of the listeners is not enforced or checked.

clear()

Remove all listeners from this event.

deregister(listener)

Deregister the given function object if it is registered.

emit(*args, **kwargs)

Call all the listeners with the given args and kwargs.

is_registered(listener)

Whether the given function object is already registered.

register(listener)

Register the given function object if it is not yet registered.

flowpipe.graph module

A Graph of Nodes.

class flowpipe.graph.Graph(name=None, nodes=None)

Bases: object

A graph of Nodes.

accepts_connection(output_plug, input_plug)

Raise exception if new connection would violate integrity of graph.

Parameters
Raises

CycleError and ValueError

Returns

True if the connection is accepted

add_node(node)

Add given Node to the Graph.

Nodes on a Graph have to have unique names.

add_plug(plug, name=None)

Promote the given plug to this graph.

Parameters
  • plug (flowpipe.plug.IPlug) – The plug to promote to this graph

  • name (str) – Optionally use the given name instead of the name of the given plug

property all_nodes

Expand the graph with all its subgraphs into a flat list of nodes.

Please note that in this expanded list, the node names are no longer guaranteed to be unique!

Returns

All nodes, including the nodes from subgraphs

Return type

(list of INode)

delete_node(node)

Disconnect all plugs and then delete the node object.

static deserialize(data)

De-serialize from the given json data.

evaluate(mode='linear', skip_clean=False, submission_delay=0.1, max_workers=None, data_persistence=True, evaluator=None)

Evaluate all Nodes in the graph.

Sorts the nodes in the graph into a resolution order and evaluates the nodes. Evaluation can be parallelized by utilizing the dependencies between the nodes - see the “mode” keyword for the options.

Note that no checks are in place whether the node execution is actually thread-safe or fit for multiprocessing. It is assumed to be given if the respective mode is selected.

Some keyword arguments do not affect all evaluation modes.

Parameters
  • mode (str) – The evaluation mode. Possible modes are * linear : Iterates over all nodes in a single thread * threading : Evaluates each node in a new thread * multiprocessing : Evaluates each node in a new process

  • skip_clean (bool) – Whether to skip nodes that are ‘clean’ (as tracked by the ‘is_dirty’ attribute on the node), i.e. whose inputs have not changed since their output was computed

  • submission_delay (float) – The delay in seconds between loops issuing new threads/processes if nodes are ready to process.

  • max_workers (int) – The maximum number of parallel threads to spawn. None defaults to Python's ThreadPoolExecutor default.

  • data_persistence (bool) – If false, the data on plugs that have connections gets cleared (set to None). This reduces the reference count of objects.

  • evaluator (flowpipe.evaluators.Evaluator) – The evaluator to use. For the basic evaluation modes will be picked by ‘mode’.

property evaluation_matrix

Sort nodes into a 2D matrix based on their dependency.

Rows affect each other and have to be evaluated in sequence. The Nodes on each row however can be evaluated in parallel as they are independent of each other. The amount of Nodes in each row can vary.

Returns

Each sub list represents a row.

Return type

(list of list of INode)

property evaluation_sequence

Sort Nodes into a sequential, flat execution order.

Returns

A one dimensional representation of the

evaluation matrix.

Return type

(list of INode)

static from_json(data)

De-serialize from the given json data.

static from_pickle(data)

De-serialize from the given pickle data.

property input_groups

Return all inputs that are actually input groups.

list_repr()

List representation of the graph showing Nodes and connections.

node_repr()

Format to visualize the Graph.

serialize(with_subgraphs=True)

Serialize the graph in its grid form.

Deprecated.

property subgraphs

All other graphs that the nodes of this graph are connected to.

Returns

A dict in the form of {graph.name: graph}

to_json()

Serialize the graph into a json.

to_pickle()

Serialize the graph into a pickle.

flowpipe.graph.get_default_graph()

Retrieve the default graph.

flowpipe.graph.reset_default_graph()

Reset the default graph to an empty graph.

flowpipe.graph.set_default_graph(graph)

Set a graph as the default graph.

flowpipe.node module

Nodes manipulate incoming data and provide the outgoing data.

class flowpipe.node.FunctionNode(func=None, outputs=None, name=None, identifier=None, metadata=None, graph=None, **kwargs)

Bases: INode

Wrap a function into a Node.

RESERVED_INPUT_NAMES = ('func', 'name', 'identifier', 'inputs', 'outputs', 'metadata', 'omit', 'graph')
compute(*args, **kwargs)

Call and return the wrapped function.

post_deserialize(data)

Apply the function back to the node.

to_pickle()

Pickle the node. – DOES NOT WORK FOR FunctionNode.

class flowpipe.node.INode(name=None, identifier=None, metadata=None, graph='default')

Bases: object

Holds input and output Plugs and a method for computing.

EVENT_TYPES = ['evaluation-omitted', 'evaluation-started', 'evaluation-finished', 'evaluation-exception']
all_inputs()

Collate all input plugs and their sub_plugs into one dictionary.

all_outputs()

Collate all output plugs and their sub_plugs into one dictionary.

property children

Nodes connected directly to outputs of this Node.

abstract compute(*args, **kwargs)

Implement the data manipulation in the subclass.

Return a dictionary with the outputs from this function.

connect(other)

Connect this node’s outputs to another plug’s input by name.

If other is an InputPlug, connect the output with matching name. If other is an INode, connect all outputs with matching names.

Note: This will also connect up sub-plugs if, and only if, they already exist. As they are dynamically created, they will come into existence only after being referenced explicity at least once. Before, the connect() method will not pick them up.

static deserialize(data)

De-serialize from the given json data.

property downstream_nodes

Nodes connected directly or indirectly to outputs of this Node.

evaluate()

Compute this Node, log it and clean the input Plugs.

Also push a stat report in the following form containing the Node, evaluation time and timestamp the computation started.

static from_json(data)

De-serialize from the given json data.

static from_pickle(data)

De-serialize from the given pickle data.

property is_dirty

Whether any of the input Plug data has changed and is dirty.

list_repr()

List representation of the node showing inputs and their values.

Node
  [i] in: "A"
  [i] in_compound
   [i] in_compound.0: "B"
   [i] in_compound.1 << Node1.out
  [o] compound_out
   [o] in_compound.0: null
   [o] compound_out.1 >> Node2.in, Node3.in
  [o] out >> Node4.in
node_repr()

The node formatted into a string looking like a node.

+--Node.graph.name--+
|     Node.Name     |
|-------------------|
% compound_in       |
o  compound_in-1    |
o  compound_in-2    |
o in                |
|               out o
|      compound_out %
|   compound_out-1  o
|   compound_out-2  o
+-------------------+
on_input_plug_set_dirty()

Propagate the dirty state to the connected downstream nodes.

property parents

Nodes connected directly to inputs of this Node.

post_deserialize(data)

Perform more data operations after initial serialization.

serialize()

Serialize the node to json.

Deprecated and kept for backwards compatibility.

static sort_plugs(plugs)

Sort the given plugs alphabetically into an OrderedDict.

to_json()

Serialize the node to json.

to_pickle()

Serialize the node into a pickle.

property upstream_nodes

Nodes connected directly or indirectly to inputs of this Node.

flowpipe.node.Node(*args, **kwargs)

Wrap the given function into a Node.

flowpipe.plug module

Plugs are ins and outs for Nodes through which they exchange data.

class flowpipe.plug.IPlug(name, node)

Bases: object

The interface for the plugs.

Plugs are associated with a Node and can be connected, disconnected and hold a value, that can be accessed by the associated Node.

abstract connect(plug)

Has to be implemented in the subclass.

disconnect(plug)

Break the connection to the given Plug.

property is_dirty

Access to the dirty status on this Plug.

promote_to_graph(name=None)

Add this plug to the graph of this plug’s node.

Parameters

name (str) – Optionally provide a different name for the Plug

property value

Access to the value on this Plug.

class flowpipe.plug.InputPlug(name, node, value=None)

Bases: IPlug

Receives data from an OutputPlug.

connect(plug)

Connect this Plug to the given OutputPlug.

Set both participating Plugs dirty.

serialize()

Serialize the Plug containing all it’s connections.

class flowpipe.plug.InputPlugGroup(name, graph, plugs=None)

Bases: object

Group plugs inside a group into one entry point on the graph.

connect(plug)

Connect all plugs in this group to the given plug.

disconnect(plug)

Disconnect all plugs in this group from the given plug.

property value

Getting the value of an InputPlugGroup is not supported.

The value property is implemented nonetheless, in order to allow for convenient setting of the value of all plugs in the InputPlugGroup.

class flowpipe.plug.OutputPlug(name, node)

Bases: IPlug

Provides data to an InputPlug.

connect(plug)

Connect this Plug to the given InputPlug.

Set both participating Plugs dirty.

serialize()

Serialize the Plug containing all it’s connections.

class flowpipe.plug.SubInputPlug(key, node, parent_plug, value=None)

Bases: SubPlug, InputPlug

Held by a parent input plug to form a compound plug.

serialize()

Serialize the Plug containing all it’s connections.

class flowpipe.plug.SubOutputPlug(key, node, parent_plug, value=None)

Bases: SubPlug, OutputPlug

Held by a parent output plug to form a compound plug.

serialize()

Serialize the Plug containing all it’s connections.

class flowpipe.plug.SubPlug

Bases: object

Mixin that unifies common properties of subplugs.

property is_dirty

Access to the dirty status on this Plug.

promote_to_graph(name=None)

Add this plug to the graph of this plug’s node.

NOTE: Subplugs can only be added to a graph via their parent plug.

Parameters

name (str) – Optionally provide a different name for the Plug

flowpipe.utilities module

Utilities for serializing and importing Nodes.

class flowpipe.utilities.NodeEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: JSONEncoder

Custom JSONEncoder to handle non-json serializable node values.

If the value is not json serializable, a sha256 hash of its bytes is encoded instead.

default(o)

Encode the object, handling type errors by encoding into sha256.

flowpipe.utilities.deserialize_graph(data)

De-serialize from the given json data.

flowpipe.utilities.deserialize_node(data)

De-serialize a node from the given json data.

flowpipe.utilities.get_hash(obj, hash_func=<function <lambda>>)

Safely get the hash of an object.

This function tries to compute the hash as safely as possible, dealing with json data and strings in a well-defined manner.

Parameters
  • obj – The object to hash

  • hash_func (func(obj) -> str) – The hashing function to use

Returns

A hash of the obj

Return type

(str)

flowpipe.utilities.import_class(module, cls_name, file_location=None)

Import and return the given class from the given module.

File location can be given to import the class from a location that is not accessible through the PYTHONPATH. This works from python 2.6 to python 3.

Module contents

Flow-based programming with python.

Examples

Flowpipe for VFX Pipelines

Flowpipe was inspired by commonly experienced challenges in vfx/animation pipelines.

Re-usability

Flowpipe encourages the re-usability of code through encapsulation into nodes that do just one thing. Code example!

Code Design in a 1000+ ways

Usually a pipeline codebase tends to be organized in as many ways as the number of developers ever employed by the company. Every situation has a multitude of possible solutions and if there is no common framework and not enough structure every developer will pick whatever feels right to them. Flowpipe helps by providing this very simple framework. Developers will be able to understand each other’s code better and faster and can collaborate more easily.

Planning and Coordination

Thinking about the solution to a problem in a graph-like fashion is a helpful approach in a lot of situations. Since flowpipe naturally supports this approach, the planning phase can oftentimes be mapped more or less directly to a flowpipe graph. This helps to reason about the implementation, not only with other developers but also with non-technical people!

Render Farm

Usually any code that has to run on a render farm is wrapped individually with some help from the farm API itself and some in-house functionality. This means that the render farm leaves an imprint everywhere in the code base. It also means that getting things to run on the farm is usually a tedious process requiring custom code every time. This is where flowpipe can really make a difference through the abstraction of logic into a graph which can then be translated into a farm job network in a unified way and thus avoiding all these issues. Please see the detailed explanation below and the code examples on vfx_render_farm_conversion.py!

Workflow Design Pattern

As the name suggests, this pattern wants to represent workflows. It is basically an extension of the ‘Command Pattern’ meant for more complex, long-running commands consisting of multiple sub-commands. Workflows also provide multiple ways of evaluation, usually local and remote (farm).

A workflow would be a common, pre-defined set of tasks frequently used in a pipeline, for example:

* prepare a delivery to the client
* publish geometry with a subsequent turntable rendering
* ingest data from vendors, including data cleanup and transformation

The Workflow builds a Graph and initializes it with user provided settings as well as data taken from other sources (database, filesystem).

Refer to the workflow_design_pattern.py! for an implementation example.

This can be a powerful approach, especially when used with the Farm Conversion.

+--------------------------+          +--------------------------------+          +----------------------+
|         Publish          |          |        CreateTurntable         |          |    UpdateDatabase    |
|--------------------------|          |--------------------------------|          |----------------------|
o source_file<"model.ma">  |     +--->o alembic_cache<>                |          o asset<"model">       |
|           published_file o-----+    o render_template<"template.>    |     +--->o images<>             |
+--------------------------+     |    |                      turntable o-----+    o status<"published>   |
                                 |    +--------------------------------+          |                asset o
                                 |    +----------------------------------+        +----------------------+
                                 |    |           SendMessage            |
                                 |    |----------------------------------|
                                 |    o recipients<>                     |
                                 |    o template<"Hello,\n\>             |
                                 |    % values                           |
                                 +--->o  values.path<>                   |
                                      o  values.recipients<["john@mai>   |
                                      o  values.sender<"sender">         |
                                      |                    return_status o
                                      +----------------------------------+

Farm Conversion

Since workflows rely on Flowpipe graphs they can be converted into a farm job of equal shape through this single entry point.

Every node is converted into a farm task. The flowpipe connections are used to determine the farm task dependencies. Each node gets serialized to json and stored in a “database” before submission. On the farm, the node gets deserialized from there, with any upstream data also taken from the json “database”. After evaluation, the node gets serialized back into the database, making the outputs available for the subsequent nodes.

There are three basic utilities required for this approach:

  1. Convert a Graph to an equivalent farm job

  2. Evaluate a Node on the farm

  3. Handling the data transferral between nodes on the farm

Any farm specific settings are stored in the metadata of the nodes and/or directly provided on job creation.

Refer to vfx_render_farm_conversion.py! for a pseudo-implementation of all the required parts and vfx_rendering.py! for an example of a complex graph. It also touches on more complex concepts like implicit and explicit batching.

Dynamic Plug Names

"""Showing a programming pattern that defines plug names at runtime.

In some applications it will be useful to re-use the same node definition for
different inputs/output - our working example will be to compute a face match.
To do so, we use an `EmbeddingNode` to compute features from both, an input
and a reference image, and then a `MatchNode` to compute whether the faces are
the same from these embeddings.

If the graph is to remain both, clean and explicit, it is advantageous to name
the plugs differently for the different `EmbeddingNode`.

To do so, accept the plug names as parameters to the nodes `__init__()` method.
You can then define the InputPlugs / OutputPlug with the given name. To access
the dynamically named plugs, your INode instance needs to store the plug names
as attributes, and the `compute()` method needs to allow for generic keyword
arguments.
"""

from flowpipe import Graph, INode, InputPlug, OutputPlug


def compute_embeddings(image):
    """Mock a call to a deep learning model or an embedding web service."""
    # The mock intentionally ignores its input and returns a fixed value.
    del image
    return 42


def compare_embeddings(image_emb, reference_emb, threshold=2):
    """Mock the comparison of embeddings: close enough counts as a match."""
    distance = abs(image_emb - reference_emb)
    return distance < threshold


class EmbeddingNode(INode):
    """The embedding node computes facial features from an image."""

    def __init__(self, input_name, output_name, **kwargs):
        """Set up a new EmbeddingNode with given names for plugs."""
        super().__init__(**kwargs)

        # Store the plug names so compute() can look its values up later.
        self.input_name = input_name
        InputPlug(input_name, self)

        self.output_name = output_name
        OutputPlug(output_name, self)

    def compute(self, **kwargs):
        """Compute the embedding for the dynamically named input plug."""
        # The input names are only known at runtime, hence **kwargs.
        image = kwargs.pop(self.input_name)
        embedding = compute_embeddings(image)
        return {self.output_name: embedding}


class MatchNode(INode):
    """The match node compares two embeddings."""

    def __init__(self, threshold=2, **kwargs):
        super().__init__(**kwargs)
        # Matching tolerance, used when comparing the two embeddings.
        self.threshold = threshold

        InputPlug("image_emb", self)
        InputPlug("reference_emb", self)
        OutputPlug("facematch", self)

    def compute(self, image_emb, reference_emb):
        """Compare the embeddings against the stored threshold."""
        is_match = compare_embeddings(
            image_emb, reference_emb, self.threshold
        )
        return {"facematch": is_match}


def get_facematch_graph(threshold):
    """Set up facematching e.g. with parameters taken from a config.

    Args:
        threshold: Forwarded to the MatchNode; the maximum embedding
            distance that still counts as a match.

    Returns:
        The graph, with the match output promoted under the name "result".
    """
    facematch_graph = Graph()

    # It is useful to define distinct plug and node names per
    # EmbeddingNode instance so the graph stays clean and explicit.
    image_node = EmbeddingNode(
        input_name="image",
        output_name="image_emb",
        graph=facematch_graph,
        name="ImageEmbeddings",
    )

    reference_node = EmbeddingNode(
        input_name="reference",
        output_name="reference_emb",
        graph=facematch_graph,
        name="ReferenceEmbeddings",
    )

    match_node = MatchNode(threshold=threshold, graph=facematch_graph)

    # Wire the dynamically named outputs into the fixed MatchNode inputs.
    image_node.outputs["image_emb"] >> match_node.inputs["image_emb"]
    (
        reference_node.outputs["reference_emb"]
        >> match_node.inputs["reference_emb"]
    )

    # Expose the match output on the graph itself as "result".
    match_node.outputs["facematch"].promote_to_graph("result")

    return facematch_graph


if __name__ == "__main__":
    # Build the face-matching graph with a strict threshold and run it.
    facematch = get_facematch_graph(1)

    image = "foo"  # load image from disk
    reference = "bar"  # load image from database
    # NOTE(review): image/reference are never assigned to the graph's
    # input plugs here; the mock embedding function ignores its input,
    # so the example runs regardless -- confirm intent before copying.
    facematch.evaluate(mode="threading")

    print(facematch)
    print("\n", facematch.outputs["result"].value)

House and Birthday

"""Two simple examples from the README file.

Build a house:

+------------------------+          +------------------------+          +---------------------------+
|      HireWorkers       |          |       Build Roof       |          |    Housewarming Party     |
|------------------------|          |------------------------|          |---------------------------|
o amount<4>              |          o section<"roof">        |          % attendees                 |
|                workers %          % workers                |     +--->o  attendees.0<>            |
|             workers.0  o-----+--->o  workers.0<>           |     |--->o  attendees.1<>            |
|             workers.1  o-----|--->o  workers.1<>           |     |--->o  attendees.2<>            |
|             workers.2  o-----|    |                workers %     |--->o  attendees.3<>            |
|             workers.3  o-----|    |             workers.0  o-----|    o  attendees.4<"Homeowner>  |
+------------------------+     |    |             workers.1  o-----|    +---------------------------+
                               |    +------------------------+     |
                               |    +------------------------+     |
                               |    |      Build Walls       |     |
                               |    |------------------------|     |
                               |    o section<"walls">       |     |
                               |    % workers                |     |
                               +--->o  workers.0<>           |     |
                               +--->o  workers.1<>           |     |
                                    |                workers %     |
                                    |             workers.0  o-----+
                                    |             workers.1  o-----+
                                    +------------------------+

Throw a birthday party:

+-------------------+          +---------------------+
|   InvitePeople    |          |   Birthday Party    |
|-------------------|          |---------------------|
o amount<4>         |     +--->o attendees<>         |
|            people o-----+    +---------------------+
+-------------------+

"""
from flowpipe import Graph, INode, InputPlug, Node, OutputPlug


class HireWorkers(INode):
    """A node can be derived from the INode interface.

    The plugs are defined in the init method.
    The compute method receives the inputs from any connected upstream nodes.
    """

    def __init__(self, amount=None, **kwargs):
        super(HireWorkers, self).__init__(**kwargs)
        InputPlug("amount", self, amount)
        OutputPlug("workers", self)

    def compute(self, amount):
        crew = ["John", "Jane", "Mike", "Michelle"]
        print("{0} workers are hired to build the house.".format(amount))
        return {
            "workers.{0}".format(index): crew[index]
            for index in range(amount)
        }


@Node(outputs=["workers"])
def Build(workers, section):
    """A node can also be created by the Node decorator.

    The inputs to the function are turned into InputPlugs, outputs are
    defined in the decorator itself.
    The wrapped function is used as the compute method.
    """
    crew = ", ".join(workers.values())
    print("{0} are building the {1}".format(crew, section))
    return {
        "workers.{0}".format(key): member for key, member in workers.items()
    }


@Node()
def Party(attendees):
    """Nodes do not necessarily need to have output or input plugs."""
    guests = list(attendees.values())
    print(
        "{0} and {1} are having a great party!".format(
            ", ".join(guests[:-1]), guests[-1]
        )
    )


# Assemble the "Build a House" graph: hire workers, split them between the
# wall and roof crews, then invite everyone to the housewarming party.
graph = Graph(name="Build a House")
workers = HireWorkers(graph=graph, amount=4)
build_walls = Build(graph=graph, name="Build Walls", section="walls")
build_roof = Build(graph=graph, name="Build Roof", section="roof")
party = Party(graph=graph, name="Housewarming Party")

# Nodes are connected via their input/output plugs.
# Sub-plugs like ["0"] address individual entries of a compound plug.
workers.outputs["workers"]["0"].connect(build_walls.inputs["workers"]["0"])
workers.outputs["workers"]["1"].connect(build_walls.inputs["workers"]["1"])
workers.outputs["workers"]["2"].connect(build_roof.inputs["workers"]["0"])
workers.outputs["workers"]["3"].connect(build_roof.inputs["workers"]["1"])

# Connecting nodes can be done via the bit shift operator as well
build_walls.outputs["workers"]["0"] >> party.inputs["attendees"]["0"]
build_walls.outputs["workers"]["1"] >> party.inputs["attendees"]["2"]
build_roof.outputs["workers"]["0"] >> party.inputs["attendees"]["1"]
build_roof.outputs["workers"]["1"] >> party.inputs["attendees"]["3"]

# Initial values can be set onto the input plugs for initialization
party.inputs["attendees"]["4"].value = "Homeowner"


# Print different representations of the graph, then evaluate it.
print("---------------------------------------")
print(graph.name)
print(graph)
print(graph.list_repr())
print("---------------------------------------")
graph.evaluate()
print("---------------------------------------")


# A second, independent graph for the birthday-party example.
graph = Graph(name="Celebrate a Birthday Party")


@Node(outputs=["people"])
def InvitePeople(amount):
    """Return the invited guests both individually and as one dict."""
    names = ["John", "Jane", "Mike", "Michelle"]
    result = {
        "people.{0}".format(index): names[index] for index in range(amount)
    }
    result["people"] = {names[index]: names[index] for index in range(amount)}
    return result


# Here the whole compound 'people' output feeds 'attendees' in one connection.
invite = InvitePeople(graph=graph, amount=4)
birthday_party = Party(graph=graph, name="Birthday Party")
invite.outputs["people"] >> birthday_party.inputs["attendees"]


# Show and evaluate the birthday graph.
print("---------------------------------------")
print(graph.name)
print(graph)
print("---------------------------------------")
graph.evaluate()
print("---------------------------------------")

Nested Graphs

"""Nested graphs are supported in flowpipe."""
from flowpipe import Graph, Node


@Node(outputs=["file"])
def MyNode(file):
    """Generic placeholder node that passes the file value straight through."""
    # Something is done in here ...
    result = {"file": file}
    return result


# A graph that fixes an incoming file, cleaning up messy names etc.
#
# +-----------------------+          +-------------------------+
# |   Cleanup Filename    |          |   Change Lineendings    |
# |-----------------------|          |-------------------------|
# o file<>                |     +--->o file<>                  |
# |                  file o-----+    |                    file o
# +-----------------------+          +-------------------------+
fix_file = Graph(name="fix_file")
cleanup_filename = MyNode(name="Cleanup Filename", graph=fix_file)
change_lineendings = MyNode(name="Change Lineendings", graph=fix_file)
cleanup_filename.outputs["file"].connect(change_lineendings.inputs["file"])


# A second graph finds files and extracts their contents into a database
# +----------------+          +----------------------------+          +----------------+
# |   Find File    |          |   Read Values from File    |          |   Update DB    |
# |----------------|          |----------------------------|          |----------------|
# o file<>         |     +--->o file<>                     |     +--->o file<>         |
# |           file o-----+    |                       file o-----+    |           file o
# +----------------+          +----------------------------+          +----------------+
# NOTE(review): 'udpate_db_from_file' is misspelled; the name is kept as-is
# because it is referenced further below.
udpate_db_from_file = Graph(name="udpate_db_from_file")
find_file = MyNode(name="Find File", graph=udpate_db_from_file)
values_from_file = MyNode(
    name="Read Values from File", graph=udpate_db_from_file
)
update_db = MyNode(name="Update DB", graph=udpate_db_from_file)
find_file.outputs["file"].connect(values_from_file.inputs["file"])
values_from_file.outputs["file"].connect(update_db.inputs["file"])


# The second graph however relies on clean input files so the first graph can
# be used within the second "update db" graph.
# For this purpose, graphs can promote input and output plugs from their nodes
# to the graph level, making other graphs aware of them:
fix_file["Cleanup Filename"].inputs["file"].promote_to_graph(
    name="file_to_clean"
)
fix_file["Change Lineendings"].outputs["file"].promote_to_graph(
    name="clean_file"
)

# Now the update_db graph can connect nodes to the fix_file graph
find_file.outputs["file"].connect(fix_file.inputs["file_to_clean"])
fix_file.outputs["clean_file"].connect(
    udpate_db_from_file["Read Values from File"].inputs["file"]
)


# The result now looks like this:
#
# +---udpate_db_from_file----+          +-------fix_file--------+          +--------fix_file---------+          +----udpate_db_from_file-----+          +---udpate_db_from_file----+
# |        Find File         |          |   Cleanup Filename    |          |   Change Lineendings    |          |   Read Values from File    |          |        Update DB         |
# |--------------------------|          |-----------------------|          |-------------------------|          |----------------------------|          |--------------------------|
# o file<>                   |     +--->o file<>                |     +--->o file<>                  |     +--->o file<>                     |     +--->o file<>                   |
# |                     file o-----+    |                  file o-----+    |                    file o-----+    |                       file o-----+    |                     file o
# +--------------------------+          +-----------------------+          +-------------------------+          +----------------------------+          +--------------------------+
print(fix_file)


# Subgraphs can be accessed by their name from any participating graph
assert udpate_db_from_file.subgraphs["fix_file"] is fix_file
assert fix_file.subgraphs["udpate_db_from_file"] is udpate_db_from_file

VFX Renderfarm Conversion

"""Demonstrating how to convert a flowpipe graph to a render farm job.

This guide expects that your render farm can handle dependencies between tasks.
"""
import json
import logging
import os
from tempfile import gettempdir

from flowpipe import Graph, INode, Node

# -----------------------------------------------------------------------------
#
# Necessary utilities
#
# -----------------------------------------------------------------------------


class JsonDatabase:
    """File-backed storage for JSON-serialized nodes.

    A real deployment could use an actual database; plain JSON files are the
    easiest option for demonstrational purposes. In production, a file based
    storage also has advantages for debugging and allows for easy hacking by
    just altering the JSON files directly.
    """

    # One file per node, keyed by the node's identifier.
    PATH = os.path.join(gettempdir(), "json-database", "{identifier}.json")

    @staticmethod
    def set(node):
        """Serialize the node to disk under its identifier.

        Returns the path of the written JSON file.
        """
        target = JsonDatabase.PATH.format(identifier=node.identifier)
        directory = os.path.dirname(target)
        if not os.path.exists(directory):
            os.makedirs(directory)
        with open(target, "w") as file_handle:
            json.dump(node.serialize(), file_handle, indent=2)
        return target

    @staticmethod
    def get(identifier):
        """Load and deserialize the node stored for the given identifier."""
        source = JsonDatabase.PATH.format(identifier=identifier)
        with open(source, "r") as file_handle:
            data = json.load(file_handle)
        return INode.deserialize(data)


# Command templates to execute a flowpipe node in the terminal.
# Uses different python interpreters and commands based on the host application
# The template just needs the path to the serialized json file and optionally
# a range of frames passed to the node for the implicit batch conversion.
COMMANDS = {
    # Plain python interpreter: evaluate the serialized node directly.
    "python": (
        "python -c '"
        "from my_farm import conversion;"
        'conversion.evaluate_on_farm("{serialized_json}", {frames})\''
    ),
    # Maya's bundled interpreter: boot standalone Maya before evaluating.
    "maya": (
        "mayapy -c '"
        "import maya.standalone;"
        'maya.standalone.initialize(name="python");'
        "from my_farm import conversion;"
        'conversion.evaluate_on_farm("{serialized_json}", {frames})\''
    ),
}


def convert_graph_to_job(graph):
    """Convert the graph to a dict representing a typical render farm job.

    Every node becomes one farm task; if a node defines "batch_size" in its
    metadata, one task is created per batch of frames instead. The task
    dependencies mirror the upstream connections between the nodes.
    """
    job = {"name": graph.name, "tasks": []}

    # Turn every node into a farm task
    tasks = {}
    for node in graph.nodes:
        serialized_json = JsonDatabase.set(node)

        tasks[node.name] = []

        # IMPLICIT BATCHING:
        # Create individual tasks for each batch if the batch size is defined
        # Feed the calculated frame range to each batch
        if node.metadata.get("batch_size") is not None:
            batch_size = node.metadata["batch_size"]
            frames = node.inputs["frames"].value
            i = 0
            # Iterate up to len(frames): the previous bound of
            # len(frames) - 1 silently dropped a trailing single-frame batch
            # (a one-frame node produced no task at all).
            while i < len(frames):
                # Clamp the final batch to the frames that actually exist
                end = min(i + batch_size, len(frames))
                f = frames[i:end]

                # Floor division keeps the batch index an int (i / batch_size
                # produced names like "Node-0.0" under Python 3)
                task = {"name": "{0}-{1}".format(node.name, i // batch_size)}
                command = COMMANDS.get(
                    node.metadata.get("interpreter", "python"), None
                )
                task["command"] = command.format(
                    serialized_json=serialized_json, frames=f
                )
                job["tasks"].append(task)

                tasks[node.name].append(task)

                i += batch_size
        else:
            task = {"name": node.name}
            command = COMMANDS.get(
                node.metadata.get("interpreter", "python"), None
            )
            task["command"] = command.format(
                serialized_json=serialized_json, frames=None
            )
            job["tasks"].append(task)

            tasks[node.name].append(task)

    # The dependencies between the tasks based on the connections of the Nodes
    for node_name in tasks:
        for task in tasks[node_name]:
            node = graph[node_name]
            task["dependencies"] = []
            for upstream in [n.name for n in node.upstream_nodes]:
                task["dependencies"] += [t["name"] for t in tasks[upstream]]

    return job


def evaluate_on_farm(serialized_json, frames=None):
    """Evaluate the node behind the given json file.

    1. Deserialize the node
    2. Collect any input values from any upstream dependencies
        For implicit batching, the given frames are assigned to the node,
        overriding whatever might be stored in the json file, because all
        batches share the same json file.
    3. Evaluate the node
    4. Serialize the node back into its original file
        For implicit farm conversion, the serialization only happens once,
        for the 'last' batch, knowing that the last batch in numbers might
        not be the 'last' batch actually executed.
    """
    # Debug logs might be useful on the farm.
    # (logging.baseConfig does not exist; basicConfig is the correct API.)
    logging.basicConfig(level=logging.DEBUG)

    # Deserialize the node from the serialized json
    with open(serialized_json, "r") as f:
        data = json.load(f)
    node = INode.deserialize(data)

    # Retrieve the upstream output data
    for name, input_plug in data["inputs"].items():
        for identifier, output_plug in input_plug["connections"].items():
            upstream_node = JsonDatabase.get(identifier)
            node.inputs[name].value = upstream_node.outputs[output_plug].value

    # Specifically assign the batch frames here if applicable.
    # Read/write the plug's .value - assigning to node.inputs["frames"]
    # directly would replace the plug object itself, and the previous
    # all_frames held a Plug, so all_frames[-1] below would have failed.
    if frames is not None:
        all_frames = node.inputs["frames"].value
        node.inputs["frames"].value = frames

    # Actually evaluate the node
    node.evaluate()

    # Store the result back into the same file ONLY once
    # ALL batch processes access the same json file so the result is only
    # stored for the last batch, knowing that the last batch in numbers might
    # not be the last batch actually executed
    if frames is not None and frames[-1] != all_frames[-1]:
        return

    with open(serialized_json, "w") as f:
        json.dump(node.serialize(), f, indent=2)


# -----------------------------------------------------------------------------
#
# Examples
#
# -----------------------------------------------------------------------------


@Node(outputs=["renderings"], metadata={"interpreter": "maya"})
def MayaRender(frames, scene_file):
    """Render the given frames from the given scene (stub implementation)."""
    rendered_sequence = "/renderings/file.%04d.exr"
    return {"renderings": rendered_sequence}


@Node(outputs=["status"])
def UpdateDatabase(id_, images):
    """Update the database entries of the given asset with the given data."""
    # Stub implementation: always reports success.
    succeeded = True
    return {"status": succeeded}


def implicit_batching(frames, batch_size):
    """Batches are created during the farm conversion."""
    graph = Graph(name="Rendering")
    # The batch size lives in the node metadata; the conversion splits
    # the frames accordingly.
    render_node = MayaRender(
        graph=graph,
        frames=list(range(frames)),
        scene_file="/scene/for/rendering.ma",
        metadata={"batch_size": batch_size},
    )
    database_node = UpdateDatabase(graph=graph, id_=123456)
    render_node.outputs["renderings"].connect(database_node.inputs["images"])

    print(graph)
    print(json.dumps(convert_graph_to_job(graph), indent=2))


def explicit_batching(frames, batch_size):
    """Batches are already part of the graph."""
    graph = Graph(name="Rendering")
    database_node = UpdateDatabase(graph=graph, id_=123456)
    # One render node per batch, all feeding the same database node.
    for start in range(0, frames, batch_size):
        end = start + batch_size
        render_node = MayaRender(
            name="MayaRender{0}-{1}".format(start, end),
            graph=graph,
            frames=list(range(start, end)),
            scene_file="/scene/for/rendering.ma",
        )
        render_node.outputs["renderings"].connect(
            database_node.inputs["images"][str(start)]
        )

    print(graph)
    print(json.dumps(convert_graph_to_job(graph), indent=2))


if __name__ == "__main__":
    # Demo both batching strategies: 30 frames in batches of 10.
    implicit_batching(30, 10)
    explicit_batching(30, 10)

VFX Rendering

"""Demo a complex workflow of a rendering with a series of subsequent actions:

- Render a CG render out of Maya
- Check the resulting images for potential defects caused by potential server glitches
- Register the CG render in the database
- Create and render a slap comp
- Convert the rendered slapcomp to a quicktime

+---------------------------+          +----------------------+           +-------------------------+          +-----------------------+          +-----------------------+
|      MayaRender0-10       |          |   CheckImages0-10    |           |     CreateSlapComp      |          |    NukeRender0-10     |          |       Quicktime       |
|---------------------------|          |----------------------|           |-------------------------|          |-----------------------|          |-----------------------|
o frames<[0, 1, 2, >        |     +--->o images<>             |           % images                  |          o frames<[0, 1, 2, >    |          % images                |
o scene_file<"/scene/fo>    |     |    |               images o---------->o  images.0<>             |     +--->o scene_file<>          |     +--->o  images.0<>           |
|                renderings o-----+    +----------------------+      |--->o  images.10<>            |   --+    |            renderings o-----+--->o  images.10<>          |
+---------------------------+          +-----------------------+     |--->o  images.20<>            |     |    +-----------------------+     |--->o  images.20<>          |
+---------------------------+          |   CheckImages10-20    |     |    o template<"nuke_temp>    |     |    +-----------------------+     |    |             quicktime o
|      MayaRender10-20      |          |-----------------------|     |    |                slapcomp o---  |    |    NukeRender10-20    |     |    +-----------------------+
|---------------------------|     +--->o images<>              |     |    +-------------------------+     |    |-----------------------|     |
o frames<[10, 11, 1>        |     |    |                images o-----|    +-----------------------+       |    o frames<[10, 11, 1>    |     |
o scene_file<"/scene/fo>    |     |    +-----------------------+     |    |    UpdateDatabase     |       +--->o scene_file<>          |     |
|                renderings o-----+    +-----------------------+     |    |-----------------------|       |    |            renderings o-----+
+---------------------------+          |   CheckImages20-30    |     |    o id_<123456>           |       |    +-----------------------+     |
+---------------------------+          |-----------------------|     |    % images                |       |    +-----------------------+     |
|      MayaRender20-30      |     +--->o images<>              |     +--->o  images.0<>           |       |    |    NukeRender20-30    |     |
|---------------------------|     |    |                images o-----+--->o  images.10<>          |       |    |-----------------------|     |
o frames<[20, 21, 2>        |     |    +-----------------------+     +--->o  images.20<>          |       |    o frames<[20, 21, 2>    |     |
o scene_file<"/scene/fo>    |     |                                       |                status o       +--->o scene_file<>          |     |
|                renderings o-----+                                       +-----------------------+            |            renderings o-----+
+---------------------------+                                                                                  +-----------------------+
"""

from flowpipe import Graph, Node


@Node(outputs=["renderings"], metadata={"interpreter": "maya"})
def MayaRender(frames, scene_file):
    """Stub render node returning a fixed image-sequence pattern."""
    rendered_sequence = "/renderings/file.%04d.exr"
    return {"renderings": rendered_sequence}


@Node(outputs=["images"])
def CheckImages(images):
    """Stub check node forwarding the incoming images unchanged."""
    verified = images
    return {"images": verified}


@Node(outputs=["slapcomp"])
def CreateSlapComp(images, template):
    """Stub node returning a fixed slapcomp scene name."""
    scene_name = "slapcomp.nk"
    return {"slapcomp": scene_name}


@Node(outputs=["renderings"], metadata={"interpreter": "nuke"})
def NukeRender(frames, scene_file):
    """Stub render node returning a fixed image-sequence pattern."""
    rendered_sequence = "/renderings/file.%04d.exr"
    return {"renderings": rendered_sequence}


@Node(outputs=["quicktime"])
def Quicktime(images):
    """Stub node returning a fixed quicktime file name."""
    movie = "resulting.mov"
    return {"quicktime": movie}


@Node(outputs=["status"])
def UpdateDatabase(id_, images):
    """Update the database entries of the given asset with the given data."""
    # Stub implementation: always reports success.
    succeeded = True
    return {"status": succeeded}


def complex_cg_render(frames, batch_size):
    """Build and print the full rendering graph for the given frame range.

    Maya renders feed image checks, which feed both the slapcomp and the
    database; the slapcomp drives per-batch Nuke renders that are finally
    assembled into a quicktime.
    """
    graph = Graph(name="Rendering")

    slapcomp = CreateSlapComp(graph=graph, template="nuke_template.nk")
    update_database = UpdateDatabase(graph=graph, id_=123456)

    for i in range(0, frames, batch_size):
        maya_render = MayaRender(
            name="MayaRender{0}-{1}".format(i, i + batch_size),
            graph=graph,
            frames=range(i, i + batch_size),
            scene_file="/scene/for/rendering.ma",
        )
        check_images = CheckImages(
            name="CheckImages{0}-{1}".format(i, i + batch_size), graph=graph
        )
        maya_render.outputs["renderings"].connect(
            check_images.inputs["images"]
        )
        check_images.outputs["images"].connect(
            slapcomp.inputs["images"][str(i)]
        )
        check_images.outputs["images"].connect(
            update_database.inputs["images"][str(i)]
        )

    # BUGFIX: the quicktime node was previously created without graph=graph,
    # unlike every other node, so it was missing from the printed graph.
    quicktime = Quicktime(graph=graph)

    for i in range(0, frames, batch_size):
        nuke_render = NukeRender(
            name="NukeRender{0}-{1}".format(i, i + batch_size),
            graph=graph,
            frames=range(i, i + batch_size),
        )
        slapcomp.outputs["slapcomp"].connect(nuke_render.inputs["scene_file"])
        nuke_render.outputs["renderings"].connect(
            quicktime.inputs["images"][str(i)]
        )

    print(graph)


if __name__ == "__main__":
    # 30 frames rendered in batches of 10.
    complex_cg_render(30, 10)

Workflow Design Pattern

"""Demonstration of the Workflow Design Pattern.

As the name suggests, this pattern wants to represent workflows.
It is basically an extension of the 'Command Pattern' meant for more complex,
long-running commands consisting of multiple sub-commands. Workflows also
provide multiple ways of evaluation, usually local and remote.

A workflow would be a common, pre-defined set of tasks frequently used in a
pipeline, for example:
    - prepare a delivery to the client
    - publish geometry with a subsequent turntable rendering
    - ingest data from vendors, including data cleanup and transformation

The Workflow builds a Graph and initializes it with user provided settings as
well as data taken from other sources (database, filesystem).
"""
import getpass

from flowpipe import Graph, Node


class Workflow(object):
    """Abstract base class defining a workflow, based on a flowpipe graph.

    Holds a graph and offers two evaluation strategies: in the current
    process, or on a remote service such as a render farm.
    """

    def __init__(self):
        self.graph = Graph()

    def evaluate_locally(self):
        """Run the underlying graph in the current process."""
        self.graph.evaluate()

    def evaluate_remotely(self):
        """Hook for remote submission; see
        examples/vfx_render_farm_conversion.py on how to implement a
        conversion from flowpipe graphs to your render farm.
        """
        pass


class PublishWorkflow(Workflow):
    """Publish a model and add a turntable render of it to the database."""

    def __init__(self, source_file):
        """Build and initialize the publish graph for the given source file.

        Args:
            source_file: Path of the model file to publish; the part before
                the first '.' is used as the asset name.
        """
        super(PublishWorkflow, self).__init__()
        # Create the nodes on the workflow's graph
        publish = Publish(graph=self.graph)
        message = SendMessage(graph=self.graph)
        turntable = CreateTurntable(graph=self.graph)
        update_database = UpdateDatabase(graph=self.graph)
        # Wire the pipeline: publish -> turntable -> database, and feed the
        # published path into the notification message as well
        publish.outputs["published_file"].connect(
            turntable.inputs["alembic_cache"]
        )
        publish.outputs["published_file"].connect(
            message.inputs["values"]["path"]
        )
        turntable.outputs["turntable"].connect(
            update_database.inputs["images"]
        )

        # Initialize the graph from user input
        publish.inputs["source_file"].value = source_file

        # Initialize the graph through pipeline logic
        # These things can also be done in the nodes themselves of course,
        # it's a design choice and depends on the case
        message.inputs["template"].value = (
            "Hello,\n\n"
            "The following file has been published: {path}\n\n"
            "Thank you,\n\n"
            "{sender}"
        )
        message.inputs["values"]["sender"].value = getpass.getuser()
        message.inputs["values"]["recipients"].value = [
            "john@mail.com",
            "jane@mail.com",
        ]
        turntable.inputs["render_template"].value = "template.ma"
        update_database.inputs["asset"].value = source_file.split(".")[0]
        update_database.inputs["status"].value = "published"


# -----------------------------------------------------------------------------
#
# The Nodes used in the Graph
#
# -----------------------------------------------------------------------------


@Node(outputs=["published_file"])
def Publish(source_file):
    """Publish the given source file (stub returning a fixed path)."""
    published = "/published/file.abc"
    return {"published_file": published}


@Node(outputs=["return_status"])
def SendMessage(template, values, recipients):
    """Send message to given recipients."""
    body = template.format(**values)
    print("--------------------------------------")
    print(body)
    print("--------------------------------------")
    return {"return_status": 0}


@Node(outputs=["turntable"])
def CreateTurntable(alembic_cache, render_template):
    """Load the given cache into the given template file and render."""
    rendered_sequence = "/turntable/turntable.%04d.jpg"
    return {"turntable": rendered_sequence}


@Node(outputs=["asset"])
def UpdateDatabase(asset, images, status):
    """Update the database entries of the given asset with the given data."""
    # Stub: echoes the asset back so downstream nodes could chain off it.
    return {"asset": asset}


if __name__ == "__main__":
    # Build the workflow for a model file and run it in-process.
    workflow = PublishWorkflow("model.ma")
    print(workflow.graph)
    workflow.evaluate_locally()

World Clock

"""Demonstrating the basic capabilities of flowpipe.

A graph implementation of a world clock for demonstrational purposes:

+------------------+          +---------------------+          +----------------------+
|   CurrentTime    |          |       London        |          |      WorldClock      |
|------------------|          |---------------------|          |----------------------|
|             time o-----+--->o time<>              |          % times                |
+------------------+     |    o timezone<0>         |     +--->o  times.London<>      |
                         |    |      converted_time o-----+--->o  times.Munich<>      |
                         |    +---------------------+     +--->o  times.Vancouver<>   |
                         |    +---------------------+     |    +----------------------+
                         |    |       Munich        |     |
                         |    |---------------------|     |
                         |--->o time<>              |     |
                         |    o timezone<1>         |     |
                         |    |      converted_time o-----|
                         |    +---------------------+     |
                         |    +---------------------+     |
                         |    |      Vancouver      |     |
                         |    |---------------------|     |
                         +--->o time<>              |     |
                              o timezone<-8>        |     |
                              |      converted_time o-----+
                              +---------------------+
"""
from datetime import datetime
from time import time

from flowpipe import Graph, INode, InputPlug, Node, OutputPlug


@Node(outputs=["time"])
def CurrentTime():
    """The @Node decorator turns the wrapped function into a Node object.

    Any arguments to the function are used as input plugs to the Node.
    The outputs are defined in the decorator explicitly.
    """
    now = time()
    return {"time": now}


class ConvertTime(INode):
    """A node can be derived from the INode interface.

    The plugs are defined in the init method.
    The compute method receives the inputs from any connected upstream nodes.
    """

    def __init__(self, time=None, timezone=0, **kwargs):
        """Initialize the plugs.

        Args:
            time: Optional initial value for the 'time' input plug.
            timezone: Offset in hours applied to the incoming time.
        """
        super(ConvertTime, self).__init__(**kwargs)
        # Pass 'time' through so a value given to the constructor actually
        # initializes the plug; it was accepted but silently ignored before.
        # Backward compatible: the default None matches an uninitialized plug.
        InputPlug("time", self, time)
        InputPlug("timezone", self, timezone)
        OutputPlug("converted_time", self)

    def compute(self, time, timezone):
        # Timezone offset is given in hours; convert to seconds.
        return {"converted_time": time + timezone * 60 * 60}


@Node()
def ShowTimes(times):
    """Nodes do not necessarily have to define output and input plugs."""
    print("-- World Clock -------------------")
    for location, timestamp in times.items():
        line = "It is now: {time:%H:%M} in {location}".format(
            time=datetime.fromtimestamp(timestamp), location=location
        )
        print(line)
    print("----------------------------------")


# The Graph holds the nodes
graph = Graph(name="World Clock")
current_time = CurrentTime(graph=graph)
van = ConvertTime(name="Vancouver", timezone=-8, graph=graph)
ldn = ConvertTime(name="London", timezone=0, graph=graph)
muc = ConvertTime(name="Munich", timezone=1, graph=graph)
world_clock = ShowTimes(graph=graph)

# Nodes can be connected via the plugs' connect() method ...
current_time.outputs["time"].connect(van.inputs["time"])
current_time.outputs["time"].connect(ldn.inputs["time"])
current_time.outputs["time"].connect(muc.inputs["time"])
# ... or via the bit shift operator
van.outputs["converted_time"] >> world_clock.inputs["times"]["Vancouver"]
ldn.outputs["converted_time"] >> world_clock.inputs["times"]["London"]
muc.outputs["converted_time"] >> world_clock.inputs["times"]["Munich"]

# Display the graph
print(graph)

graph.evaluate()

Indices and tables