
Running Workflows & Results

The run() interface, RunnableResult and statuses, RunnableConfig options, async execution, and mid-run cancellation.

Workflows, flows, and nodes all implement the same Runnable interface: one run() entry point that returns a RunnableResult. This page covers what goes in, what comes out, and how to control a run while it's in flight.

run(), run_sync(), run_async()

run() is a dispatcher. Called from synchronous code it executes immediately and returns a RunnableResult; called from an async context it returns a coroutine you await. You can force a mode with is_async, or call the explicit variants:

# Synchronous
result = workflow.run(input_data={"text": "Hola Mundo!"})

# Asynchronous
result = await workflow.run(input_data={"text": "Hola Mundo!"})

# Explicit, no context detection
result = workflow.run_sync(input_data={"text": "Hola Mundo!"})
result = await workflow.run_async(input_data={"text": "Hola Mundo!"})
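
The dispatching behavior can be pictured with a small standalone sketch. It does not use or reproduce Dynamiq's code: the functions run, run_sync, and run_async below are toy stand-ins, and inferring the mode from asyncio.get_running_loop() is an assumption about one way such a dispatcher could work.

```python
import asyncio


def run_sync(value):
    """Toy synchronous path: compute and return immediately."""
    return value * 2


async def run_async(value):
    """Toy asynchronous path: yield to the event loop once, then return."""
    await asyncio.sleep(0)
    return value * 2


def run(value, is_async=None):
    """Dispatch to the sync or async path.

    When is_async is None, the mode is inferred from whether an event
    loop is running in the current thread (an assumed detection strategy).
    """
    if is_async is None:
        try:
            asyncio.get_running_loop()
            is_async = True
        except RuntimeError:
            is_async = False
    return run_async(value) if is_async else run_sync(value)


async def main():
    # Inside a coroutine a loop is running, so run() returns an awaitable.
    return await run(21)


sync_result = run(21)              # plain value, computed immediately
async_result = asyncio.run(main()) # same value, obtained by awaiting
```

A consequence of this design is that forgetting to await the call inside async code yields a coroutine object rather than a result, which is one reason to prefer the explicit variants when the calling context is known.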

In async runs, nodes that implement native async execution run directly on the event loop; sync-only nodes are offloaded to a thread pool automatically, so mixing node types in one flow is safe.
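
As a rough illustration of mixing node types, the sketch below runs one coroutine-based function on the event loop and offloads one blocking function to a worker thread. It uses only the standard library; sync_node, async_node, run_node, and run_flow are hypothetical names, and asyncio.to_thread is an assumed stand-in for whatever thread pool the SDK uses internally.

```python
import asyncio
import threading
import time


def sync_node(text):
    """Blocking stand-in node; reports which thread executed it."""
    time.sleep(0.01)
    return text.upper(), threading.get_ident()


async def async_node(text):
    """Natively async stand-in node; reports which thread executed it."""
    await asyncio.sleep(0.01)
    return text[::-1], threading.get_ident()


async def run_node(node, data):
    # Coroutine functions run directly on the loop; anything else is
    # pushed to a worker thread so it cannot block the loop.
    if asyncio.iscoroutinefunction(node):
        return await node(data)
    return await asyncio.to_thread(node, data)


async def run_flow(data):
    loop_thread = threading.get_ident()
    sync_result, async_result = await asyncio.gather(
        run_node(sync_node, data),
        run_node(async_node, data),
    )
    return loop_thread, sync_result, async_result


loop_thread, (sync_out, sync_thread), (async_out, async_thread) = asyncio.run(
    run_flow("abc")
)
```

In this sketch the blocking function reports a different thread id from the event loop's, while the coroutine reports the same one.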

RunnableResult

Every run returns a RunnableResult:

status (RunnableStatus, required)
success, failure, skip, canceled, or undefined (not yet run).
input (Any)
The input data the runnable received.
output (Any)
For a workflow: a dict keyed by node id, each entry holding that node's status, input, and output. For a single node: the node's output dict.
error (RunnableResultError | null)
On failure: exception type, message, recoverable flag, and failed_nodes, the nodes whose error_handling.behavior is "raise" that caused the failure (each with id, name, error_message).

A typical read path:

from dynamiq.runnables import RunnableStatus

result = workflow.run(input_data={"text": "Hola Mundo!"})

if result.status == RunnableStatus.SUCCESS:
    node_result = result.output["translator"]          # keyed by node id
    print(node_result["output"]["content"])
else:
    print(result.error.message)
    for node in result.error.failed_nodes:
        print(f"{node.name} ({node.id}): {node.error_message}")

A failed workflow doesn't raise — it returns status == FAILURE with the error attached, so your calling code decides how to react. Whether a single node failure fails the whole run depends on that node's error_handling.behavior (raise by default; return lets the flow continue) — see Error handling & retries.
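
Callers that prefer exceptions can convert a failed result into one themselves. The helper below is a sketch under stated assumptions: unwrap and WorkflowFailed are hypothetical names that are not part of Dynamiq, the status is assumed to compare equal to the string "failure" (directly or through an enum's .value), and the result objects are SimpleNamespace stand-ins shaped like the fields described above.

```python
from types import SimpleNamespace


class WorkflowFailed(RuntimeError):
    """Hypothetical exception type; not provided by Dynamiq."""


def unwrap(result):
    """Return result.output, or raise if the run reported a failure."""
    # Accept either a plain string or an enum member with a .value.
    status = getattr(result.status, "value", result.status)
    if status == "failure":
        failed = ", ".join(node.id for node in result.error.failed_nodes)
        raise WorkflowFailed(f"{result.error.message} (failed nodes: {failed})")
    return result.output


# Stand-in results that mimic the documented shape.
ok = SimpleNamespace(status="success", output={"translator": {}}, error=None)
bad = SimpleNamespace(
    status="failure",
    output={},
    error=SimpleNamespace(
        message="translation failed",
        failed_nodes=[SimpleNamespace(id="translator", name="Translator")],
    ),
)

try:
    unwrap(bad)
except WorkflowFailed as exc:
    failure_text = str(exc)
```

Keeping this conversion in calling code preserves the default behavior for callers that want to inspect the result instead.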

Statuses you'll see per node inside result.output: success, failure, skip (a dependency failed or a branch condition wasn't met), and canceled.
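
To get a quick overview of a finished run, the per-node entries can be grouped by status. This is a minimal sketch that works on a plain dict shaped like the workflow output described above; group_nodes_by_status and sample_output are hypothetical, and the status values are assumed to be strings or enum members exposing .value.

```python
from collections import defaultdict


def group_nodes_by_status(output):
    """Map each status to the list of node ids that ended in it."""
    groups = defaultdict(list)
    for node_id, entry in output.items():
        status = entry["status"]
        # Normalize enum members to their underlying value if present.
        groups[getattr(status, "value", status)].append(node_id)
    return dict(groups)


# Hand-written sample; real output comes from result.output.
sample_output = {
    "fetch": {"status": "success", "input": {}, "output": {"content": "..."}},
    "translate": {"status": "failure", "input": {}, "output": None},
    "summarize": {"status": "skip", "input": {}, "output": None},
    "publish": {"status": "skip", "input": {}, "output": None},
}

summary = group_nodes_by_status(sample_output)
```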

RunnableConfig

The second argument to every run() is a RunnableConfig carrying per-run options:

run_id (string)
Identifier for this run; auto-generated UUID by default.
callbacks (list[BaseCallbackHandler])
Handlers that receive workflow/flow/node lifecycle events: tracing, streaming, custom. See Streaming & callbacks.
cache (CacheConfig)
Cache backend configuration used by nodes with caching.enabled.
max_node_workers (int)
Cap on concurrent node execution for this run.
nodes_override (dict[str, NodeRunnableConfig])
Per-node overrides keyed by node id. Currently this covers the node's StreamingConfig, consulted by nodes that accept streamed input (e.g. per-run input queues).
checkpoint (CheckpointConfig)
Per-run checkpoint overrides, including resume_from. See Checkpoints.
cancellation (CancellationConfig)
Always present: every config gets a fresh CancellationToken. Signal it to abort the run.
dry_run (DryRunConfig)
Run in dry-run mode; nodes that create external resources clean them up afterward.

from dynamiq.runnables import RunnableConfig

config = RunnableConfig(callbacks=[my_handler], max_node_workers=4)
result = workflow.run(input_data={"text": "..."}, config=config)
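
What a cap like max_node_workers means in practice can be shown with the standard library alone. The sketch below is not Dynamiq's scheduler: it assumes a thread pool bounded by the cap, and run_nodes is a hypothetical helper that records the highest number of stand-in nodes running at the same time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_nodes(node_count, max_node_workers):
    """Run node_count toy nodes with at most max_node_workers in parallel."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def node(index):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.02)  # simulate work such as an LLM or API call
        with lock:
            active -= 1
        return index

    # The pool size is the only thing limiting concurrency here.
    with ThreadPoolExecutor(max_workers=max_node_workers) as pool:
        results = list(pool.map(node, range(node_count)))
    return results, peak


results, peak = run_nodes(node_count=8, max_node_workers=2)
```

A lower cap trades throughput for gentler load on rate-limited providers, which is usually why such a limit is set per run.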

Cancellation

Cancellation is built into every run. Each RunnableConfig carries a thread-safe CancellationToken; signal it from anywhere (an API handler, a timeout watchdog, a UI button) and the run stops cooperatively at the next point where it checks the token, returning status CANCELED:

import threading
from dynamiq.runnables import RunnableConfig, RunnableStatus

config = RunnableConfig()

# from another thread:
threading.Timer(30.0, config.cancellation.token.cancel).start()

result = workflow.run(input_data={"input": "long task..."}, config=config)
if result.status == RunnableStatus.CANCELED:
    print("Run was canceled")
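
The cooperative model behind this can be sketched without the SDK. SketchToken and run_steps below are hypothetical stand-ins, assuming a token backed by threading.Event that the runner checks between units of work; Dynamiq's actual CancellationToken may differ in detail.

```python
import threading
import time


class SketchToken:
    """Thread-safe cancel flag (illustrative stand-in, not Dynamiq's class)."""

    def __init__(self):
        self._event = threading.Event()

    def cancel(self):
        self._event.set()

    @property
    def is_canceled(self):
        return self._event.is_set()


def run_steps(steps, token):
    """Run steps in order, checking the token before each one."""
    completed = []
    for step in steps:
        if token.is_canceled:
            return "canceled", completed
        completed.append(step())
    return "success", completed


def slow_step():
    time.sleep(0.05)
    return "done"


token = SketchToken()
threading.Timer(0.02, token.cancel).start()  # cancel from another thread
status, completed = run_steps([slow_step] * 20, token)
```

Note that a step already in progress is allowed to finish; the run only stops at the next check, which is what makes the cancellation cooperative rather than preemptive.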

You can also construct your own token (e.g. one shared across several runs) via CancellationConfig(token=my_token) from dynamiq.types.cancellation. In async code, cancelling the asyncio task that awaits run_async() has the same effect: the SDK translates the CancelledError into a cooperative cancel, drains in-flight node threads, and returns a CANCELED result instead of propagating the exception.
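
The async behavior, where task cancellation surfaces as a result rather than an exception, can be illustrated with plain asyncio. This is a sketch of the general pattern only; run_async_sketch is a hypothetical wrapper, and it omits the draining of in-flight node threads mentioned above.

```python
import asyncio


async def run_async_sketch(work):
    """Await work and report the outcome as a (status, value) pair."""
    try:
        return "success", await work
    except asyncio.CancelledError:
        # Translate task cancellation into a status instead of re-raising.
        return "canceled", None


async def main():
    task = asyncio.create_task(
        run_async_sketch(asyncio.sleep(10, result="finished"))
    )
    await asyncio.sleep(0.01)  # let the task start
    task.cancel()              # what a caller or timeout would do
    return await task          # returns normally, no CancelledError


outcome = asyncio.run(main())
```

Suppressing CancelledError is normally discouraged in application code, so this pattern fits best at a framework boundary that has a dedicated way to report the cancellation, as the CANCELED status does here.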

Resuming from checkpoints

Flows can persist per-node state and resume a failed or interrupted run from where it stopped:

result = workflow.run_sync(input_data=None, resume_from=checkpoint_id)

Checkpoint backends (filesystem, in-memory, PostgreSQL) and configuration live on the flow — see Checkpoints.
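
Conceptually, resuming means skipping nodes whose state was already persisted. The sketch below illustrates that idea with an in-memory dict; it is not Dynamiq's checkpoint implementation, and run_flow, the store layout, and the fail_at parameter are all hypothetical.

```python
def run_flow(nodes, store, checkpoint_id, fail_at=None):
    """Run (node_id, fn) pairs in order, persisting each output.

    Nodes already recorded under checkpoint_id are skipped. fail_at
    simulates a failure at the given node id for demonstration.
    """
    done = store.setdefault(checkpoint_id, {})
    executed = []
    for node_id, fn in nodes:
        if node_id in done:
            continue  # state restored from the checkpoint
        if node_id == fail_at:
            return "failure", executed
        done[node_id] = fn()
        executed.append(node_id)
    return "success", executed


nodes = [
    ("fetch", lambda: "raw"),
    ("translate", lambda: "translated"),
    ("publish", lambda: "ok"),
]
store = {}  # stand-in for a filesystem or database backend

first = run_flow(nodes, store, "cp-1", fail_at="translate")
second = run_flow(nodes, store, "cp-1")  # resume: "fetch" is not re-run
```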
