Checkpoints

Persist flow state to a backend (in-memory, filesystem, PostgreSQL) and resume runs after crashes, timeouts, or human-input waits.

Checkpointing snapshots a flow's execution state — completed nodes, their outputs, agent loop progress, pending human-input requests — to a storage backend as the run progresses. If the process dies or a node fails after retries, you resume from the last checkpoint instead of re-running everything from scratch.

Enable checkpointing

Configure checkpointing at the flow level with CheckpointConfig and a backend:

from dynamiq.checkpoints import CheckpointBehavior, CheckpointConfig
from dynamiq.checkpoints.backends import FileSystem
from dynamiq.flows import Flow
from dynamiq.nodes.node import NodeDependency
from dynamiq.nodes.tools import Python
from dynamiq.nodes.utils import Input, Output

inp = Input(id="input", name="Input")
multiply = Python(
    id="multiply",
    name="multiply-by-10",
    code="def run(input_data): return {'value': input_data.get('value', 0) * 10}",
    depends=[NodeDependency(inp)],
)
out = Output(id="output", name="Output", depends=[NodeDependency(multiply)])

flow = Flow(
    nodes=[inp, multiply, out],
    checkpoint=CheckpointConfig(
        enabled=True,
        backend=FileSystem(base_path=".checkpoints"),
        behavior=CheckpointBehavior.APPEND,
        max_checkpoints=20,
    ),
)

result = flow.run_sync(input_data={"value": 4})

The config is two-layered: the flow-level CheckpointConfig holds structural defaults (backend, retention, behavior), and a run-level CheckpointConfig passed via RunnableConfig.checkpoint overrides any field for that run — including resume_from.
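The layering described above can be pictured with a small stdlib-only model. This is a sketch, not the library's merge logic: ToyCheckpointConfig and merge are hypothetical names, and using None to mean "not set at this layer" is an assumption made for illustration.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class ToyCheckpointConfig:
    """Hypothetical stand-in for a config layer; None means 'not set here'."""
    enabled: Optional[bool] = None
    behavior: Optional[str] = None
    max_checkpoints: Optional[int] = None
    resume_from: Optional[str] = None


def merge(flow_level, run_level):
    """Apply every field set on the run-level config on top of the flow-level defaults."""
    if run_level is None:
        return flow_level
    overrides = {
        f.name: getattr(run_level, f.name)
        for f in fields(run_level)
        if getattr(run_level, f.name) is not None
    }
    return replace(flow_level, **overrides)


# Flow level: structural defaults shared by every run.
flow_cfg = ToyCheckpointConfig(enabled=True, behavior="append", max_checkpoints=20)
# Run level: only the fields this particular run wants to change.
run_cfg = ToyCheckpointConfig(max_checkpoints=5, resume_from="cp-123")

effective = merge(flow_cfg, run_cfg)
print(effective)
```

In this toy model a run cannot reset a field back to None, which is a limitation of the None-sentinel shortcut and says nothing about how the real config tracks explicitly set fields.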

CheckpointConfig reference

  • enabled (bool): Whether checkpointing is active. Default false.
  • backend (CheckpointBackend): Storage backend instance. Default InMemory.
  • resume_from (str | null): Checkpoint ID to resume from (per-run).
  • behavior ("append" | "replace"): APPEND (default) creates a new snapshot per save, building a parent-linked chain for time travel; REPLACE overwrites the same checkpoint.
  • checkpoint_on_start_enabled (bool): Persist a checkpoint at run start, before any node executes. Default true.
  • checkpoint_after_node_enabled (bool): Create a checkpoint after each node completes. Default true.
  • checkpoint_on_failure_enabled (bool): Create a checkpoint when the workflow fails. Default true.
  • checkpoint_on_cancel_enabled (bool): Create a checkpoint when the workflow is canceled. Default true.
  • checkpoint_mid_agent_loop_enabled (bool): Checkpoint during long agent loops so resumes can skip completed iterations. Default false.
  • checkpoint_on_input_timeout_enabled (bool): Create a checkpoint when a streaming input wait times out (HITL). Default true.
  • max_checkpoints (int): Max checkpoints kept per flow id; oldest are removed beyond this. Default 50.
  • max_ttl_minutes (int | null): Delete checkpoints older than this many minutes.
  • exclude_node_ids (list[str]): Node IDs to skip when checkpointing.
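To make the difference between the two behavior values and the max_checkpoints limit concrete, here is a minimal in-process model. ToyStore and ToyCheckpoint are hypothetical classes written for this illustration; they only mimic the semantics described in the reference and are not how any real backend is implemented.

```python
import itertools
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyCheckpoint:
    id: str
    completed_node_ids: List[str]
    parent_checkpoint_id: Optional[str] = None


class ToyStore:
    """Hypothetical store that mimics APPEND/REPLACE and retention."""

    def __init__(self, behavior="append", max_checkpoints=50):
        self.behavior = behavior
        self.max_checkpoints = max_checkpoints
        self.checkpoints = []  # oldest first
        self._ids = itertools.count(1)

    def save(self, completed_node_ids):
        latest = self.checkpoints[-1] if self.checkpoints else None
        if self.behavior == "replace" and latest is not None:
            # REPLACE: overwrite the existing checkpoint in place.
            latest.completed_node_ids = list(completed_node_ids)
            return latest
        # APPEND (or the very first save): new snapshot linked to its parent.
        cp = ToyCheckpoint(
            id=f"cp-{next(self._ids)}",
            completed_node_ids=list(completed_node_ids),
            parent_checkpoint_id=latest.id if latest else None,
        )
        self.checkpoints.append(cp)
        # Retention: drop the oldest snapshots beyond max_checkpoints.
        excess = len(self.checkpoints) - self.max_checkpoints
        if excess > 0:
            del self.checkpoints[:excess]
        return cp


append_store = ToyStore("append", max_checkpoints=2)
replace_store = ToyStore("replace", max_checkpoints=2)
for done in (["input"], ["input", "multiply"], ["input", "multiply", "output"]):
    append_store.save(done)
    replace_store.save(done)

append_ids = [cp.id for cp in append_store.checkpoints]
replace_ids = [cp.id for cp in replace_store.checkpoints]
print(append_ids)   # ['cp-2', 'cp-3']  (cp-1 was pruned by retention)
print(replace_ids)  # ['cp-1']          (one checkpoint, overwritten twice)
```

Note that in the toy APPEND store the pruned cp-1 is still named as the parent of cp-2, so a low retention limit shortens how far back a chain can be followed in this model.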

Backends

| Backend | Import | Notes |
| --- | --- | --- |
| InMemory | dynamiq.checkpoints.backends | Default. Process-local; useful for tests and time travel within one process. |
| FileSystem | dynamiq.checkpoints.backends | JSON files under {base_path}/{flow_id}/{timestamp}__{run_id}/. Default base_path is .dynamiq/checkpoints. |
| PostgreSQL | dynamiq.checkpoints.backends | Durable storage for production. Takes a dynamiq.connections.PostgreSQL connection, a table_name (default flow_checkpoints), and create_if_not_exist=True to auto-create the table. Call backend.close() when done. |

from dynamiq.checkpoints.backends import PostgreSQL as PostgresCheckpointBackend
from dynamiq.connections import PostgreSQL as PostgresConn

backend = PostgresCheckpointBackend(
    connection=PostgresConn(),  # reads POSTGRESQL_HOST/PORT/DATABASE/USER/PASSWORD env vars
    table_name="flow_checkpoints",
    create_if_not_exist=True,
)

All backends share one interface: save, load, delete, get_latest_by_flow, get_list_by_flow, get_chain (walks parent_checkpoint_id links), and cleanup_by_flow(keep_count=...) — each with an _async variant.
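As an illustration of what "walks parent_checkpoint_id links" means, the sketch below follows parent pointers over a plain dictionary. toy_get_chain and the parents mapping are hypothetical, and the newest-first ordering of the result is an assumption; the ordering returned by a real backend's get_chain is not specified here.

```python
# Hypothetical records: checkpoint id -> parent_checkpoint_id.
# cp-3 and cp-4 both branch from cp-2, as happens after resuming from cp-2.
parents = {"cp-1": None, "cp-2": "cp-1", "cp-3": "cp-2", "cp-4": "cp-2"}


def toy_get_chain(checkpoint_id, parents):
    """Follow parent links from checkpoint_id back to the root (newest first)."""
    chain = []
    seen = set()
    current = checkpoint_id
    # Stop at the root, at a parent that no longer exists, or on a cycle.
    while current is not None and current in parents and current not in seen:
        chain.append(current)
        seen.add(current)
        current = parents[current]
    return chain


print(toy_get_chain("cp-4", parents))  # ['cp-4', 'cp-2', 'cp-1']
print(toy_get_chain("cp-3", parents))  # ['cp-3', 'cp-2', 'cp-1']
```

The walk stops quietly when a parent is missing, which is one plausible way to treat a chain whose older checkpoints were removed by retention.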

Resuming a run

Find a checkpoint, then pass its id as resume_from. With input_data=None, the flow reuses the checkpoint's original_input; completed nodes are skipped and their saved outputs feed the remaining nodes:

from dynamiq.checkpoints import CheckpointConfig
from dynamiq.runnables import RunnableConfig

latest = flow.checkpoint.backend.get_latest_by_flow(flow.id)

config = RunnableConfig(checkpoint=CheckpointConfig(resume_from=latest.id))
result = flow.run_sync(input_data=None, config=config)

# Shorthand kwarg form:
result = flow.run_sync(input_data=None, resume_from=latest.id)

What resume restores, beyond completed node outputs:

  • Node internal state — each node's to_checkpoint_state() / from_checkpoint_state() round-trips node-specific state.
  • Agent loop progress — agents and orchestrators implement IterativeCheckpointMixin, so with mid-loop checkpoints enabled a resumed agent continues from its last completed iteration instead of restarting the loop.
  • Human-in-the-loop approvals — an approval response received before the crash is stored on the checkpoint, so the resumed node does not re-prompt the user. Nodes that were still waiting are re-run and ask again.
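The core resume rule (skip completed nodes, feed their saved outputs forward, reuse the original input when none is given) can be sketched with a toy linear executor. run_flow, the state dictionary, and the node functions are all hypothetical; real flows are dependency graphs and carry far more state than this.

```python
def run_flow(nodes, state, input_data=None):
    """Run (node_id, fn) pairs in order, skipping nodes already recorded in state."""
    if input_data is not None:
        state["original_input"] = input_data
    value = state["original_input"]  # input_data=None reuses the saved input
    executed = []
    for node_id, fn in nodes:
        if node_id in state["outputs"]:
            # Completed before the crash: reuse the saved output, do not re-run.
            value = state["outputs"][node_id]
            continue
        value = fn(value)
        state["outputs"][node_id] = value  # "checkpoint" after each node
        executed.append(node_id)
    return value, executed


calls = {"flaky": 0}


def multiply(data):
    return {"value": data["value"] * 10}


def flaky_add(data):
    calls["flaky"] += 1
    if calls["flaky"] == 1:
        raise RuntimeError("simulated crash")
    return {"value": data["value"] + 1}


nodes = [("multiply", multiply), ("add_one", flaky_add)]
state = {"original_input": None, "outputs": {}}

try:
    run_flow(nodes, state, input_data={"value": 4})
except RuntimeError:
    pass  # state still holds multiply's output

result, executed = run_flow(nodes, state)  # resume with no new input
print(result, executed)  # {'value': 41} ['add_one']
```

On the second call only add_one executes; multiply's saved output of 40 is fed straight into it.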

Inspecting checkpoints

Each FlowCheckpoint records id, flow_id, run_id, status (active, paused, completed, failed, canceled, pending_input), node_states keyed by node id, completed_node_ids, pending_node_ids, original_input, pending_inputs (HITL contexts), created_at, and parent_checkpoint_id:

backend = flow.checkpoint.backend

latest = backend.get_latest_by_flow(flow.id)
print(latest.status, latest.completed_node_ids)

for cp in backend.get_list_by_flow(flow.id, limit=10):   # newest first
    print(cp.id, cp.status.value, cp.parent_checkpoint_id)

chain = backend.get_chain(latest.id)                      # time-travel chain in APPEND mode
deleted = backend.cleanup_by_flow(flow.id, keep_count=2)  # retention

In APPEND mode you can resume from any checkpoint in the chain, not just the latest — useful for re-running a flow from an earlier decision point.

Runnable end-to-end demos: PostgreSQL checkpointing and sub-agent checkpoint + crash resume.
