Orchestrators
Coordinate multiple agents in code — manager-led delegation with agents as tools, and the Graph Orchestrator state machine.
The SDK offers two multi-agent coordination patterns. For dynamic delegation, give a manager agent other agents as tools and let the LLM route work. For explicit control flow, use GraphOrchestrator — a state machine where you define states, edges, and conditions in code.
The dedicated LinearOrchestrator and AdaptiveOrchestrator classes were removed from the SDK; the manager-led pattern below covers both use cases. On the platform, Linear and Adaptive Orchestrator nodes remain available in the visual builder.
Manager-led delegation (agents as tools)
Pass specialist agents in a manager agent's tools list. The manager calls each specialist with {"input": "<task>"} payloads and assembles the final answer:
from dynamiq import Workflow
from dynamiq.connections import OpenAI as OpenAIConnection, ScaleSerp as ScaleSerpConnection
from dynamiq.flows import Flow
from dynamiq.nodes.agents import Agent
from dynamiq.nodes.llms import OpenAI
from dynamiq.nodes.tools.scale_serp import ScaleSerpTool
from dynamiq.nodes.types import Behavior, InferenceMode
search_tool = ScaleSerpTool(connection=ScaleSerpConnection()) # reads SERP_API_KEY
llm = OpenAI(connection=OpenAIConnection(), model="gpt-4o", temperature=0.1)
research_agent = Agent(
name="Research Analyst",
role="Find recent market news and provide referenced highlights.",
llm=llm,
tools=[search_tool],
inference_mode=InferenceMode.XML,
max_loops=6,
behaviour_on_max_loops=Behavior.RETURN,
)
writer_agent = Agent(
name="Brief Writer",
role="Turn research highlights into a concise executive brief.",
llm=llm,
inference_mode=InferenceMode.XML,
max_loops=4,
behaviour_on_max_loops=Behavior.RETURN,
)
manager_agent = Agent(
name="Manager",
role=(
"Delegate research and writing to sub-agents.\n"
"Always call tools with {'input': '<task>'} payloads and assemble the final brief."
),
llm=llm,
tools=[research_agent, writer_agent],
inference_mode=InferenceMode.XML,
parallel_tool_calls_enabled=True,
max_loops=8,
behaviour_on_max_loops=Behavior.RETURN,
)
workflow = Workflow(flow=Flow(nodes=[manager_agent]))
result = workflow.run(
input_data={"input": "Summarize the latest developments in battery technology for investors."},
)
print(result.output[manager_agent.id]["output"]["content"])Give each sub-agent a description so the manager knows when to call it, and set parallel_tool_calls_enabled=True on the manager to fan out independent subtasks. When parent and child agents both have memory, user_id/session_id from the run input propagate to children automatically.
Graph Orchestrator
GraphOrchestrator executes a directed graph of states. Each state runs one or more tasks — agents or plain Python callables — and edges (plain or conditional) decide what runs next. The orchestrator carries a shared context dict and a chat history across states.
from typing import Any
from dynamiq.connections import OpenAI as OpenAIConnection
from dynamiq.nodes import InputTransformer
from dynamiq.nodes.agents import Agent
from dynamiq.nodes.agents.orchestrators.graph import END, START, GraphOrchestrator
from dynamiq.nodes.agents.orchestrators.graph_manager import GraphAgentManager
from dynamiq.nodes.llms import OpenAI
llm = OpenAI(connection=OpenAIConnection(), model="gpt-4o", temperature=0.1)
email_writer = Agent(
name="email-writer-agent",
llm=llm,
role="Write personalized emails taking into account feedback.",
input_transformer=InputTransformer(selector={"input": "$.context.agent_input"}),
)
def gather_feedback(context: dict[str, Any], **kwargs):
"""Gather feedback about the email draft."""
draft = context.get("history", [{}])[-1].get("content", "No draft")
feedback = input(f"Email draft:\n{draft}\nPress Enter to send, or type feedback to refine: \n")
if feedback.strip() == "":
return {"result": "Email was sent!", "agent_input": None}
return {
"result": "Email was canceled!",
"agent_input": f"Draft of canceled email:\n{draft}\nUser feedback:\n{feedback}",
}
def router(context: dict[str, Any], **kwargs):
"""Determine the next state based on the provided feedback."""
if context.get("agent_input"):
return "generate_sketch"
return END
orchestrator = GraphOrchestrator(
name="Graph orchestrator",
manager=GraphAgentManager(llm=llm),
)
orchestrator.add_state_by_tasks("generate_sketch", [email_writer])
orchestrator.add_state_by_tasks("gather_feedback", [gather_feedback])
orchestrator.add_edge(START, "generate_sketch")
orchestrator.add_edge("generate_sketch", "gather_feedback")
orchestrator.add_conditional_edge("gather_feedback", ["generate_sketch", END], router)
orchestrator.run(input_data={"input": "Write and post email: invite the team to the offsite."})
print(orchestrator._chat_history[-1]["content"])Building blocks
| API | What it does |
|---|---|
add_state_by_tasks(state_id, tasks) | Creates a state from a list of Nodes and/or callables (callables are wrapped as function tools automatically) |
add_state(state) | Adds a pre-built GraphState |
add_edge(source, destination) | Unconditional transition |
add_conditional_edge(source, destinations, condition) | The condition callable (or a Python node) receives the shared context and returns the next state id |
START, END | Reserved state ids; execution begins at START (or initial_state) and finishes at END |
Key configuration on the orchestrator itself:
managerGraphAgentManagerrequiredinitial_statestrcontextdictmax_loopsintinput_analysis_enabledboolContext and state rules
- A callable task receives
context(which includes the runninghistory) and returns a dict; its keys are merged back into the shared context — that is howgather_feedbackpassesagent_inputto the router above. - A state containing an
Agenttask must have a manager available;add_state_by_taskswires the orchestrator's manager in automatically. - Agents inside states read orchestrator data through their
input_transformer— for example"$.context.agent_input"maps a context key to the agent'sinput.
For a larger build, the SDK repository ships complete graph examples (a code assistant, a trip planner, and a customer-service concierge) under examples/components/agents/orchestrators/graph_orchestrator. The platform equivalent is covered in the Graph Orchestrator guide and the Graph State node reference.
Checkpoints
Orchestrator and agent loops support checkpointing, so long multi-agent runs can resume after interruption — see Checkpoints.
Next steps
Memory
Give agents conversation memory with pluggable backends — in-memory, SQL, vector stores, or the Dynamiq platform — plus save modes and retrieval strategies.
RAG Pipeline
Build both halves of RAG in the SDK — an indexing flow that converts, splits, embeds, and stores documents, and a retrieval flow that answers questions over them.