"""Demonstrating how to convert a flowpipe graph to a render farm job.
This guide expects that your render farm can handle dependencies between tasks.
"""
import json
import logging
import os
from tempfile import gettempdir
from flowpipe import Graph, INode, Node
# -----------------------------------------------------------------------------
#
# Necessary utilities
#
# -----------------------------------------------------------------------------
class JsonDatabase:
"""The Database stores the JSON-serialized nodes.
The storage can also be handled via a database, this is just the easiest
way for demonstrational purposes. In production, a file based storage also
has advantages for debugging and allows for easy hacking by just altering
the JSON files directly.
"""
PATH = os.path.join(gettempdir(), "json-database", "{identifier}.json")
@staticmethod
def set(node):
"""Store the node under it's identifier."""
serialized_json = JsonDatabase.PATH.format(identifier=node.identifier)
if not os.path.exists(os.path.dirname(serialized_json)):
os.makedirs(os.path.dirname(serialized_json))
with open(serialized_json, "w") as f:
json.dump(node.serialize(), f, indent=2)
return serialized_json
@staticmethod
def get(identifier):
"""Retrieve the node behind the given identifier."""
serialized_json = JsonDatabase.PATH.format(identifier=identifier)
with open(serialized_json, "r") as f:
data = json.load(f)
return INode.deserialize(data)
# Command templates to execute a flowpipe node in the terminal.
# Uses different python interpreters and commands based on the host application
# The template just needs the path to the serialized json file and optionally
# a range of frames passed to the node for the implicit batch conversion.
COMMANDS = {
"python": (
"python -c '"
"from my_farm import conversion;"
'conversion.evaluate_on_farm("{serialized_json}", {frames})\''
),
"maya": (
"mayapy -c '"
"import maya.standalone;"
'maya.standalone.initialize(name="python");'
"from my_farm import conversion;"
'conversion.evaluate_on_farm("{serialized_json}", {frames})\''
),
}
def convert_graph_to_job(graph):
"""Convert the graph to a dict representing a typical render farm job."""
job = {"name": graph.name, "tasks": []}
# Turn every node into a farm task
tasks = {}
for node in graph.nodes:
serialized_json = JsonDatabase.set(node)
tasks[node.name] = []
# IMPLICIT BATCHING:
# Create individual tasks for each batch if the batch size is defined
# Feed the calculated frame range to each batch
if node.metadata.get("batch_size") is not None:
batch_size = node.metadata["batch_size"]
frames = node.inputs["frames"].value
i = 0
while i < len(frames) - 1:
end = i + batch_size
if end > len(frames) - 1:
end = len(frames)
f = frames[i:end]
task = {"name": "{0}-{1}".format(node.name, i / batch_size)}
command = COMMANDS.get(
node.metadata.get("interpreter", "python"), None
)
task["command"] = command.format(
serialized_json=serialized_json, frames=f
)
job["tasks"].append(task)
tasks[node.name].append(task)
i += batch_size
else:
task = {"name": node.name}
command = COMMANDS.get(
node.metadata.get("interpreter", "python"), None
)
task["command"] = command.format(
serialized_json=serialized_json, frames=None
)
job["tasks"].append(task)
tasks[node.name].append(task)
# The dependencies between the tasks based on the connections of the Nodes
for node_name in tasks:
for task in tasks[node_name]:
node = graph[node_name]
task["dependencies"] = []
for upstream in [n.name for n in node.upstream_nodes]:
task["dependencies"] += [t["name"] for t in tasks[upstream]]
return job
def evaluate_on_farm(serialized_json, frames=None):
"""Evaluate the node behind the given json file.
1. Deserialize the node
2. Collect any input values from any upstream dependencies
For implicit batching, the given frames are assigned to the node,
overriding whatever might be stored in the json file, becuase all
batches share the same json file.
3. Evaluate the node
4. Serialize the node back into its original file
For implicit farm conversion, the serialization only happens once,
for the 'last' batch, knowing that the last batch in numbers might
not be the 'last' batch actually executed.
"""
# Debug logs might be useful on the farm
logging.baseConfig.setLevel(logging.DEBUG)
# Deserialize the node from the serialized json
with open(serialized_json, "r") as f:
data = json.load(f)
node = INode.deserialize(data)
# Retrieve the upstream output data
for name, input_plug in data["inputs"].items():
for identifier, output_plug in input_plug["connections"].items():
upstream_node = JsonDatabase.get(identifier)
node.inputs[name].value = upstream_node.outputs[output_plug].value
# Specifically assign the batch frames here if applicable
if frames is not None:
all_frames = node.inputs["frames"]
node.inputs["frames"] = frames
# Actually evalute the node
node.evaluate()
# Store the result back into the same file ONLY once
# ALL batch processes access the same json file so the result is only stored
# for the last batch, knowing that the last batch in numbers might not be
# the last batch actually executed
if frames is not None and frames[-1] != all_frames[-1]:
return
with open(serialized_json, "w") as f:
json.dump(node.serialize(), f, indent=2)
# -----------------------------------------------------------------------------
#
# Examples
#
# -----------------------------------------------------------------------------
@Node(outputs=["renderings"], metadata={"interpreter": "maya"})
def MayaRender(frames, scene_file):
"""Render the given frames from the given scene.."""
return {"renderings": "/renderings/file.%04d.exr"}
@Node(outputs=["status"])
def UpdateDatabase(id_, images):
"""Update the database entries of the given asset with the given data."""
return {"status": True}
def implicit_batching(frames, batch_size):
"""Batches are created during the farm conversion."""
graph = Graph(name="Rendering")
render = MayaRender(
graph=graph,
frames=list(range(frames)),
scene_file="/scene/for/rendering.ma",
metadata={"batch_size": batch_size},
)
update = UpdateDatabase(graph=graph, id_=123456)
render.outputs["renderings"].connect(update.inputs["images"])
print(graph)
print(json.dumps(convert_graph_to_job(graph), indent=2))
def explicit_batching(frames, batch_size):
"""Batches are already part of the graph."""
graph = Graph(name="Rendering")
update_database = UpdateDatabase(graph=graph, id_=123456)
for i in range(0, frames, batch_size):
maya_render = MayaRender(
name="MayaRender{0}-{1}".format(i, i + batch_size),
graph=graph,
frames=list(range(i, i + batch_size)),
scene_file="/scene/for/rendering.ma",
)
maya_render.outputs["renderings"].connect(
update_database.inputs["images"][str(i)]
)
print(graph)
print(json.dumps(convert_graph_to_job(graph), indent=2))
if __name__ == "__main__":
implicit_batching(30, 10)
explicit_batching(30, 10)