Running Workflows & Results
The run() interface, RunnableResult and statuses, RunnableConfig options, async execution, and mid-run cancellation.
Workflows, flows, and nodes all implement the same Runnable interface: one run() entry point that returns a RunnableResult. This page covers what goes in, what comes out, and how to control a run while it's in flight.
run(), run_sync(), run_async()
run() is a dispatcher. Called from synchronous code it executes immediately and returns a RunnableResult; called from an async context it returns a coroutine you await. You can force a mode with is_async, or call the explicit variants:
# Synchronous
result = workflow.run(input_data={"text": "Hola Mundo!"})
# Asynchronous
result = await workflow.run(input_data={"text": "Hola Mundo!"})
# Explicit, no context detection
result = workflow.run_sync(input_data={"text": "Hola Mundo!"})
result = await workflow.run_async(input_data={"text": "Hola Mundo!"})In async runs, nodes that implement native async execution run directly on the event loop; sync-only nodes are offloaded to a thread pool automatically, so mixing node types in one flow is safe.
RunnableResult
Every run returns a RunnableResult:
statusRunnableStatusrequiredinputAnyoutputAnyerrorRunnableResultError | nullA typical read path:
from dynamiq.runnables import RunnableStatus
result = workflow.run(input_data={"text": "Hola Mundo!"})
if result.status == RunnableStatus.SUCCESS:
node_result = result.output["translator"] # keyed by node id
print(node_result["output"]["content"])
else:
print(result.error.message)
for node in result.error.failed_nodes:
print(f"{node.name} ({node.id}): {node.error_message}")A failed workflow doesn't raise — it returns status == FAILURE with the error attached, so your calling code decides how to react. Whether a single node failure fails the whole run depends on that node's error_handling.behavior (raise by default; return lets the flow continue) — see Error handling & retries.
Statuses you'll see per node inside result.output: success, failure, skip (a dependency failed or a branch condition wasn't met), and canceled.
RunnableConfig
The second argument to every run() is a RunnableConfig carrying per-run options:
run_idstringcallbackslist[BaseCallbackHandler]cacheCacheConfigmax_node_workersintnodes_overridedict[str, NodeRunnableConfig]checkpointCheckpointConfigcancellationCancellationConfigdry_runDryRunConfigfrom dynamiq.runnables import RunnableConfig
config = RunnableConfig(callbacks=[my_handler], max_node_workers=4)
result = workflow.run(input_data={"text": "..."}, config=config)Cancellation
Cancellation is built into every run. Each RunnableConfig carries a thread-safe CancellationToken; signal it from anywhere (an API handler, a timeout watchdog, a UI button) and the run stops at the next checkpoint with status CANCELED:
import threading
from dynamiq.runnables import RunnableConfig, RunnableStatus
config = RunnableConfig()
# from another thread:
threading.Timer(30.0, config.cancellation.token.cancel).start()
result = workflow.run(input_data={"input": "long task..."}, config=config)
if result.status == RunnableStatus.CANCELED:
print("Run was canceled")You can also construct your own token (e.g. one shared across several runs) via CancellationConfig(token=my_token) from dynamiq.types.cancellation. In async code, cancelling the asyncio task that awaits run_async() has the same effect: the SDK translates the CancelledError into a cooperative cancel, drains in-flight node threads, and returns a CANCELED result instead of propagating the exception.
Resuming from checkpoints
Flows can persist per-node state and resume a failed or interrupted run from where it stopped:
result = workflow.run_sync(input_data=None, resume_from=checkpoint_id)Checkpoint backends (filesystem, in-memory, PostgreSQL) and configuration live on the flow — see Checkpoints.
Connections & Credentials
How SDK connection classes hold credentials, which environment variables they read by default, and how the ConnectionManager caches service clients.
Streaming & Callbacks
Stream tokens and intermediate steps from LLM and agent nodes with StreamingConfig, and hook every lifecycle event with callback handlers.