Streaming & Callbacks

Stream tokens and intermediate steps from LLM and agent nodes with StreamingConfig, and hook every lifecycle event with callback handlers.

Two mechanisms make SDK runs observable in real time. Callbacks are handler objects that receive every lifecycle event of a run — workflow start, node end, errors, stream chunks. Streaming is a node-level feature that emits LLM tokens and agent steps as they're generated, delivered through a callback handler you iterate.

Callback handlers

A callback handler subclasses BaseCallbackHandler (from dynamiq.callbacks) and overrides any of these hooks:

Hook | Fires
on_workflow_start / on_workflow_end / on_workflow_error / on_workflow_canceled | Around the whole workflow run.
on_flow_start / on_flow_end / on_flow_error / on_flow_canceled | Around the flow execution.
on_node_start / on_node_end / on_node_error / on_node_skip / on_node_canceled | Around each node.
on_node_execute_start / on_node_execute_end / on_node_execute_error / on_node_execute_run / on_node_execute_stream | Inside node execution, including each retry attempt and each stream chunk.

Pass handlers per run via RunnableConfig(callbacks=[...]), or attach them to a single node with its callbacks field. Each hook receives the serialized entity, the data (input/output/error/chunk), and kwargs carrying run_id, wf_run_id, and parent_run_id so you can reconstruct the execution tree.

from typing import Any

from dynamiq.callbacks import BaseCallbackHandler
from dynamiq.runnables import RunnableConfig


class LoggingHandler(BaseCallbackHandler):
    """Print a line whenever a node starts or finishes."""

    def on_node_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any):
        print(f"-> {serialized.get('name')} started")

    def on_node_end(self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any):
        print(f"<- {serialized.get('name')} finished")


# `workflow` is an existing Workflow instance; the handler applies to this run only.
result = workflow.run(
    input_data={"text": "..."},
    config=RunnableConfig(callbacks=[LoggingHandler()]),
)
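
The run_id and parent_run_id values in kwargs are enough to rebuild the execution tree. The following is a minimal sketch, not the built-in TracingCallbackHandler: RunTreeRecorder and its render method are hypothetical names, and it assumes a nested node's parent_run_id equals the run_id of the node that invoked it. The import fallback only exists so the sketch runs without the SDK installed.

```python
from collections import defaultdict
from typing import Any, Optional

try:
    from dynamiq.callbacks import BaseCallbackHandler
except ImportError:  # fallback so the sketch runs without the SDK installed
    BaseCallbackHandler = object


class RunTreeRecorder(BaseCallbackHandler):
    """Hypothetical handler that links node runs by parent_run_id."""

    def __init__(self) -> None:
        super().__init__()
        self.names: dict[str, str] = {}
        self.children: dict[Optional[str], list[str]] = defaultdict(list)

    def on_node_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any) -> None:
        run_id = str(kwargs.get("run_id"))
        parent = kwargs.get("parent_run_id")
        parent_id = str(parent) if parent is not None else None
        self.names[run_id] = serialized.get("name") or run_id
        self.children[parent_id].append(run_id)

    def render(self) -> list[str]:
        """Return one indented line per node, in start order."""
        lines: list[str] = []

        def walk(run_id: str, depth: int) -> None:
            lines.append("  " * depth + self.names[run_id])
            for child in self.children.get(run_id, []):
                walk(child, depth + 1)

        for parent_id, kids in list(self.children.items()):
            # Assumption: a parent that was never seen as a node is a flow or workflow run.
            if parent_id not in self.names:
                for kid in kids:
                    walk(kid, 0)
        return lines
```

Pass an instance in RunnableConfig(callbacks=[...]) like any other handler, then call render() after the run to print the indented tree.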

Built-in handlers include TracingCallbackHandler (builds the run tree), DynamiqTracingCallbackHandler (sends it to the platform — see Tracing to Dynamiq), and the streaming handlers below.

Enabling streaming on a node

Streaming is configured per node with StreamingConfig (from dynamiq.types.streaming), or the node.enable_streaming() shortcut:

enabled (boolean): Turn streaming on. Default false.
event (string): Event name stamped on every emitted message. Default "streaming". Use distinct names to tell multiple streaming nodes apart.
mode (StreamingMode): FINAL (default) streams only the final output of agent nodes; ALL also streams intermediate steps such as agent reasoning, tool calls, and tool results.
include_usage (boolean): Include token usage information in the stream. Default false.
min_chunk_chars (integer): Accumulate at least this many characters before emitting an event (0 = emit immediately). Reduces event volume.
stream_tool_input (list[string] | null): Allowlist of tool names whose inputs are streamed as the LLM generates them; null streams all.
timeout (number): Timeout in seconds for input streaming. Default 600.
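
To see how min_chunk_chars trades latency for event volume, here is a small stand-alone model of the buffering it describes. It is an illustration only, not the SDK's internal code: coalesce_chunks is a hypothetical helper, and flushing the leftover text at the end of the stream is an assumption.

```python
from typing import Iterable, Iterator


def coalesce_chunks(chunks: Iterable[str], min_chunk_chars: int = 0) -> Iterator[str]:
    """Buffer incoming text until at least min_chunk_chars are available."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # With min_chunk_chars == 0, every non-empty chunk is emitted immediately.
        if buffer and len(buffer) >= min_chunk_chars:
            yield buffer
            buffer = ""
    if buffer:  # assumption: leftover text is flushed when the stream ends
        yield buffer


tokens = ["He", "llo", ", ", "wor", "ld"]
print(list(coalesce_chunks(tokens, 0)))  # five events, one per token
print(list(coalesce_chunks(tokens, 5)))  # fewer, larger events
```

Five tokens become three events at a threshold of 5, at the cost of each event arriving slightly later.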

Consuming a stream

StreamingIteratorCallbackHandler (from dynamiq.callbacks.streaming) collects stream events on a queue and is itself an iterator. The complete pattern, runnable as-is with OPENAI_API_KEY set:

from dynamiq import Workflow
from dynamiq.callbacks.streaming import StreamingIteratorCallbackHandler
from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq.nodes.llms import OpenAI
from dynamiq.prompts import Prompt, Message
from dynamiq.runnables import RunnableConfig
from dynamiq.types.streaming import StreamingConfig

llm = OpenAI(
    id="writer",
    connection=OpenAIConnection(),
    model="gpt-4o-mini",
    prompt=Prompt(messages=[
        Message(role="user", content="Write a short poem about {{ topic }}."),
    ]),
    streaming=StreamingConfig(enabled=True, event="data"),
)

workflow = Workflow()
workflow.flow.add_nodes(llm)

handler = StreamingIteratorCallbackHandler()
workflow.run(
    input_data={"topic": "the sea"},
    config=RunnableConfig(callbacks=[handler]),
)

for event in handler:
    if event.event == "data":  # chunks from our streaming node
        content = event.data.get("choices", [{}])[0].get("delta", {}).get("content")
        if content:
            print(content, end="")

LLM chunks follow the OpenAI delta shape: the text lives at event.data["choices"][0]["delta"]["content"]. The iterator ends when the workflow finishes — the final event carries the workflow output.
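
Because the final event carries the workflow output rather than a delta, and some chunks carry no content at all, it can help to isolate the lookup in one defensive function. This is a sketch: delta_text is a hypothetical helper, and it relies only on the delta shape described above.

```python
from typing import Any, Optional


def delta_text(data: Any) -> Optional[str]:
    """Return the text of an OpenAI-style delta chunk, or None if absent."""
    if not isinstance(data, dict):
        return None  # e.g. a final output that is not a delta chunk
    choices = data.get("choices") or []
    if not choices or not isinstance(choices[0], dict):
        return None  # missing or empty "choices"
    delta = choices[0].get("delta")
    if not isinstance(delta, dict):
        return None
    return delta.get("content") or None


chunk = {"choices": [{"delta": {"content": "The sea"}}]}
print(delta_text(chunk))            # The sea
print(delta_text({"choices": []}))  # None
```

In the consuming loop, text = delta_text(event.data) followed by if text: print(text, end="") then replaces the chained .get() calls, which would raise IndexError on an empty choices list.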

For async applications, AsyncStreamingIteratorCallbackHandler is the same handler backed by an asyncio.Queue: start the run as a task (or in an executor), then async for event in handler to forward events to a WebSocket or SSE response as they arrive. The repository's examples/components/core/websocket directory shows full FastAPI servers built on this pattern.
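
The shape of that async pattern can be shown with the standard library alone. In this sketch fake_run and iter_events are stand-ins, not SDK functions: fake_run plays the role of the workflow run pushing events, and iter_events plays the role of the async handler. Ending the stream with a sentinel object is an assumption made for the illustration.

```python
import asyncio
from typing import Any, AsyncIterator

_DONE = object()  # sentinel marking the end of the stream (illustrative)


async def fake_run(queue: asyncio.Queue) -> None:
    """Stand-in for a workflow run that pushes stream events onto a queue."""
    for token in ["The ", "sea ", "sleeps."]:
        await asyncio.sleep(0)  # yield control, as a real run would while waiting on the LLM
        await queue.put(token)
    await queue.put(_DONE)


async def iter_events(queue: asyncio.Queue) -> AsyncIterator[Any]:
    """Stand-in for the async handler: yield events until the run signals completion."""
    while True:
        item = await queue.get()
        if item is _DONE:
            return
        yield item


async def forward_stream() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    run_task = asyncio.create_task(fake_run(queue))  # start the run first
    sent: list[str] = []
    async for event in iter_events(queue):  # then consume events as they arrive
        sent.append(event)  # a server would send the event to the client here
    await run_task  # surface any exception raised by the run
    return sent


if __name__ == "__main__":
    print(asyncio.run(forward_stream()))
```

Starting the run as a task before iterating is what keeps the consumer from blocking: the producer and the async for loop interleave on the same event loop.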

The event schema

Every stream message is a StreamingEventMessage:

run_id (string): Id of the node run that emitted the event.
wf_run_id (string): Id of the enclosing workflow run.
entity_id (string): Id of the emitting node (or workflow for the final event).
data (any, required): The payload: an OpenAI-style delta chunk for LLM tokens, structured step data for agent events, or the final output.
event (string): The event name from the node's StreamingConfig; "streaming" by default.
source (object): Emitting entity details: id, name, group (e.g. "llms", "agents"), and type.

Filter on event (or source.group) when several nodes stream on the same run. With mode=StreamingMode.ALL on an agent, you'll also receive structured intermediate events — reasoning thoughts, tool inputs as they're generated, and tool results with their tool_run_id and loop_num — so a UI can render the agent's progress step by step.
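
As a sketch of that filtering, the function below buckets LLM text by event name and ignores other source groups. Event and Source are simplified stand-ins for StreamingEventMessage and its source field, carrying only the fields used here, and text_by_event is a hypothetical helper.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class Source:  # stand-in for the event's source details
    group: str
    name: str = ""


@dataclass
class Event:  # stand-in for StreamingEventMessage; only the fields used here
    event: str
    data: Any
    source: Source


def text_by_event(events: Iterable[Event], group: str = "llms") -> dict[str, str]:
    """Concatenate delta text per event name, keeping only one source group."""
    out: dict[str, str] = defaultdict(str)
    for ev in events:
        if ev.source.group != group:
            continue  # e.g. skip structured agent step events
        choices = ev.data.get("choices") if isinstance(ev.data, dict) else None
        if choices:
            out[ev.event] += choices[0].get("delta", {}).get("content") or ""
    return dict(out)


def chunk(text: str) -> dict:
    return {"choices": [{"delta": {"content": text}}]}


events = [
    Event("outline", chunk("A"), Source("llms")),
    Event("draft", chunk("x"), Source("llms")),
    Event("outline", chunk("B"), Source("llms")),
    Event("steps", {"thought": "..."}, Source("agents")),
]
print(text_by_event(events))
```

Giving each streaming node its own event name in StreamingConfig is what makes this kind of routing possible without inspecting the payload.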

Per-run streaming control

Even with streaming.enabled=True on a node, LLM tokens are streamed only when a streaming callback handler is present in the run's callbacks; without one, the node falls back to a normal (non-streamed) completion. The same workflow can therefore serve both a streaming endpoint and a batch job: include the handler for interactive runs, omit it for batch.

For nodes that consume streamed input (such as the human-feedback tool behind a WebSocket), RunnableConfig.nodes_override carries per-node StreamingConfig overrides keyed by node id, so each run can wire its own input_queue without mutating shared node objects:

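from queue import Queue
from threading import Event

from dynamiq.runnables import RunnableConfig
from dynamiq.runnables.base import NodeRunnableConfig
from dynamiq.types.streaming import StreamingConfig

# One queue and done-event per connection. A standard queue.Queue and
# threading.Event are assumed here; `handler` is the streaming callback
# handler created for this run.
queue = Queue()
done = Event()

config = RunnableConfig(
    callbacks=[handler],
    nodes_override={
        # Keyed by node id; the override applies to this run only.
        "feedback-tool": NodeRunnableConfig(
            streaming=StreamingConfig(enabled=True, input_queue=queue, input_queue_done_event=done),
        ),
    },
)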

Token output streaming, by contrast, is configured on the node itself: set streaming=StreamingConfig(enabled=True, ...) or call node.enable_streaming() on the node definition. The repository's examples/components/core/websocket/ws_server_fastapi.py shows nodes_override in action for per-connection input queues.

Deployed Apps expose this same event stream over HTTP: pass "stream": true and each SSE data: line is a serialized StreamingEventMessage. See Streaming and async and Call your App.
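
On the client side, reading that stream comes down to picking out the data: lines and decoding them. The sketch below assumes each data: payload is a JSON object whose fields follow the event schema above; parse_sse_data is a hypothetical helper, and the HTTP request itself is left out because the endpoint details are covered on the linked pages.

```python
import json
from typing import Any, Iterable, Iterator


def parse_sse_data(lines: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Yield the decoded payload of every `data:` line in an SSE response."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators, comments, and other SSE fields
        payload = line[len("data:"):].strip()
        if not payload:
            continue
        try:
            yield json.loads(payload)  # assumption: payloads are JSON objects
        except json.JSONDecodeError:
            continue  # ignore any non-JSON line


sample = [
    'data: {"event": "data", "data": {"choices": [{"delta": {"content": "Hi"}}]}}',
    "",
    ": keep-alive",
    'data: {"event": "data", "data": {"choices": [{"delta": {"content": "!"}}]}}',
]
for message in parse_sse_data(sample):
    print(message["data"]["choices"][0]["delta"]["content"], end="")
```

Any HTTP client that exposes the response body line by line can feed this generator.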
