Streaming & Callbacks
Stream tokens and intermediate steps from LLM and agent nodes with StreamingConfig, and hook every lifecycle event with callback handlers.
Two mechanisms make SDK runs observable in real time. Callbacks are handler objects that receive every lifecycle event of a run — workflow start, node end, errors, stream chunks. Streaming is a node-level feature that emits LLM tokens and agent steps as they're generated, delivered through a callback handler you iterate.
Callback handlers
A callback handler subclasses BaseCallbackHandler (from dynamiq.callbacks) and overrides any of these hooks:
| Hook | Fires |
|---|---|
| on_workflow_start / on_workflow_end / on_workflow_error / on_workflow_canceled | Around the whole workflow run. |
| on_flow_start / on_flow_end / on_flow_error / on_flow_canceled | Around the flow execution. |
| on_node_start / on_node_end / on_node_error / on_node_skip / on_node_canceled | Around each node. |
| on_node_execute_start / on_node_execute_end / on_node_execute_error / on_node_execute_run / on_node_execute_stream | Inside node execution, including each retry attempt and each stream chunk. |
Pass handlers per run via RunnableConfig(callbacks=[...]), or attach them to a single node with its callbacks field. Each hook receives the serialized entity, the data (input/output/error/chunk), and kwargs carrying run_id, wf_run_id, and parent_run_id so you can reconstruct the execution tree.
from typing import Any

from dynamiq.callbacks import BaseCallbackHandler
from dynamiq.runnables import RunnableConfig


class LoggingHandler(BaseCallbackHandler):
    def on_node_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any):
        print(f"-> {serialized.get('name')} started")

    def on_node_end(self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any):
        print(f"<- {serialized.get('name')} finished")


# `workflow` is assumed to be a Workflow instance built elsewhere.
result = workflow.run(input_data={"text": "..."}, config=RunnableConfig(callbacks=[LoggingHandler()]))

Built-in handlers include TracingCallbackHandler (builds the run tree), DynamiqTracingCallbackHandler (sends it to the platform; see Tracing to Dynamiq), and the streaming handlers below.
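To illustrate how the run_id and parent_run_id kwargs can be used to rebuild the execution tree, here is a minimal sketch in plain Python. RunTreeRecorder is a hypothetical stand-in that only mirrors the on_node_start signature described above; it does not subclass BaseCallbackHandler, and the ids in the usage lines are hand-made. Whether nested nodes report their caller as parent_run_id in exactly this way is an assumption.

```python
from collections import defaultdict
from typing import Any


class RunTreeRecorder:
    """Hypothetical stand-in: records node starts from the hook kwargs."""

    def __init__(self) -> None:
        self.names = {}                    # run_id -> node name
        self.children = defaultdict(list)  # parent_run_id -> [run_id, ...]

    def on_node_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any) -> None:
        run_id = str(kwargs["run_id"])
        parent = str(kwargs.get("parent_run_id"))
        self.names[run_id] = serialized.get("name", "?")
        self.children[parent].append(run_id)

    def render(self) -> list[str]:
        """Return the tree as indented lines, depth-first, in start order."""
        lines: list[str] = []

        def walk(parent: str, depth: int) -> None:
            for run_id in self.children.get(parent, []):
                lines.append("  " * depth + self.names[run_id])
                walk(run_id, depth + 1)

        # Roots hang off parents that were never seen as nodes (e.g. the flow run).
        for parent in list(self.children):
            if parent not in self.names:
                walk(parent, 0)
        return lines


# Hand-made events standing in for a real run.
rec = RunTreeRecorder()
rec.on_node_start({"name": "agent"}, {}, run_id="n1", parent_run_id="flow")
rec.on_node_start({"name": "llm"}, {}, run_id="n2", parent_run_id="n1")
rec.on_node_start({"name": "tool"}, {}, run_id="n3", parent_run_id="n1")
print("\n".join(rec.render()))
```

In a real handler the same bookkeeping would live in a BaseCallbackHandler subclass; the built-in TracingCallbackHandler already covers this use case, so the sketch is only meant to show what the kwargs make possible.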
Enabling streaming on a node
Streaming is configured per node with StreamingConfig (from dynamiq.types.streaming), or the node.enable_streaming() shortcut:
| Field | Type |
|---|---|
| enabled | boolean |
| event | string |
| mode | StreamingMode |
| include_usage | boolean |
| min_chunk_chars | integer |
| stream_tool_input | list[string] or null |
| timeout | number |

Consuming a stream
StreamingIteratorCallbackHandler (from dynamiq.callbacks.streaming) collects stream events on a queue and is itself an iterator. The complete pattern, runnable as-is with OPENAI_API_KEY set:
from dynamiq import Workflow
from dynamiq.callbacks.streaming import StreamingIteratorCallbackHandler
from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Prompt, Message
from dynamiq.runnables import RunnableConfig
from dynamiq.types.streaming import StreamingConfig

llm = OpenAI(
    id="writer",
    connection=OpenAIConnection(),
    model="gpt-4o-mini",
    prompt=Prompt(messages=[
        Message(role="user", content="Write a short poem about {{ topic }}."),
    ]),
    streaming=StreamingConfig(enabled=True, event="data"),
)

workflow = Workflow()
workflow.flow.add_nodes(llm)

handler = StreamingIteratorCallbackHandler()
workflow.run(
    input_data={"topic": "the sea"},
    config=RunnableConfig(callbacks=[handler]),
)

for event in handler:
    if event.event == "data":  # chunks from our streaming node
        content = event.data.get("choices", [{}])[0].get("delta", {}).get("content")
        if content:
            print(content, end="")

LLM chunks follow the OpenAI delta shape: the text lives at event.data["choices"][0]["delta"]["content"]. The iterator ends when the workflow finishes; the final event carries the workflow output.
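Because not every event on the stream has to carry a delta (the final event carries the workflow output, for instance), it can help to isolate the lookup in a defensive helper. The sketch below is one way to do that; extract_delta_text is a hypothetical helper name, not part of the SDK, and it works on plain dictionaries.

```python
from typing import Any, Optional


def extract_delta_text(data: Any) -> Optional[str]:
    """Return the text at data["choices"][0]["delta"]["content"], or None if absent."""
    if not isinstance(data, dict):
        return None
    choices = data.get("choices")
    # Guard against a missing key, a non-list value, and an empty list.
    if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
        return None
    delta = choices[0].get("delta")
    if not isinstance(delta, dict):
        return None
    content = delta.get("content")
    return content if isinstance(content, str) and content else None


# A well-formed chunk yields its text; anything else yields None.
sample = {"choices": [{"delta": {"content": "Hi"}}]}
text = extract_delta_text(sample)
```

Inside the loop this would be called as extract_delta_text(event.data), printing the result only when it is not None.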
For async applications, AsyncStreamingIteratorCallbackHandler is the same handler backed by an asyncio.Queue: start the run as a task (or in an executor), then async for event in handler to forward events to a WebSocket or SSE response as they arrive. The repository's examples/components/core/websocket directory shows full FastAPI servers built on this pattern.
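The "start the run as a task, then async for" pattern can be sketched with the standard library alone. In the example below, QueueIterator and fake_run are hypothetical stand-ins for the async handler and the workflow run; they are not SDK classes, and the sentinel-based shutdown is an assumption made for the sketch.

```python
import asyncio
from typing import Any

_DONE = object()  # sentinel marking the end of the stream


class QueueIterator:
    """Hypothetical stand-in for a queue-backed async handler (not the SDK class)."""

    def __init__(self) -> None:
        self.queue = asyncio.Queue()

    def __aiter__(self) -> "QueueIterator":
        return self

    async def __anext__(self) -> Any:
        item = await self.queue.get()
        if item is _DONE:
            raise StopAsyncIteration
        return item


async def fake_run(handler: QueueIterator) -> None:
    """Stand-in for the workflow run: pushes chunks, then signals completion."""
    for chunk in ["The ", "sea ", "sings."]:
        await handler.queue.put(chunk)
        await asyncio.sleep(0)  # yield control so the consumer can interleave
    await handler.queue.put(_DONE)


async def main() -> list[str]:
    handler = QueueIterator()
    run_task = asyncio.create_task(fake_run(handler))  # start the run first
    received = []
    async for event in handler:  # then consume events as they arrive
        received.append(event)   # a server would forward this to a WebSocket or SSE client
    await run_task               # surface any exception raised by the run
    return received


chunks = asyncio.run(main())
```

Awaiting the task after the loop matters in practice: without it, an exception raised inside the run would go unnoticed by the consumer.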
The event schema
Every stream message is a StreamingEventMessage:
| Field | Type | Notes |
|---|---|---|
| run_id | string | |
| wf_run_id | string | |
| entity_id | string | |
| data | any | required |
| event | string | |
| source | object | |

Filter on event (or source.group) when several nodes stream on the same run. With mode=StreamingMode.ALL on an agent, you'll also receive structured intermediate events (reasoning thoughts, tool inputs as they're generated, and tool results with their tool_run_id and loop_num), so a UI can render the agent's progress step by step.
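Filtering on event can be sketched as a small demultiplexer. The Event dataclass below is a hypothetical stand-in that carries only a subset of the fields listed above, and the event names "writer" and "critic" are made up for the example; they assume each node was given its own event value in its StreamingConfig.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class Event:
    """Hypothetical stand-in with a subset of the message fields (not the SDK class)."""
    entity_id: str
    event: str
    data: Any


def split_by_event(events: Iterable[Event]) -> dict[str, list[Event]]:
    """Route each message into a per-event-name list, preserving arrival order."""
    routed = defaultdict(list)
    for ev in events:
        routed[ev.event].append(ev)
    return dict(routed)


# Two nodes streaming on the same run, interleaved.
stream = [
    Event("node-1", "writer", "Once "),
    Event("node-2", "critic", "Too "),
    Event("node-1", "writer", "upon"),
    Event("node-2", "critic", "short"),
]
routed = split_by_event(stream)
writer_text = "".join(ev.data for ev in routed["writer"])
```

A live consumer would route each message as it arrives rather than collecting the full stream first; the grouping logic stays the same.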
Per-run streaming control
Even with streaming.enabled=True on a node, LLM token streaming only actually happens when a streaming callback handler is present in the run's callbacks — without one, the node falls back to a normal (non-streamed) completion. So the same workflow serves both a streaming endpoint and a batch job: include the handler for interactive runs, omit it for batch.
For nodes that consume streamed input (such as the human-feedback tool behind a WebSocket), RunnableConfig.nodes_override carries per-node StreamingConfig overrides keyed by node id, so each run can wire its own input_queue without mutating shared node objects:
from dynamiq.runnables import RunnableConfig
from dynamiq.runnables.base import NodeRunnableConfig
from dynamiq.types.streaming import StreamingConfig

# `handler`, `queue`, and `done` are assumed to be created earlier, once per run.
config = RunnableConfig(
    callbacks=[handler],
    nodes_override={
        "feedback-tool": NodeRunnableConfig(
            streaming=StreamingConfig(enabled=True, input_queue=queue, input_queue_done_event=done),
        ),
    },
)

Token output streaming is configured on the node itself: set streaming=StreamingConfig(enabled=True, ...) or call node.enable_streaming() on the node definition. The repository's examples/components/core/websocket/ws_server_fastapi.py shows nodes_override in action for per-connection input queues.
Deployed Apps expose this same event stream over HTTP: pass "stream": true and each SSE data: line is a serialized StreamingEventMessage. See Streaming and async and Call your App.
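On the client side, reading that stream comes down to picking out the data: lines and decoding them. The sketch below assumes each payload is a JSON object and that one event fits on one data: line; iter_sse_events is a hypothetical helper name, and the sample lines (including the "[DONE]" marker) are made up to exercise the parser rather than taken from a real response.

```python
import json
from typing import Any, Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Yield one decoded payload per SSE `data:` line, skipping everything else."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # blank separators, comments (": ..."), other SSE fields
        payload = line[len("data:"):].strip()
        if not payload:
            continue
        try:
            decoded = json.loads(payload)
        except json.JSONDecodeError:
            continue  # non-JSON data line, e.g. an end-of-stream marker
        if isinstance(decoded, dict):
            yield decoded


# Made-up response lines, as they might be read from an HTTP response body.
sample_lines = [
    'data: {"entity_id": "writer", "event": "data", "data": {"choices": [{"delta": {"content": "Hi"}}]}}',
    "",
    ": keep-alive",
    "data: [DONE]",
]
events = list(iter_sse_events(sample_lines))
```

Each yielded dictionary can then be handled like the event objects in the in-process examples, with fields read by key instead of by attribute.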