Dynamiq
Concepts

Workflows, Flows & Nodes

The three core SDK abstractions — Workflow, Flow, and Node — how the DAG is wired with depends_on() and .inputs(), and the node execution lifecycle.

Everything you build with the SDK is made of three layers: a Node does one unit of work, a Flow is the DAG of nodes, and a Workflow is the container you actually run. Understanding how data moves between them is most of what you need to be productive.

The three layers

ClassImportRole
Workflowfrom dynamiq import WorkflowTop-level runnable. Owns a Flow, an id, a name, and an optional version; fires workflow-level callbacks and handles YAML serialization (from_yaml_file / to_yaml_file).
Flowfrom dynamiq.flows import FlowThe DAG. Holds nodes, topologically sorts them by dependencies, and executes independent nodes in parallel on a thread executor (max_node_workers caps concurrency). Also owns the ConnectionManager.
Nodedynamiq.nodes.*One unit of execution — an LLM call, an agent, a tool, a retriever. Configured with a connection, error_handling, caching, streaming, transformers, and depends.

Workflow() creates an empty Flow for you; wf.flow.add_nodes(node) and Workflow(flow=Flow(nodes=[...])) are equivalent ways to populate it.

from dynamiq import Workflow
from dynamiq.flows import Flow

wf = Workflow(flow=Flow(nodes=[node_a, node_b]))
# or
wf = Workflow()
wf.flow.add_nodes(node_a)
wf.flow.add_nodes(node_b)

Every node also implements the same run() interface as the workflow, so any node can be executed standalone — handy for testing a single step.

Wiring the DAG

Two chainable methods declare the graph:

  • node.depends_on(other) — execution order. Accepts a single node or a list; the flow won't start this node until all dependencies have completed. Cycles are rejected at flow construction with a CycleError.
  • node.inputs(key=value) — data mapping. Sets what each input field receives at runtime.

.inputs() values can be three things:

  1. A reference to another node's outputother.outputs.<key> resolves at runtime to that key of the dependency's output dict.
  2. A callable — receives (inputs, outputs) where outputs maps dependency node ids to their output dicts; return the value to inject.
  3. A static value — passed through as-is.
from dynamiq import Workflow
from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Prompt, Message

connection = OpenAIConnection()

summarizer = OpenAI(
    id="summarizer",
    connection=connection,
    model="gpt-4o-mini",
    prompt=Prompt(messages=[
        Message(role="user", content="Summarize in two sentences: {{ text }}"),
    ]),
)

critic = (
    OpenAI(
        id="critic",
        connection=connection,
        model="gpt-4o-mini",
        prompt=Prompt(messages=[
            Message(role="user", content="Rate this summary for {{ audience }}:\n{{ summary }}"),
        ]),
    )
    .inputs(
        summary=summarizer.outputs.content,  # output of the summarizer node
        audience="busy executives",          # static value
    )
    .depends_on(summarizer)
)

wf = Workflow()
wf.flow.add_nodes(summarizer)
wf.flow.add_nodes(critic)

result = wf.run(input_data={"text": "Long article text goes here..."})
print(result.output["critic"]["output"]["content"])

The workflow's input_data is offered to every node; nodes with no dependencies render their prompts straight from it ({{ text }} above), while downstream nodes typically take their inputs from dependency outputs via .inputs().

Nodes that declare the same dependencies but not each other run in parallel — add two independent agents to a flow and the executor schedules them concurrently with no extra code.

.inputs() is the modern, type-safe way to map data. The lower-level input_transformer=InputTransformer(selector={...}) with JSONPath-style selectors (e.g. "$['node-id'].output.content") does the same job declaratively and is what the YAML format uses — see Input transformers and Jinja for the selector syntax.

Node anatomy

Every node shares this configuration surface (all optional):

idstring
Unique node id; auto-generated UUID by default. Output of a workflow run is keyed by this.
namestring
Human-readable name used in logs, traces, and streaming event sources.
connectionBaseConnection
Credentials/endpoint for nodes that call external services. See Connections & credentials.
dependslist[NodeDependency]
Dependencies; usually set via .depends_on(). A dependency can carry a condition for branching.
input_transformer / input_mappingInputTransformer / dict
Declarative selector or .inputs() bindings that shape the node input from workflow input and dependency outputs.
output_transformerOutputTransformer
JSONPath/selector reshaping applied to the node output.
error_handlingErrorHandling
timeout_seconds, max_retries, retry_interval_seconds, backoff_rate, and behavior (raise or return).
cachingCachingConfig
Enable per-node output caching (requires a cache backend in RunnableConfig).
streamingStreamingConfig
Token/event streaming settings. See Streaming & callbacks.
callbackslist[NodeCallbackHandler]
Node-scoped callback handlers, in addition to run-level ones.

The execution lifecycle

When a flow schedules a node, run_sync / run_async walks the same sequence:

  1. Dependency check — all dependencies must have completed. If a dependency failed or was skipped (and its error_handling.behavior is RAISE), this node is skipped with status SKIP rather than executed.
  2. Input assembly — workflow input and dependency results are merged, then input_transformer and .inputs() mappings are applied.
  3. Schema validation — the assembled input is validated against the node's input schema.
  4. Callbackson_node_start fires for run- and node-level handlers.
  5. Execute — the node's execute() runs, wrapped in retry/timeout logic from error_handling and the cache layer if caching.enabled.
  6. Result — a RunnableResult with status SUCCESS, FAILURE, SKIP, or CANCELED is recorded; downstream nodes can now be scheduled.

The flow's final output is the per-node map of these results:

result = wf.run(input_data={"text": "..."})
result.output
# {
#   "summarizer": {"status": "success", "input": {...}, "output": {"content": "..."}},
#   "critic":     {"status": "success", "input": {...}, "output": {"content": "..."}}
# }

See Running workflows & results for statuses, errors, and RunnableConfig.

On this page