Custom Nodes

Subclass Node to build your own workflow components — input schemas, the execute contract, the execution lifecycle, and connection handling with ConnectionNode.

Every built-in component — LLMs, tools, retrievers, agents — is a subclass of dynamiq.nodes.node.Node. Your custom nodes follow the exact same contract, which means they get dependency wiring, input/output transformers, retries and timeouts, caching, streaming callbacks, and checkpointing for free.

The Node contract

A minimal custom node declares three things and implements execute:

group (NodeGroup, required)
Category of the node. For agent-callable tools use NodeGroup.TOOLS; other values include LLMS, RETRIEVERS, CONVERTERS, SPLITTERS, EMBEDDERS, WRITERS, VALIDATORS, UTILS, and more.
name (str)
Human-readable name. For tools, the agent sees it when selecting tools.
description (str)
For tools, this is the main signal the LLM uses to decide when to call the node, so describe what it does and the expected input shape.
input_schema (ClassVar[type[BaseModel]])
Optional Pydantic model. When set, inputs are validated before execute() and the model also drives the agent-facing tool schema.
execute(input_data, config, **kwargs) (method, required)
The unit of work. Receives validated input (an input_schema instance when defined, otherwise a dict) and returns the output. For tools, that is a dict with the result under "content".

Every node also inherits configuration fields you do not have to implement: id, error_handling, input_transformer / output_transformer, caching, streaming, approval (human-in-the-loop), and depends.

A complete custom tool

This node uses only the standard library, so you can run it as-is:

from typing import Any, ClassVar, Literal

from pydantic import BaseModel, Field

from dynamiq.nodes import NodeGroup
from dynamiq.nodes.node import Node, ensure_config
from dynamiq.runnables import RunnableConfig


class WordStatsInputSchema(BaseModel):
    text: str = Field(..., description="Text to analyze.")
    top_n: int = Field(default=5, description="How many of the most frequent words to return.")


class WordStatsTool(Node):
    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
    name: str = "word-stats"
    description: str = (
        "Counts words in a text and returns the most frequent ones. "
        "Provide the text to analyze and optionally top_n."
    )
    input_schema: ClassVar[type[WordStatsInputSchema]] = WordStatsInputSchema

    def execute(
        self, input_data: WordStatsInputSchema, config: RunnableConfig | None = None, **kwargs
    ) -> dict[str, Any]:
        config = ensure_config(config)
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        words = input_data.text.lower().split()
        counts: dict[str, int] = {}
        for word in words:
            counts[word] = counts.get(word, 0) + 1
        top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[: input_data.top_n]

        return {"content": {"total_words": len(words), "top_words": dict(top)}}


if __name__ == "__main__":
    tool = WordStatsTool()
    result = tool.run(input_data={"text": "the quick brown fox jumps over the lazy fox", "top_n": 3})
    print(result.output)

Key points, all from the base-class behavior in dynamiq/nodes/node.py:

  • ensure_config + run_on_node_execute_run wire your node into callbacks and tracing. Call them first in execute.
  • Validated input. Because input_schema is set, execute receives a WordStatsInputSchema instance — invalid input fails before your code runs. Without a schema, execute receives the raw input dict.
  • Standalone runs. Any node implements the same run() / run_sync() / run_async() interface as a workflow, so you can test it in isolation. See Running Workflows & Results.
  • {"content": ...} is the convention agents read as the tool observation when the node is used in an agent's tools list.
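The counting logic inside execute is ordinary Python, so it can be checked without Dynamiq installed. The sketch below lifts it into a standalone function and swaps the manual dict loop for collections.Counter; word_stats is a hypothetical helper name used only for illustration, not part of Dynamiq.

```python
from collections import Counter


def word_stats(text: str, top_n: int = 5) -> dict:
    """Standalone copy of the WordStatsTool.execute logic (hypothetical helper)."""
    words = text.lower().split()
    # most_common orders by count and keeps first-seen order among ties,
    # which matches the stable sorted(..., reverse=True) used in the tool.
    top = Counter(words).most_common(top_n)
    return {"content": {"total_words": len(words), "top_words": dict(top)}}


result = word_stats("the quick brown fox jumps over the lazy fox", top_n=3)
print(result)
# {'content': {'total_words': 9, 'top_words': {'the': 2, 'fox': 2, 'quick': 1}}}
```

Keeping the pure logic in a plain function like this is one way to unit-test it separately from the node wiring.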

For simple stateless functions you don't need a class at all — the function_tool decorator wraps a plain Python function into a tool node. See Tools & Function Tools.

Execution lifecycle

When a flow (or an agent) runs your node, run_sync executes this pipeline around your execute:

  1. Dependency validation: results of depends nodes are checked; a failed or skipped dependency (with behavior="raise") skips this node.
  2. Approval: if approval.enabled, the human-in-the-loop gate runs first.
  3. Input transformation: input_transformer (JSONPath path / selector) and input_mapping (values set via .inputs()) build the input dict from raw input plus dependency outputs.
  4. Schema validation: input_schema is applied if defined.
  5. Cache lookup: if caching is enabled, a hit returns the cached output without calling execute.
  6. execute_with_retry: your execute, wrapped in the timeout/retry loop configured by error_handling (see Error Handling & Retries).
  7. Output transformation: output_transformer reshapes the result before it is handed to dependents.
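The order of these steps can be modeled with a small stdlib-only function. This is a toy sketch, not Dynamiq's run_sync: run_node, its parameters, and the status strings are all assumptions made for illustration, and timeouts are left out. It also assumes the cache holds the output before the output transformer is applied, which the steps above do not specify.

```python
from typing import Any, Callable, Optional


def _identity(value: Any) -> Any:
    return value


def run_node(
    execute: Callable[[dict], Any],
    raw_input: dict,
    *,
    dependencies_ok: bool = True,
    approved: bool = True,
    input_transform: Callable[[dict], dict] = _identity,
    validate: Callable[[dict], dict] = _identity,
    cache: Optional[dict] = None,
    max_retries: int = 0,
    output_transform: Callable[[Any], Any] = _identity,
) -> dict:
    """Toy model of the seven steps; all names and statuses are illustrative."""
    if not dependencies_ok:  # 1. dependency validation
        return {"status": "skip", "output": None}
    if not approved:  # 2. approval gate
        return {"status": "rejected", "output": None}
    data = validate(input_transform(raw_input))  # 3. transform, 4. validate

    key = repr(sorted(data.items()))
    if cache is not None and key in cache:  # 5. cache lookup, execute is skipped
        output = cache[key]
    else:
        last_error = None
        for _ in range(max_retries + 1):  # 6. execute inside a retry loop
            try:
                output = execute(data)
                break
            except Exception as exc:
                last_error = exc
        else:
            return {"status": "failure", "output": None, "error": str(last_error)}
        if cache is not None:
            cache[key] = output
    return {"status": "success", "output": output_transform(output)}  # 7.


calls = {"n": 0}


def flaky_upper(data: dict) -> dict:
    """Fails on the first call only, to exercise the retry loop."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")
    return {"content": data["text"].upper()}


cache: dict = {}
first = run_node(flaky_upper, {"text": "hi"}, cache=cache, max_retries=1)
second = run_node(flaky_upper, {"text": "hi"}, cache=cache, max_retries=1)
print(first, second, calls["n"])
```

The first run fails once and succeeds on the retry; the second run is served from the cache, so flaky_upper is called only twice in total.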

If you need native async execution (your node awaits I/O), override execute_async as well — run_async detects it and runs it directly on the event loop instead of offloading the sync execute to a thread.
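The dispatch described above can be pictured with plain asyncio. In this sketch SketchNode is a stand-in class, not Dynamiq's Node, and comparing the method against the base class is only one possible way to detect an override; how run_async actually detects it is not covered here.

```python
import asyncio
import threading


class SketchNode:
    """Stand-in base class used only to illustrate the dispatch."""

    def execute(self, data: dict) -> dict:
        raise NotImplementedError

    async def execute_async(self, data: dict) -> dict:
        raise NotImplementedError

    async def run_async(self, data: dict) -> dict:
        # Assumption: an override is detected by comparing with the base method.
        if type(self).execute_async is not SketchNode.execute_async:
            return await self.execute_async(data)  # runs on the event loop
        # No native async version: offload the blocking execute to a thread.
        return await asyncio.to_thread(self.execute, data)


class SyncOnly(SketchNode):
    def execute(self, data: dict) -> dict:
        return {"content": data["x"] * 2, "thread": threading.current_thread().name}


class NativeAsync(SyncOnly):
    async def execute_async(self, data: dict) -> dict:
        await asyncio.sleep(0)  # placeholder for awaited I/O
        return {"content": data["x"] * 2, "thread": threading.current_thread().name}


async def main() -> tuple:
    sync_out = await SyncOnly().run_async({"x": 21})
    async_out = await NativeAsync().run_async({"x": 21})
    return sync_out, async_out


loop_thread = threading.current_thread().name
sync_result, async_result = asyncio.run(main())
print(sync_result["thread"], async_result["thread"])
```

The sync-only node reports a worker thread name, while the native async node reports the thread that runs the event loop.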

Nodes that need a connection

Subclass ConnectionNode when your node talks to an external service. It adds two fields and a client lifecycle:

  • connection: BaseConnection | None — a typed Connection (e.g. connection: Tavily in the built-in TavilyTool). Declaring the field with a concrete connection class makes it required and validated.
  • client: Any | None — built automatically in init_components by the flow's ConnectionManager from the connection; you can also inject a pre-built client directly. At least one of connection or client must be provided.
  • ensure_client() — called before every execution attempt; if the client reports itself closed, it is reinitialized from the connection automatically, and reconnection failures participate in the retry loop.

Inside execute, use self.client for API calls. For a complete reference implementation, read the built-in TavilyTool (dynamiq/nodes/tools/tavily.py): it declares connection: Tavily, a rich input_schema, and an execute that calls self.client and raises ToolExecutionException on recoverable errors so the agent can correct its input and retry.
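The client lifecycle can be sketched with stand-in classes. Nothing below is Dynamiq code: FakeConnection, FakeClient, and SketchConnectionNode are hypothetical, and the closed attribute is an assumed way for a client to report that it is closed.

```python
class FakeClient:
    """Hypothetical client that can be closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def search(self, query: str) -> str:
        if self.closed:
            raise RuntimeError("client is closed")
        return f"results for {query}"


class FakeConnection:
    """Hypothetical connection that builds clients and counts how many it made."""

    def __init__(self) -> None:
        self.connect_calls = 0

    def connect(self) -> FakeClient:
        self.connect_calls += 1
        return FakeClient()


class SketchConnectionNode:
    def __init__(self, connection=None, client=None) -> None:
        # Mirrors the rule that at least one of the two must be provided.
        if connection is None and client is None:
            raise ValueError("Provide a connection or a client.")
        self.connection = connection
        self.client = client

    def init_components(self) -> None:
        # Build the client from the connection unless one was injected.
        if self.client is None:
            self.client = self.connection.connect()

    def ensure_client(self) -> None:
        # Rebuild the client when it is missing or reports itself closed.
        if self.client is None or getattr(self.client, "closed", False):
            if self.connection is None:
                raise RuntimeError("Client is closed and no connection is set.")
            self.client = self.connection.connect()

    def run(self, query: str) -> dict:
        self.ensure_client()  # checked before every execution attempt
        return {"content": self.client.search(query)}


connection = FakeConnection()
node = SketchConnectionNode(connection=connection)
node.init_components()
first = node.run("dynamiq")
node.client.close()  # simulate the client being closed between runs
second = node.run("nodes")
print(first, second, connection.connect_calls)
```

The second run succeeds because the closed client is replaced before the call, which is why the connection reports two connect calls.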

There is also VectorStoreNode (a ConnectionNode specialization) for nodes backed by a vector store — it manages a vector_store instance the same way. The built-in retrievers and writers in dynamiq.nodes.retrievers / dynamiq.nodes.writers are the reference implementations.

Useful subclass switches

A few inherited fields change how agents treat your node when it is used as a tool:

Field | Default | Effect
is_files_allowed | False | Permits the node to access files.
is_parallel_execution_allowed | False | Lets an agent run this tool in parallel with other tool calls.
is_optimized_for_agents | False | Marks output as already formatted for agent consumption.
input_param_modes | {} | Per-field overrides on input_schema: "required" forces an optional param to be provided; "hidden" removes it from the agent-facing schema while keeping its default.
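The effect of input_param_modes on the agent-facing schema can be modeled on a plain JSON-Schema-style dict. This is a sketch of the described behavior only: apply_param_modes is a hypothetical helper, and how Dynamiq applies the modes internally is not shown on this page.

```python
import copy


def apply_param_modes(schema: dict, modes: dict) -> dict:
    """Return a copy of the schema as an agent would see it (hypothetical helper)."""
    result = copy.deepcopy(schema)
    required = result.setdefault("required", [])
    for field, mode in modes.items():
        if field not in result["properties"]:
            raise KeyError(f"Unknown field: {field}")
        if mode == "required":
            # The agent must now supply this otherwise optional parameter.
            if field not in required:
                required.append(field)
        elif mode == "hidden":
            # The agent no longer sees the field; the node keeps its default.
            del result["properties"][field]
            if field in required:
                required.remove(field)
        else:
            raise ValueError(f"Unsupported mode: {mode}")
    return result


# Schema shaped like WordStatsInputSchema: text is required, top_n defaults to 5.
schema = {
    "properties": {
        "text": {"type": "string"},
        "top_n": {"type": "integer", "default": 5},
    },
    "required": ["text"],
}

hidden = apply_param_modes(schema, {"top_n": "hidden"})
forced = apply_param_modes(schema, {"top_n": "required"})
print(sorted(hidden["properties"]), forced["required"])
# ['text'] ['text', 'top_n']
```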

Using the node

Custom nodes participate in flows and agents exactly like built-ins:

from dynamiq import Workflow
from dynamiq.flows import Flow

wf = Workflow(flow=Flow(nodes=[WordStatsTool(id="stats")]))
result = wf.run(input_data={"text": "to be or not to be", "top_n": 2})
print(result.output["stats"]["output"]["content"])

Or hand it to an agent — Agent(llm=llm, tools=[WordStatsTool()], ...) — and the agent reads name, description, and the input schema to decide when to call it. The custom tools in the examples repo (calculator, file reader, scraper) follow this exact pattern.
