Custom Nodes
Subclass Node to build your own workflow components — input schemas, the execute contract, the execution lifecycle, and connection handling with ConnectionNode.
Every built-in component — LLMs, tools, retrievers, agents — is a subclass of dynamiq.nodes.node.Node. Your custom nodes follow the exact same contract, which means they get dependency wiring, input/output transformers, retries and timeouts, caching, streaming callbacks, and checkpointing for free.
The Node contract
A minimal custom node declares the following fields and implements execute:
- group: NodeGroup (required)
- name: str
- description: str
- input_schema: ClassVar[type[BaseModel]]
- execute(input_data, config, **kwargs): method (required)

Every node also inherits configuration fields you do not have to implement: id, error_handling, input_transformer / output_transformer, caching, streaming, approval (human-in-the-loop), and depends.
A complete custom tool
Apart from Dynamiq and pydantic (which Dynamiq already depends on), this node uses only the standard library, so you can run it as-is:
from typing import Any, ClassVar, Literal

from pydantic import BaseModel, Field

from dynamiq.nodes import NodeGroup
from dynamiq.nodes.node import Node, ensure_config
from dynamiq.runnables import RunnableConfig


class WordStatsInputSchema(BaseModel):
    text: str = Field(..., description="Text to analyze.")
    top_n: int = Field(default=5, description="How many of the most frequent words to return.")


class WordStatsTool(Node):
    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
    name: str = "word-stats"
    description: str = (
        "Counts words in a text and returns the most frequent ones. "
        "Provide the text to analyze and optionally top_n."
    )
    input_schema: ClassVar[type[WordStatsInputSchema]] = WordStatsInputSchema

    def execute(
        self, input_data: WordStatsInputSchema, config: RunnableConfig = None, **kwargs
    ) -> dict[str, Any]:
        # Wire the node into callbacks and tracing before doing any work.
        config = ensure_config(config)
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        # Count case-insensitive, whitespace-separated words.
        words = input_data.text.lower().split()
        counts: dict[str, int] = {}
        for word in words:
            counts[word] = counts.get(word, 0) + 1

        # Keep the top_n most frequent words.
        top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[: input_data.top_n]
        return {"content": {"total_words": len(words), "top_words": dict(top)}}


if __name__ == "__main__":
    tool = WordStatsTool()
    result = tool.run(input_data={"text": "the quick brown fox jumps over the lazy fox", "top_n": 3})
    print(result.output)

Key points, all from the base-class behavior in dynamiq/nodes/node.py:

- ensure_config + run_on_node_execute_run wire your node into callbacks and tracing. Call them first in execute.
- Validated input. Because input_schema is set, execute receives a WordStatsInputSchema instance, so invalid input fails before your code runs. Without a schema, execute receives the raw input dict.
- Standalone runs. Any node implements the same run() / run_sync() / run_async() interface as a workflow, so you can test it in isolation. See Running Workflows & Results.
- {"content": ...} is the convention agents read as the tool observation when the node is used in an agent's tools list.
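To check the counting logic apart from the framework, it can help to pull it into a plain function. The sketch below mirrors the body of execute above using only the standard library; word_stats is a hypothetical helper name, not part of Dynamiq.

```python
from typing import Any


def word_stats(text: str, top_n: int = 5) -> dict[str, Any]:
    """Hypothetical helper mirroring WordStatsTool.execute, minus the Dynamiq wiring."""
    words = text.lower().split()
    counts: dict[str, int] = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
    # sorted() is stable, so words with equal counts keep first-seen order.
    top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    return {"content": {"total_words": len(words), "top_words": dict(top)}}


if __name__ == "__main__":
    print(word_stats("the quick brown fox jumps over the lazy fox", top_n=3))
    # {'content': {'total_words': 9, 'top_words': {'the': 2, 'fox': 2, 'quick': 1}}}
```

Note that punctuation is not stripped, so "fox" and "fox." would be counted as different words.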
For simple stateless functions you don't need a class at all — the function_tool decorator wraps a plain Python function into a tool node. See Tools & Function Tools.
Execution lifecycle
When a flow (or an agent) runs your node, run_sync executes this pipeline around your execute:
- Dependency validation: results of depends nodes are checked; a failed or skipped dependency (with behavior="raise") skips this node.
- Approval: if approval.enabled, the human-in-the-loop gate runs first.
- Input transformation: input_transformer (JSONPath path / selector) and input_mapping (values set via .inputs()) build the input dict from raw input plus dependency outputs.
- Schema validation: input_schema is applied if defined.
- Cache lookup: if caching is enabled, a hit returns the cached output without calling execute.
- execute_with_retry: your execute, wrapped in the timeout/retry loop configured by error_handling (see Error Handling & Retries).
- Output transformation: output_transformer reshapes the result before it is handed to dependents.
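The ordering of these stages can be easier to see in code. The following is a toy model in plain Python, not Dynamiq's implementation: run_node and all of its parameters are hypothetical names, the approval gate is omitted, and it assumes the output transformation is applied to cached results as well.

```python
from typing import Any, Callable, Optional


def run_node(
    execute: Callable[[dict], Any],
    raw_input: dict,
    *,
    dependency_ok: bool = True,
    input_transform: Callable[[dict], dict] = lambda d: d,
    validate: Callable[[dict], dict] = lambda d: d,
    cache: Optional[dict] = None,
    max_retries: int = 0,
    output_transform: Callable[[Any], Any] = lambda o: o,
) -> dict:
    """Toy model of the pipeline above; every name here is illustrative."""
    # Dependency validation: a failed dependency skips the node.
    if not dependency_ok:
        return {"status": "skipped", "output": None}
    # Input transformation, then schema validation, before any user code runs.
    data = validate(input_transform(raw_input))
    # Cache lookup: a hit bypasses execute entirely.
    key = repr(sorted(data.items()))
    if cache is not None and key in cache:
        result = cache[key]
    else:
        # execute wrapped in a simple retry loop (no timeout in this sketch).
        for attempt in range(max_retries + 1):
            try:
                result = execute(data)
                break
            except Exception as exc:
                if attempt == max_retries:
                    return {"status": "failure", "error": str(exc)}
        if cache is not None:
            cache[key] = result
    # Output transformation before the result is handed to dependents.
    return {"status": "success", "output": output_transform(result)}


if __name__ == "__main__":
    calls = []

    def flaky(data: dict) -> dict:
        calls.append(data)
        if len(calls) == 1:
            raise RuntimeError("transient")
        return {"content": data["text"].upper()}

    shared_cache: dict = {}
    options = dict(cache=shared_cache, max_retries=1, output_transform=lambda o: o["content"])
    print(run_node(flaky, {"text": "hi"}, **options))  # first attempt fails, retry succeeds
    print(run_node(flaky, {"text": "hi"}, **options))  # served from the cache
    print(len(calls))  # execute was invoked twice in total
```

The point of the model is that validation failures and cache hits never reach execute, while retries wrap only execute itself.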
If you need native async execution (your node awaits I/O), override execute_async as well — run_async detects it and runs it directly on the event loop instead of offloading the sync execute to a thread.
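The difference between the two paths can be illustrated with a small standalone dispatcher. This is a sketch of the general technique, not Dynamiq's run_async: the classes are hypothetical, and the detection rule used here (checking for a coroutine method named execute_async) is an assumption about how such a check could be written.

```python
import asyncio
import inspect
import threading


class SyncOnlyNode:
    def execute(self, data: dict) -> dict:
        # Blocking work; records which thread it ran on.
        return {"content": data["x"] * 2, "thread": threading.current_thread().name}


class NativeAsyncNode(SyncOnlyNode):
    async def execute_async(self, data: dict) -> dict:
        await asyncio.sleep(0)  # stands in for awaited I/O
        return {"content": data["x"] * 2, "thread": threading.current_thread().name}


async def run_async(node, data: dict) -> dict:
    """Illustrative dispatcher: prefer a native coroutine, else offload to a thread."""
    native = getattr(node, "execute_async", None)
    if native is not None and inspect.iscoroutinefunction(native):
        return await native(data)  # runs directly on the event loop
    return await asyncio.to_thread(node.execute, data)  # sync execute on a worker thread


if __name__ == "__main__":
    print(asyncio.run(run_async(SyncOnlyNode(), {"x": 2})))     # thread is a pool worker
    print(asyncio.run(run_async(NativeAsyncNode(), {"x": 2})))  # thread is the loop's thread
```

Offloading keeps a blocking execute from stalling the event loop, but each call occupies a worker thread; a native coroutine avoids that cost when the node spends its time awaiting I/O.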
Nodes that need a connection
Subclass ConnectionNode when your node talks to an external service. It adds two fields and a client lifecycle:
- connection: BaseConnection | None is a typed Connection (e.g. connection: Tavily in the built-in TavilyTool). Declaring the field with a concrete connection class makes it required and validated.
- client: Any | None is built automatically in init_components by the flow's ConnectionManager from the connection; you can also inject a pre-built client directly. At least one of connection or client must be provided.
- ensure_client() is called before every execution attempt; if the client reports itself closed, it is reinitialized from the connection automatically, and reconnection failures participate in the retry loop.
Inside execute, use self.client for API calls. For a complete reference implementation, read the built-in TavilyTool (dynamiq/nodes/tools/tavily.py): it declares connection: Tavily, a rich input_schema, and an execute that calls self.client and raises ToolExecutionException on recoverable errors so the agent can correct its input and retry.
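The client lifecycle described above can be modeled without any external service. In the sketch below, FakeClient, FakeConnection, and ToyConnectionNode are hypothetical stand-ins written for illustration; they are not Dynamiq classes, and the closed attribute is an assumed way for a client to report its state.

```python
from typing import Any, Optional


class FakeClient:
    """Stand-in for an SDK client that can report itself closed."""

    def __init__(self) -> None:
        self.closed = False

    def search(self, query: str) -> str:
        if self.closed:
            raise RuntimeError("client is closed")
        return f"results for {query}"


class FakeConnection:
    """Stand-in for a connection object that knows how to build a client."""

    def __init__(self) -> None:
        self.connects = 0

    def connect(self) -> FakeClient:
        self.connects += 1
        return FakeClient()


class ToyConnectionNode:
    def __init__(self, connection: Optional[FakeConnection] = None, client: Any = None) -> None:
        # At least one of connection or client must be provided.
        if connection is None and client is None:
            raise ValueError("provide a connection or a client")
        self.connection = connection
        self.client = client if client is not None else connection.connect()

    def ensure_client(self) -> None:
        # Rebuild the client from the connection if it reports itself closed.
        if getattr(self.client, "closed", False) and self.connection is not None:
            self.client = self.connection.connect()

    def run(self, query: str) -> dict:
        self.ensure_client()  # checked before every execution attempt
        return {"content": self.client.search(query)}


if __name__ == "__main__":
    conn = FakeConnection()
    node = ToyConnectionNode(connection=conn)
    print(node.run("first"))
    node.client.closed = True   # simulate a dropped client
    print(node.run("second"))   # transparently reconnects
    print(conn.connects)        # the connection built two clients
```

An injected client with no connection cannot be rebuilt in this model, which is one reason to prefer passing a connection when the service may drop idle clients.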
There is also VectorStoreNode (a ConnectionNode specialization) for nodes backed by a vector store — it manages a vector_store instance the same way. The built-in retrievers and writers in dynamiq.nodes.retrievers / dynamiq.nodes.writers are the reference implementations.
Useful subclass switches
A few inherited fields change how agents treat your node when it is used as a tool:
| Field | Default | Effect |
|---|---|---|
| is_files_allowed | False | Permits the node to access files. |
| is_parallel_execution_allowed | False | Lets an agent run this tool in parallel with other tool calls. |
| is_optimized_for_agents | False | Marks output as already formatted for agent consumption. |
| input_param_modes | {} | Per-field overrides on input_schema: "required" forces an optional param to be provided; "hidden" removes it from the agent-facing schema while keeping its default. |
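As a rough illustration of what the "required" and "hidden" modes mean for the schema an agent sees, the sketch below applies them to a JSON-Schema-like dict. apply_param_modes is a hypothetical helper, and the dict layout is an assumption; how Dynamiq actually derives the agent-facing schema is not shown on this page.

```python
import copy


def apply_param_modes(schema: dict, modes: dict[str, str]) -> dict:
    """Illustrative only: derive an agent-facing schema from per-field modes."""
    out = copy.deepcopy(schema)  # never mutate the node's own schema
    required = list(out.get("required", []))
    for field, mode in modes.items():
        if field not in out["properties"]:
            raise KeyError(f"unknown field: {field}")
        if mode == "required":
            # An optional parameter must now be supplied by the agent.
            if field not in required:
                required.append(field)
        elif mode == "hidden":
            # The agent no longer sees the field; the node falls back to its default.
            out["properties"].pop(field)
            if field in required:
                required.remove(field)
        else:
            raise ValueError(f"unsupported mode: {mode}")
    out["required"] = required
    return out


if __name__ == "__main__":
    schema = {
        "properties": {
            "text": {"type": "string"},
            "top_n": {"type": "integer", "default": 5},
        },
        "required": ["text"],
    }
    print(apply_param_modes(schema, {"top_n": "required"})["required"])  # ['text', 'top_n']
    print(list(apply_param_modes(schema, {"top_n": "hidden"})["properties"]))  # ['text']
```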
Using the node
Custom nodes participate in flows and agents exactly like built-ins:
from dynamiq import Workflow
from dynamiq.flows import Flow
wf = Workflow(flow=Flow(nodes=[WordStatsTool(id="stats")]))
result = wf.run(input_data={"text": "to be or not to be", "top_n": 2})
print(result.output["stats"]["output"]["content"])

Or hand it to an agent: Agent(llm=llm, tools=[WordStatsTool()], ...). The agent reads name, description, and the input schema to decide when to call it. The custom tools in the examples repo (calculator, file reader, scraper) follow this exact pattern.