RAG Pipeline
Build both halves of RAG in the SDK — an indexing flow that converts, splits, embeds, and stores documents, and a retrieval flow that answers questions over them.
A RAG system in the SDK is two workflows. The indexing flow converts files to documents, splits them into chunks, embeds the chunks, and writes vectors to a store. The retrieval flow embeds the user's query, fetches the most similar chunks, and feeds them to an LLM to generate a grounded answer. This page builds both end to end with PyPDF, OpenAI embeddings, and Pinecone — every component is swappable, as the sibling pages show.
Indexing flow
from io import BytesIO
from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Pinecone as PineconeConnection,
)
from dynamiq.nodes.converters import PyPDFConverter
from dynamiq.nodes.embedders import OpenAIDocumentEmbedder
from dynamiq.nodes.splitters.document import DocumentSplitter
from dynamiq.nodes.writers import PineconeDocumentWriter
rag_wf = Workflow()
# 1. Convert PDFs to documents
converter = PyPDFConverter(document_creation_mode="one-doc-per-page")
rag_wf.flow.add_nodes(converter)
# 2. Split documents into chunks
document_splitter = (
    DocumentSplitter(split_by="sentence", split_length=10, split_overlap=1)
    .inputs(documents=converter.outputs.documents)
    .depends_on(converter)
)
rag_wf.flow.add_nodes(document_splitter)
# 3. Embed each chunk
embedder = (
    OpenAIDocumentEmbedder(
        connection=OpenAIConnection(),
        model="text-embedding-3-small",
    )
    .inputs(documents=document_splitter.outputs.documents)
    .depends_on(document_splitter)
)
rag_wf.flow.add_nodes(embedder)
# 4. Upsert vectors into the store
vector_store = (
    PineconeDocumentWriter(
        connection=PineconeConnection(),
        index_name="quickstart",
        dimension=1536,
        create_if_not_exist=True,
        index_type="serverless",
        cloud="aws",
        region="us-east-1",
    )
    .inputs(documents=embedder.outputs.documents)
    .depends_on(embedder)
)
rag_wf.flow.add_nodes(vector_store)
# Run it over local PDFs
file_paths = ["example.pdf"]
rag_wf.run(
    input_data={
        "files": [BytesIO(open(path, "rb").read()) for path in file_paths],
        "metadata": [{"filename": path} for path in file_paths],
    }
)

How the pieces connect:
- .depends_on(node) declares execution order; .inputs(documents=node.outputs.documents) maps the upstream output into the downstream node's input. (The equivalent lower-level form is an InputTransformer with a JSONPath selector.)
- The writer reports how many vectors it stored under upserted_count in its output.
- Metadata you attach to files travels with every chunk, so you can filter on it at retrieval time.
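To make the splitter settings and the metadata behaviour concrete, here is a minimal plain-Python sketch of sentence chunking with overlap. It is not the SDK's DocumentSplitter implementation: the function names, the naive regex sentence splitter, and the dictionary shape of a chunk are all assumptions made for illustration. Only the parameter names split_length and split_overlap are borrowed from the example above.

```python
import re


def split_sentences(text: str) -> list[str]:
    """Naive sentence splitter (assumption): break after ., ! or ? followed by whitespace."""
    return [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]


def chunk_document(
    text: str,
    metadata: dict,
    split_length: int = 10,
    split_overlap: int = 1,
) -> list[dict]:
    """Group sentences into windows of `split_length` that share `split_overlap` sentences."""
    if not 0 <= split_overlap < split_length:
        raise ValueError("split_overlap must be >= 0 and smaller than split_length")

    sentences = split_sentences(text)
    step = split_length - split_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(sentences), step):
        window = sentences[start:start + split_length]
        # Each chunk gets its own copy of the file-level metadata.
        chunks.append({"content": " ".join(window), "metadata": dict(metadata)})
        if start + split_length >= len(sentences):
            break  # the last window already reached the end of the text
    return chunks
```

With 12 sentences, split_length=5, and split_overlap=1, this sketch yields three chunks, and the last sentence of each chunk reappears as the first sentence of the next. Every chunk carries the same filename metadata, which is what makes metadata filters usable at query time.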
Retrieval flow
from dynamiq import Workflow
from dynamiq.connections import (
    OpenAI as OpenAIConnection,
    Pinecone as PineconeConnection,
)
from dynamiq.nodes.embedders import OpenAITextEmbedder
from dynamiq.nodes.llms import OpenAI
from dynamiq.nodes.retrievers import PineconeDocumentRetriever
from dynamiq.prompts import Message, Prompt
retrieval_wf = Workflow()
openai_connection = OpenAIConnection()
# 1. Embed the query
embedder = OpenAITextEmbedder(
    connection=openai_connection,
    model="text-embedding-3-small",
)
retrieval_wf.flow.add_nodes(embedder)
# 2. Retrieve the closest chunks
document_retriever = (
    PineconeDocumentRetriever(
        connection=PineconeConnection(),
        index_name="quickstart",
        dimension=1536,
        top_k=5,
    )
    .inputs(embedding=embedder.outputs.embedding)
    .depends_on(embedder)
)
retrieval_wf.flow.add_nodes(document_retriever)
# 3. Generate a grounded answer
prompt_template = """
Please answer the question based on the provided context.
Question: {{ query }}
Context:
{% for document in documents %}
- {{ document.content }}
{% endfor %}
"""
answer_generator = (
    OpenAI(
        connection=openai_connection,
        model="gpt-4o",
        prompt=Prompt(messages=[Message(content=prompt_template, role="user")]),
    )
    .inputs(
        documents=document_retriever.outputs.documents,
        query=embedder.outputs.query,
    )
    .depends_on([embedder, document_retriever])
)
retrieval_wf.flow.add_nodes(answer_generator)
# Ask a question
result = retrieval_wf.run(input_data={"query": "What are the line items in the invoice?"})
print(result.output[answer_generator.id]["output"]["content"])

The embedder outputs both the embedding (consumed by the retriever) and the original query string (reused in the prompt). The retriever outputs documents, each with content, metadata, and a similarity score.
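To see what the LLM node ends up receiving, the following sketch expands the prompt by hand. It is a plain-Python stand-in for the Jinja loop in prompt_template, not the SDK's rendering code. The function name render_prompt and the dictionary shape of a document are assumptions, and the whitespace will differ slightly from real Jinja output.

```python
def render_prompt(query: str, documents: list[dict]) -> str:
    """Hand-written equivalent of the template: one bullet per retrieved chunk."""
    context = "\n".join(f"- {doc['content']}" for doc in documents)
    return (
        "Please answer the question based on the provided context.\n"
        f"Question: {query}\n"
        "Context:\n"
        f"{context}\n"
    )


# Hypothetical retrieved chunks, shaped like the documents described above.
example_documents = [
    {"content": "Total: 40 USD", "metadata": {"filename": "example.pdf"}, "score": 0.91},
    {"content": "Due: 1 May", "metadata": {"filename": "example.pdf"}, "score": 0.87},
]
print(render_prompt("What are the line items in the invoice?", example_documents))
```

Only the content field reaches the prompt in this template; metadata and score stay available for filtering or for citing sources if you extend the template.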
Always embed queries with the same model used at indexing time — mixing embedders silently breaks retrieval quality.
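The sketch below shows why the failure is silent. It ranks stored chunks by cosine similarity, which is an assumption here because the similarity metric is a property of the index and this page does not state it. The function names and the document shape are hypothetical and stand in for what the vector store does server-side.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; raises if their dimensions differ."""
    if len(a) != len(b):
        raise ValueError(f"dimension mismatch: {len(a)} vs {len(b)}")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_embedding: list[float], documents: list[dict], k: int = 5) -> list[dict]:
    """Return the k documents whose stored embedding is closest to the query embedding."""
    scored = [
        {**doc, "score": cosine_similarity(query_embedding, doc["embedding"])}
        for doc in documents
    ]
    return sorted(scored, key=lambda d: d["score"], reverse=True)[:k]
```

A dimension check like the one above catches only the loud case, for example a query vector whose length differs from the 1536 configured for the index. Two different models that happen to share a dimension pass the check and still produce scores, but the vectors live in unrelated spaces, so the ranking is meaningless.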
Swapping components
Each stage is one node, so changing providers is a one-node change:
| Stage | This page used | Alternatives |
|---|---|---|
| Convert | PyPDFConverter | DOCX, PPTX, HTML, CSV, text, LLM vision, Unstructured — see Document Processing |
| Split | DocumentSplitter | Token, recursive-character, Markdown/HTML header, semantic, contextual, code, JSON — same page |
| Embed | OpenAI | Cohere, Bedrock, Mistral, Gemini, Hugging Face, watsonx, Vertex AI — see Embedders & Vector Stores |
| Store / retrieve | Pinecone | Weaviate, Qdrant, Milvus, Chroma, Elasticsearch, OpenSearch, pgvector — same page, plus Retrievers & Rankers |
Agentic RAG
Instead of a fixed retrieval flow, you can hand retrieval to an agent as a tool: VectorStoreRetriever bundles a text embedder and a retriever (optionally a reranker) behind a single query interface the agent calls on demand. See Retrievers & Rankers.
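The bundling idea can be sketched in a few lines of plain Python. This is not the SDK's VectorStoreRetriever: the class name RetrievalTool, its fields, and the callable signatures are hypothetical and only illustrate how an embedder, a retriever, and an optional reranker can sit behind one query-in, documents-out call.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RetrievalTool:
    """Hypothetical wrapper exposing embed, search, and rerank as a single call."""

    embed_query: Callable[[str], list[float]]
    search: Callable[[list[float], int], list[dict]]
    rerank: Optional[Callable[[str, list[dict]], list[dict]]] = None
    top_k: int = 5

    def __call__(self, query: str) -> list[dict]:
        embedding = self.embed_query(query)             # same model as at indexing time
        documents = self.search(embedding, self.top_k)  # nearest chunks from the store
        if self.rerank is not None:
            documents = self.rerank(query, documents)   # optional second-pass ordering
        return documents
```

An agent that holds such a tool decides when to call it and with what query, so it can search several times with reformulated questions or skip retrieval entirely when the answer does not need it.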
On the platform, the same two flows exist as a Knowledge Base's generated ingestion workflow and the search endpoint — see Build a RAG Pipeline.