Checkpoints
Persist flow state to a backend (in-memory, filesystem, PostgreSQL) and resume runs after crashes, timeouts, or human-input waits.
Checkpointing snapshots a flow's execution state — completed nodes, their outputs, agent loop progress, pending human-input requests — to a storage backend as the run progresses. If the process dies or a node fails after retries, you resume from the last checkpoint instead of re-running everything from scratch.
Enable checkpointing
Configure checkpointing at the flow level with CheckpointConfig and a backend:
from dynamiq.checkpoints import CheckpointBehavior, CheckpointConfig
from dynamiq.checkpoints.backends import FileSystem
from dynamiq.flows import Flow
from dynamiq.nodes.node import NodeDependency
from dynamiq.nodes.tools import Python
from dynamiq.nodes.utils import Input, Output
inp = Input(id="input", name="Input")
multiply = Python(
    id="multiply",
    name="multiply-by-10",
    code="def run(input_data): return {'value': input_data.get('value', 0) * 10}",
    depends=[NodeDependency(inp)],
)
out = Output(id="output", name="Output", depends=[NodeDependency(multiply)])
flow = Flow(
    nodes=[inp, multiply, out],
    checkpoint=CheckpointConfig(
        enabled=True,
        backend=FileSystem(base_path=".checkpoints"),
        behavior=CheckpointBehavior.APPEND,
        max_checkpoints=20,
    ),
)
result = flow.run_sync(input_data={"value": 4})

The config is two-layered: the flow-level CheckpointConfig holds structural defaults (backend, retention, behavior), and a run-level CheckpointConfig passed via RunnableConfig.checkpoint overrides any field for that run, including resume_from.
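The two-layer behavior can be pictured with a small standalone model. This is only a sketch, not Dynamiq's implementation: `ConfigSketch` and `merge` are hypothetical names, and treating `None` as "not set at this layer" is an assumption made for illustration (how the library actually detects which fields a run-level config sets is not covered here).

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical stand-in for CheckpointConfig; None means 'not set at this layer'."""

    enabled: Optional[bool] = None
    behavior: Optional[str] = None
    max_checkpoints: Optional[int] = None
    resume_from: Optional[str] = None


def merge(flow_level: ConfigSketch, run_level: ConfigSketch) -> ConfigSketch:
    """Apply every field the run-level config sets on top of the flow-level defaults."""
    overrides = {
        f.name: getattr(run_level, f.name)
        for f in fields(run_level)
        if getattr(run_level, f.name) is not None
    }
    return replace(flow_level, **overrides)


# Flow-level config: structural defaults shared by every run.
flow_cfg = ConfigSketch(enabled=True, behavior="append", max_checkpoints=20)

# Run-level config: only the fields this particular run wants to change.
run_cfg = ConfigSketch(resume_from="checkpoint-123", max_checkpoints=5)

effective = merge(flow_cfg, run_cfg)
```

In this model the run keeps the flow's `enabled` and `behavior` values, while `max_checkpoints` and `resume_from` come from the run-level config.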
CheckpointConfig reference
- enabled: bool
- backend: CheckpointBackend
- resume_from: str | null
- behavior: "append" | "replace"
- checkpoint_on_start_enabled: bool
- checkpoint_after_node_enabled: bool
- checkpoint_on_failure_enabled: bool
- checkpoint_on_cancel_enabled: bool
- checkpoint_mid_agent_loop_enabled: bool
- checkpoint_on_input_timeout_enabled: bool
- max_checkpoints: int
- max_ttl_minutes: int | null
- exclude_node_ids: list[str]

Backends
| Backend | Import | Notes |
|---|---|---|
| InMemory | dynamiq.checkpoints.backends | Default. Process-local; useful for tests and time travel within one process. |
| FileSystem | dynamiq.checkpoints.backends | JSON files under {base_path}/{flow_id}/{timestamp}__{run_id}/. Default base_path is .dynamiq/checkpoints. |
| PostgreSQL | dynamiq.checkpoints.backends | Durable storage for production. Takes a dynamiq.connections.PostgreSQL connection, a table_name (default flow_checkpoints), and create_if_not_exist=True to auto-create the table. Call backend.close() when done. |
from dynamiq.checkpoints.backends import PostgreSQL as PostgresCheckpointBackend
from dynamiq.connections import PostgreSQL as PostgresConn
backend = PostgresCheckpointBackend(
    connection=PostgresConn(),  # reads POSTGRESQL_HOST/PORT/DATABASE/USER/PASSWORD env vars
    table_name="flow_checkpoints",
    create_if_not_exist=True,
)

All backends share one interface: save, load, delete, get_latest_by_flow, get_list_by_flow, get_chain (walks parent_checkpoint_id links), and cleanup_by_flow(keep_count=...), each with an _async variant.
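To make the shared interface concrete, here is a toy, dictionary-backed model of the synchronous methods. It is a sketch under assumptions rather than the library's code: `CheckpointSketch` and `InMemorySketch` are hypothetical names, the return types are guesses, the chain is returned newest first by assumption, and an integer counter stands in for `created_at`.

```python
from dataclasses import dataclass, field
from itertools import count
from typing import Dict, List, Optional

_clock = count()  # monotonically increasing stand-in for a created_at timestamp


@dataclass
class CheckpointSketch:
    """Hypothetical, heavily reduced stand-in for a checkpoint record."""

    id: str
    flow_id: str
    parent_checkpoint_id: Optional[str] = None
    created_at: int = field(default_factory=lambda: next(_clock))


class InMemorySketch:
    """Toy backend that mirrors the method names listed above."""

    def __init__(self) -> None:
        self._store: Dict[str, CheckpointSketch] = {}

    def save(self, checkpoint: CheckpointSketch) -> str:
        self._store[checkpoint.id] = checkpoint
        return checkpoint.id

    def load(self, checkpoint_id: str) -> Optional[CheckpointSketch]:
        return self._store.get(checkpoint_id)

    def delete(self, checkpoint_id: str) -> bool:
        return self._store.pop(checkpoint_id, None) is not None

    def get_list_by_flow(self, flow_id: str, limit: Optional[int] = None) -> List[CheckpointSketch]:
        # Newest first, matching the ordering noted in the inspection example.
        items = sorted(
            (cp for cp in self._store.values() if cp.flow_id == flow_id),
            key=lambda cp: cp.created_at,
            reverse=True,
        )
        return items if limit is None else items[:limit]

    def get_latest_by_flow(self, flow_id: str) -> Optional[CheckpointSketch]:
        items = self.get_list_by_flow(flow_id, limit=1)
        return items[0] if items else None

    def get_chain(self, checkpoint_id: str) -> List[CheckpointSketch]:
        # Follow parent_checkpoint_id links until the root; guard against cycles.
        chain: List[CheckpointSketch] = []
        seen = set()
        current = self.load(checkpoint_id)
        while current is not None and current.id not in seen:
            chain.append(current)
            seen.add(current.id)
            parent_id = current.parent_checkpoint_id
            current = self.load(parent_id) if parent_id else None
        return chain

    def cleanup_by_flow(self, flow_id: str, keep_count: int) -> int:
        # Keep the newest keep_count checkpoints and delete the rest.
        stale = self.get_list_by_flow(flow_id)[keep_count:]
        for cp in stale:
            self.delete(cp.id)
        return len(stale)


backend = InMemorySketch()
backend.save(CheckpointSketch(id="a", flow_id="f1"))
backend.save(CheckpointSketch(id="b", flow_id="f1", parent_checkpoint_id="a"))
backend.save(CheckpointSketch(id="c", flow_id="f1", parent_checkpoint_id="b"))

chain_ids = [cp.id for cp in backend.get_chain("c")]
removed = backend.cleanup_by_flow("f1", keep_count=2)
remaining_ids = [cp.id for cp in backend.get_list_by_flow("f1")]
```

Note that in this toy model retention can delete a checkpoint that a surviving checkpoint still names as its parent, in which case `get_chain` simply stops at the missing link.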
Resuming a run
Find a checkpoint, then pass its id as resume_from. With input_data=None, the flow reuses the checkpoint's original_input; completed nodes are skipped and their saved outputs feed the remaining nodes:
from dynamiq.checkpoints import CheckpointConfig
from dynamiq.runnables import RunnableConfig
latest = flow.checkpoint.backend.get_latest_by_flow(flow.id)
config = RunnableConfig(checkpoint=CheckpointConfig(resume_from=latest.id))
result = flow.run_sync(input_data=None, config=config)
# Shorthand kwarg form:
result = flow.run_sync(input_data=None, resume_from=latest.id)

What resume restores, beyond completed node outputs:
- Node internal state: each node's to_checkpoint_state()/from_checkpoint_state() pair round-trips node-specific state.
- Agent loop progress: agents and orchestrators implement IterativeCheckpointMixin, so with mid-loop checkpoints enabled a resumed agent continues from its last completed iteration instead of restarting the loop.
- Human-in-the-loop approvals: an approval response received before the crash is stored on the checkpoint, so the resumed node does not re-prompt the user. Nodes that were still waiting are re-run and ask again.
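The core resume rule (completed nodes are skipped and their saved outputs feed the remaining nodes) can be illustrated without the library. The sketch below assumes a strictly linear flow and uses a hypothetical `run_linear_flow` helper and a plain dict in place of a real checkpoint; it does not model node internal state, agent loops, or approvals.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Step = Tuple[str, Callable[[Any], Any]]  # (node id, function applied to the previous output)


def run_linear_flow(
    steps: List[Step],
    input_data: Any,
    saved_outputs: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], List[str]]:
    """Run steps in order, reusing saved outputs for nodes that already completed."""
    outputs: Dict[str, Any] = dict(saved_outputs or {})
    executed: List[str] = []
    value = input_data
    for node_id, fn in steps:
        if node_id in outputs:
            # Completed before the crash: skip execution and reuse the saved output.
            value = outputs[node_id]
        else:
            value = fn(value)
            outputs[node_id] = value
            executed.append(node_id)
    return outputs, executed


steps: List[Step] = [
    ("multiply", lambda v: v * 10),
    ("add_one", lambda v: v + 1),
    ("label", lambda v: f"result={v}"),
]

# Pretend the process died after "multiply" finished and its output was checkpointed.
checkpoint = {"original_input": 4, "node_outputs": {"multiply": 40}}

outputs, executed = run_linear_flow(
    steps,
    input_data=checkpoint["original_input"],
    saved_outputs=checkpoint["node_outputs"],
)
```

Only `add_one` and `label` run on resume; `multiply` is never called again, and its stored output is what the next node receives.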
Inspecting checkpoints
Each FlowCheckpoint records id, flow_id, run_id, status (active, paused, completed, failed, canceled, pending_input), node_states keyed by node id, completed_node_ids, pending_node_ids, original_input, pending_inputs (HITL contexts), created_at, and parent_checkpoint_id:
backend = flow.checkpoint.backend
latest = backend.get_latest_by_flow(flow.id)
print(latest.status, latest.completed_node_ids)
for cp in backend.get_list_by_flow(flow.id, limit=10):  # newest first
    print(cp.id, cp.status.value, cp.parent_checkpoint_id)
chain = backend.get_chain(latest.id) # time-travel chain in APPEND mode
deleted = backend.cleanup_by_flow(flow.id, keep_count=2)  # retention

In APPEND mode you can resume from any checkpoint in the chain, not just the latest, which is useful for re-running a flow from an earlier decision point.
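One way to choose an earlier decision point is to look for the newest checkpoint in which a given node had not yet completed. The helper below is a hypothetical sketch: `pick_resume_point` is not part of Dynamiq, and plain dicts stand in for checkpoint objects, using only the completed_node_ids and created_at fields described above with integers in place of real timestamps.

```python
from typing import Any, Dict, List, Optional


def pick_resume_point(chain: List[Dict[str, Any]], node_id: str) -> Optional[Dict[str, Any]]:
    """Return the newest checkpoint in which node_id has not completed yet, if any."""
    candidates = [cp for cp in chain if node_id not in cp["completed_node_ids"]]
    if not candidates:
        return None
    # Sort key is created_at, so the result does not depend on the chain's order.
    return max(candidates, key=lambda cp: cp["created_at"])


# Stand-in chain for a flow with nodes input -> plan -> draft.
chain = [
    {"id": "cp-3", "created_at": 3, "completed_node_ids": ["input", "plan", "draft"]},
    {"id": "cp-2", "created_at": 2, "completed_node_ids": ["input", "plan"]},
    {"id": "cp-1", "created_at": 1, "completed_node_ids": ["input"]},
]

# To redo the "plan" decision, resume from the last checkpoint taken before it completed.
target = pick_resume_point(chain, "plan")
```

With real checkpoints the fields would be read as attributes rather than dict keys, and the selected checkpoint's id would then be passed as resume_from, as in the resume example earlier.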
Runnable end-to-end demos: PostgreSQL checkpointing and sub-agent checkpoint + crash resume.