Workflows, Flows & Nodes
The three core SDK abstractions — Workflow, Flow, and Node — how the DAG is wired with depends_on() and .inputs(), and the node execution lifecycle.
Everything you build with the SDK is made of three layers: a Node does one unit of work, a Flow is the DAG of nodes, and a Workflow is the container you actually run. Understanding how data moves between them is most of what you need to be productive.
The three layers
| Class | Import | Role |
|---|---|---|
Workflow | from dynamiq import Workflow | Top-level runnable. Owns a Flow, an id, a name, and an optional version; fires workflow-level callbacks and handles YAML serialization (from_yaml_file / to_yaml_file). |
Flow | from dynamiq.flows import Flow | The DAG. Holds nodes, topologically sorts them by dependencies, and executes independent nodes in parallel on a thread executor (max_node_workers caps concurrency). Also owns the ConnectionManager. |
Node | dynamiq.nodes.* | One unit of execution — an LLM call, an agent, a tool, a retriever. Configured with a connection, error_handling, caching, streaming, transformers, and depends. |
Workflow() creates an empty Flow for you; wf.flow.add_nodes(node) and Workflow(flow=Flow(nodes=[...])) are equivalent ways to populate it.
from dynamiq import Workflow
from dynamiq.flows import Flow
wf = Workflow(flow=Flow(nodes=[node_a, node_b]))
# or
wf = Workflow()
wf.flow.add_nodes(node_a)
wf.flow.add_nodes(node_b)Every node also implements the same run() interface as the workflow, so any node can be executed standalone — handy for testing a single step.
Wiring the DAG
Two chainable methods declare the graph:
node.depends_on(other)— execution order. Accepts a single node or a list; the flow won't start this node until all dependencies have completed. Cycles are rejected at flow construction with aCycleError.node.inputs(key=value)— data mapping. Sets what each input field receives at runtime.
.inputs() values can be three things:
- A reference to another node's output —
other.outputs.<key>resolves at runtime to that key of the dependency's output dict. - A callable — receives
(inputs, outputs)whereoutputsmaps dependency node ids to their output dicts; return the value to inject. - A static value — passed through as-is.
from dynamiq import Workflow
from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Prompt, Message
connection = OpenAIConnection()
summarizer = OpenAI(
id="summarizer",
connection=connection,
model="gpt-4o-mini",
prompt=Prompt(messages=[
Message(role="user", content="Summarize in two sentences: {{ text }}"),
]),
)
critic = (
OpenAI(
id="critic",
connection=connection,
model="gpt-4o-mini",
prompt=Prompt(messages=[
Message(role="user", content="Rate this summary for {{ audience }}:\n{{ summary }}"),
]),
)
.inputs(
summary=summarizer.outputs.content, # output of the summarizer node
audience="busy executives", # static value
)
.depends_on(summarizer)
)
wf = Workflow()
wf.flow.add_nodes(summarizer)
wf.flow.add_nodes(critic)
result = wf.run(input_data={"text": "Long article text goes here..."})
print(result.output["critic"]["output"]["content"])The workflow's input_data is offered to every node; nodes with no dependencies render their prompts straight from it ({{ text }} above), while downstream nodes typically take their inputs from dependency outputs via .inputs().
Nodes that declare the same dependencies but not each other run in parallel — add two independent agents to a flow and the executor schedules them concurrently with no extra code.
.inputs() is the modern, type-safe way to map data. The lower-level input_transformer=InputTransformer(selector={...}) with JSONPath-style selectors (e.g. "$['node-id'].output.content") does the same job declaratively and is what the YAML format uses — see Input transformers and Jinja for the selector syntax.
Node anatomy
Every node shares this configuration surface (all optional):
idstringnamestringconnectionBaseConnectiondependslist[NodeDependency]input_transformer / input_mappingInputTransformer / dictoutput_transformerOutputTransformererror_handlingErrorHandlingcachingCachingConfigstreamingStreamingConfigcallbackslist[NodeCallbackHandler]The execution lifecycle
When a flow schedules a node, run_sync / run_async walks the same sequence:
- Dependency check — all dependencies must have completed. If a dependency failed or was skipped (and its
error_handling.behaviorisRAISE), this node is skipped with statusSKIPrather than executed. - Input assembly — workflow input and dependency results are merged, then
input_transformerand.inputs()mappings are applied. - Schema validation — the assembled input is validated against the node's input schema.
- Callbacks —
on_node_startfires for run- and node-level handlers. - Execute — the node's
execute()runs, wrapped in retry/timeout logic fromerror_handlingand the cache layer ifcaching.enabled. - Result — a
RunnableResultwith statusSUCCESS,FAILURE,SKIP, orCANCELEDis recorded; downstream nodes can now be scheduled.
The flow's final output is the per-node map of these results:
result = wf.run(input_data={"text": "..."})
result.output
# {
# "summarizer": {"status": "success", "input": {...}, "output": {"content": "..."}},
# "critic": {"status": "success", "input": {...}, "output": {"content": "..."}}
# }See Running workflows & results for statuses, errors, and RunnableConfig.
Running workflows & results
run() vs run_sync()/run_async(), RunnableResult, and cancellation.
Connections & credentials
How nodes authenticate to external services.
Error handling & retries
Timeouts, retries with backoff, and failure behaviors.
YAML workflows
Serialize the same DAG to the platform's declarative format.
SDK vs Platform
Decide when to build with the open-source Python SDK, when to use the Dynamiq platform, and how the two connect through tracing, YAML, the gateway, and deployment.
Connections & Credentials
How SDK connection classes hold credentials, which environment variables they read by default, and how the ConnectionManager caches service clients.