RAG Pipeline

Build both halves of RAG in the SDK — an indexing flow that converts, splits, embeds, and stores documents, and a retrieval flow that answers questions over them.

A RAG system in the SDK is two workflows. The indexing flow converts files to documents, splits them into chunks, embeds the chunks, and writes vectors to a store. The retrieval flow embeds the user's query, fetches the most similar chunks, and feeds them to an LLM to generate a grounded answer. This page builds both end to end with PyPDF, OpenAI embeddings, and Pinecone — every component is swappable, as the sibling pages show.

Indexing flow

from io import BytesIO

from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Pinecone as PineconeConnection,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.writers import PineconeDocumentWriter

rag_wf = Workflow()

# 1. Convert PDFs to documents
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
rag_wf.flow.add_nodes(converter)

# 2. Split documents into chunks
document_splitter = (
    DocumentSplitter(split_by="sentence", split_length=10, split_overlap=1)
    .inputs(documents=converter.outputs.documents)
    .depends_on(converter)
)
rag_wf.flow.add_nodes(document_splitter)

# 3. Embed each chunk
embedder = (
    OpenAIDocumentEmbedder(
        connection=OpenAIConnection(),
        model="text-embedding-3-small",
    )
    .inputs(documents=document_splitter.outputs.documents)
    .depends_on(document_splitter)
)
rag_wf.flow.add_nodes(embedder)

# 4. Upsert vectors into the store
vector_store = (
    PineconeDocumentWriter(
        connection=PineconeConnection(),
        index_name="quickstart",
        dimension=1536,
        create_if_not_exist=True,
        index_type="serverless",
        cloud="aws",
        region="us-east-1",
    )
    .inputs(documents=embedder.outputs.documents)
    .depends_on(embedder)
)
rag_wf.flow.add_nodes(vector_store)

# Run it over local PDFs
file_paths = ["example.pdf"]
rag_wf.run(
    input_data={
        "files": [BytesIO(open(path, "rb").read()) for path in file_paths],
        "metadata": [{"filename": path} for path in file_paths],
    }
)

How the pieces connect:

  • .depends_on(node) declares execution order; .inputs(documents=node.outputs.documents) maps the upstream output into the downstream node's input. (The equivalent lower-level form is an InputTransformer with a JSONPath selector.)
  • The writer reports how many vectors it stored under upserted_count in its output.
  • Metadata you attach to files travels with every chunk, so you can filter on it at retrieval time.
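The splitter settings above (split_by="sentence", split_length=10, split_overlap=1) and the way file metadata follows each chunk can be pictured with a small standalone sketch. This is not DocumentSplitter's implementation: Chunk, split_sentences, and chunk_document are hypothetical names, the sentence boundary rule is a naive regex, and only the parameter names are borrowed from the example.

```python
import re
from dataclasses import dataclass, field


@dataclass
class Chunk:
    content: str
    metadata: dict = field(default_factory=dict)


def split_sentences(text: str) -> list[str]:
    # Naive boundary rule (assumption): split on whitespace that follows . ! or ?
    return [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]


def chunk_document(text: str, metadata: dict, split_length: int = 10, split_overlap: int = 1) -> list[Chunk]:
    """Group sentences into windows of split_length that share split_overlap sentences."""
    if not 0 <= split_overlap < split_length:
        raise ValueError("split_overlap must be >= 0 and smaller than split_length")
    sentences = split_sentences(text)
    step = split_length - split_overlap
    chunks: list[Chunk] = []
    for start in range(0, len(sentences), step):
        window = sentences[start:start + split_length]
        # Every chunk gets its own copy of the source file's metadata.
        chunks.append(Chunk(content=" ".join(window), metadata=dict(metadata)))
        if start + split_length >= len(sentences):
            break  # the last window already reached the end of the text
    return chunks


chunks = chunk_document(
    "A one. B two. C three. D four.",
    {"filename": "example.pdf"},
    split_length=3,
    split_overlap=1,
)
for chunk in chunks:
    print(chunk.metadata["filename"], "|", chunk.content)
```

With an overlap of one, the last sentence of each chunk is repeated as the first sentence of the next, so a statement that straddles a chunk boundary is still retrievable as a whole from at least one chunk.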

Retrieval flow

from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Pinecone as PineconeConnection,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.llms import OpenAI
from dynamiq.nodes.retrievers import PineconeDocumentRetriever
from dynamiq.prompts import Message, Prompt

retrieval_wf = Workflow()
openai_connection = OpenAIConnection()

# 1. Embed the query
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)
retrieval_wf.flow.add_nodes(embedder)

# 2. Retrieve the closest chunks
document_retriever = (
    PineconeDocumentRetriever(
        connection=PineconeConnection(),
        index_name="quickstart",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)
    .depends_on(embedder)
)
retrieval_wf.flow.add_nodes(document_retriever)

# 3. Generate a grounded answer
prompt_template = """
Please answer the question based on the provided context.

Question: {{ query }}

Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""

answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=Prompt(messages=[Message(content=prompt_template, role="user")]),
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )
    .depends_on([embedder, document_retriever])
)
retrieval_wf.flow.add_nodes(answer_generator)

# Ask a question
result = retrieval_wf.run(input_data={"query": "What are the line items in the invoice?"})
print(result.output[answer_generator.id]["output"]["content"])

The embedder outputs both the embedding (consumed by the retriever) and the original query string (reused in the prompt). The retriever outputs documents, each with content, metadata, and a similarity score.
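To make the retriever's role concrete, here is a minimal in-memory sketch of top_k similarity search with an optional metadata filter. It assumes cosine similarity as the metric (the actual metric is whatever the index is configured with), and StoredChunk, cosine_similarity, and retrieve are hypothetical names, not part of the SDK or of Pinecone.

```python
import math
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StoredChunk:
    content: str
    embedding: list[float]
    metadata: dict = field(default_factory=dict)


def cosine_similarity(a: list[float], b: list[float]) -> float:
    if len(a) != len(b):
        raise ValueError("embedding dimensions differ")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve(query_embedding: list[float], store: list[StoredChunk],
             top_k: int = 5, filters: Optional[dict] = None) -> list[dict]:
    """Return the top_k most similar chunks as dicts with content, metadata, and score."""
    filters = filters or {}
    # Keep only chunks whose metadata matches every filter key exactly.
    candidates = [
        c for c in store
        if all(c.metadata.get(key) == value for key, value in filters.items())
    ]
    scored = [
        {
            "content": c.content,
            "metadata": c.metadata,
            "score": cosine_similarity(query_embedding, c.embedding),
        }
        for c in candidates
    ]
    return sorted(scored, key=lambda doc: doc["score"], reverse=True)[:top_k]


store = [
    StoredChunk("invoice total", [1.0, 0.0], {"filename": "a.pdf"}),
    StoredChunk("shipping terms", [0.0, 1.0], {"filename": "a.pdf"}),
    StoredChunk("line items", [0.6, 0.8], {"filename": "b.pdf"}),
]
for doc in retrieve([1.0, 0.0], store, top_k=2):
    print(round(doc["score"], 2), doc["content"])
```

The returned dicts mirror the three fields described above (content, metadata, score); a real vector store performs the same ranking with an approximate nearest-neighbor index rather than a full scan.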

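Always embed queries with the same model used at indexing time. Vectors from different models live in different spaces, so mixing embedders silently degrades retrieval quality. The index dimension must also match the model's output size (1536 here, the default for text-embedding-3-small).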
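One way to keep the two flows aligned is to define the embedding settings once and import them in both. The sketch below is an illustration, not an SDK feature: EmbeddingConfig, EMBEDDING, and check_vector are hypothetical names, and the values are the ones used on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EmbeddingConfig:
    model: str
    dimension: int


# Single source of truth shared by the indexing and retrieval flows.
# text-embedding-3-small returns 1536-dimensional vectors by default.
EMBEDDING = EmbeddingConfig(model="text-embedding-3-small", dimension=1536)


def check_vector(vector: list[float], config: EmbeddingConfig = EMBEDDING) -> list[float]:
    """Fail loudly if a vector does not match the dimension the index was created with."""
    if len(vector) != config.dimension:
        raise ValueError(
            f"expected {config.dimension} dimensions for {config.model}, got {len(vector)}"
        )
    return vector
```

Both flows would then pass EMBEDDING.model to their embedder and EMBEDDING.dimension to the writer and retriever. A dimension check only catches models with different output sizes; two different models with the same size still produce incompatible vectors, which is why the model name is shared as well.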

Swapping components

Each stage is one node, so changing providers is a one-node change:

| Stage | This page used | Alternatives |
| --- | --- | --- |
| Convert | PyPDFConverter | DOCX, PPTX, HTML, CSV, text, LLM vision, Unstructured (see Document Processing) |
| Split | DocumentSplitter | Token, recursive-character, Markdown/HTML header, semantic, contextual, code, JSON (see Document Processing) |
| Embed | OpenAI | Cohere, Bedrock, Mistral, Gemini, Hugging Face, watsonx, Vertex AI (see Embedders & Vector Stores) |
| Store / retrieve | Pinecone | Weaviate, Qdrant, Milvus, Chroma, Elasticsearch, OpenSearch, pgvector (see Embedders & Vector Stores, plus Retrievers & Rankers) |

Agentic RAG

Instead of a fixed retrieval flow, you can hand retrieval to an agent as a tool: VectorStoreRetriever bundles a text embedder and a retriever (optionally a reranker) behind a single query interface the agent calls on demand. See Retrievers & Rankers.
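The idea of bundling an embedder, a retriever, and an optional reranker behind one query call can be sketched in plain Python. This is a conceptual illustration only, not VectorStoreRetriever's API: RetrievalTool and the stub functions are hypothetical, and the real components would be the SDK nodes shown earlier.

```python
from typing import Callable, Optional

Embedding = list[float]
Document = dict


class RetrievalTool:
    """Single query interface over embed -> retrieve -> (optional) rerank."""

    def __init__(
        self,
        embed: Callable[[str], Embedding],
        retrieve: Callable[[Embedding, int], list[Document]],
        rerank: Optional[Callable[[str, list[Document]], list[Document]]] = None,
        top_k: int = 5,
    ) -> None:
        self.embed = embed
        self.retrieve = retrieve
        self.rerank = rerank
        self.top_k = top_k

    def __call__(self, query: str) -> list[Document]:
        documents = self.retrieve(self.embed(query), self.top_k)
        if self.rerank is not None:
            documents = self.rerank(query, documents)
        return documents


# Stub components standing in for a real embedder, vector store, and reranker.
def fake_embed(query: str) -> Embedding:
    return [float(len(query))]


def fake_retrieve(embedding: Embedding, top_k: int) -> list[Document]:
    corpus = [{"content": "chunk A"}, {"content": "chunk B"}, {"content": "chunk C"}]
    return corpus[:top_k]


def fake_rerank(query: str, documents: list[Document]) -> list[Document]:
    return list(reversed(documents))


tool = RetrievalTool(fake_embed, fake_retrieve, rerank=fake_rerank, top_k=2)
print([doc["content"] for doc in tool("What are the line items?")])
```

Because the agent only sees a function from a query string to documents, it can decide when to search, rephrase the query, or search several times, without knowing which embedder or store sits behind the tool.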

On the platform, the same two flows exist as a Knowledge Base's generated ingestion workflow and the search endpoint — see Build a RAG Pipeline.
