"""Demonstration of the Workflow Design Pattern.
As the name suggests, this pattern wants to represent workflows.
It is basically an extension of the 'Command Pattern' meant for more complex,
long-running commands consisting of multiple sub-commands. Workflows also
provide multiple ways of evaluation, usually local and remote.
A workflow would be a common, pre-defined set of tasks frequently used in a
pipeline, for example:
- prepare a delivery to the client
- publish geometry with a subsequent turntable rendering
- ingest data from vendors, including data cleanup and transformation
The Workflow builds a Graph and initializes it with user provided settings as
well as data taken from other sources (database, filesystem).
"""
import getpass
from flowpipe import Graph, Node
class Workflow(object):
"""Abstract base class defining a workflow, based on a flowpipe graph.
The Workflow holds a graph and provides two ways to evaluate the graph,
locally and remotely.
"""
def __init__(self):
self.graph = Graph()
def evaluate_locally(self):
"""Evaluate the graph locally."""
self.graph.evaluate()
def evaluate_remotely(self):
"""See examples/vfx_render_farm_conversion.py on how to implement a
conversion from flowpipe graphs to your render farm.
"""
pass
class PublishWorkflow(Workflow):
"""Publish a model and add a turntable render of it to the database."""
def __init__(self, source_file):
super(PublishWorkflow, self).__init__()
publish = Publish(graph=self.graph)
message = SendMessage(graph=self.graph)
turntable = CreateTurntable(graph=self.graph)
update_database = UpdateDatabase(graph=self.graph)
publish.outputs["published_file"].connect(
turntable.inputs["alembic_cache"]
)
publish.outputs["published_file"].connect(
message.inputs["values"]["path"]
)
turntable.outputs["turntable"].connect(
update_database.inputs["images"]
)
# Initialize the graph from user input
publish.inputs["source_file"].value = source_file
# Initialize the graph through pipeline logic
# These things can also be done in the nodes themselves of course,
# it's a design choice and depends on the case
message.inputs["template"].value = (
"Hello,\n\n"
"The following file has been published: {path}\n\n"
"Thank you,\n\n"
"{sender}"
)
message.inputs["values"]["sender"].value = getpass.getuser()
message.inputs["values"]["recipients"].value = [
"john@mail.com",
"jane@mail.com",
]
turntable.inputs["render_template"].value = "template.ma"
update_database.inputs["asset"].value = source_file.split(".")[0]
update_database.inputs["status"].value = "published"
# -----------------------------------------------------------------------------
#
# The Nodes used in the Graph
#
# -----------------------------------------------------------------------------
@Node(outputs=["published_file"])
def Publish(source_file):
"""Publish the given source file."""
return {"published_file": "/published/file.abc"}
@Node(outputs=["return_status"])
def SendMessage(template, values, recipients):
"""Send message to given recipients."""
print("--------------------------------------")
print(template.format(**values))
print("--------------------------------------")
return {"return_status": 0}
@Node(outputs=["turntable"])
def CreateTurntable(alembic_cache, render_template):
"""Load the given cache into the given template file and render."""
return {"turntable": "/turntable/turntable.%04d.jpg"}
@Node(outputs=["asset"])
def UpdateDatabase(asset, images, status):
"""Update the database entries of the given asset with the given data."""
return {"asset": asset}
if __name__ == "__main__":
workflow = PublishWorkflow("model.ma")
print(workflow.graph)
workflow.evaluate_locally()