RAG

The RAG pipeline (aka Extractor) joins a prompt, a context data store and a generative model to extract knowledge. This approach is known as retrieval augmented generation (RAG).

The data store can be an embeddings database or a similarity instance with associated input text. The generative model can be a prompt-driven large language model (LLM), an extractive question-answering model or a custom pipeline.

Example

The following shows a simple example using this pipeline.

from txtai import Embeddings, RAG

# Input data
data = [
  "US tops 5 million confirmed virus cases",
  "Canada's last fully intact ice shelf has suddenly collapsed, " +
  "forming a Manhattan-sized iceberg",
  "Beijing mobilises invasion craft along coast as Taiwan tensions escalate",
  "The National Park Service warns against sacrificing slower friends " +
  "in a bear attack",
  "Maine man wins $1M from $25 lottery ticket",
  "Make huge profits without work, earn up to $100,000 a day"
]

# Build embeddings index
embeddings = Embeddings(content=True)
embeddings.index(data)

# Create and run pipeline
rag = RAG(embeddings, "google/flan-t5-base", template="""
  Answer the following question using the provided context.

  Question:
  {question}

  Context:
  {context}
""")

rag("What was won?")

See the Embeddings and LLM pages for additional configuration options.

See the links below for more detailed examples.

| Notebook | Description |
|---|---|
| Prompt-driven search with LLMs | Embeddings-guided and Prompt-driven search with Large Language Models (LLMs) |
| Prompt templates and task chains | Build model prompts and connect tasks together with workflows |
| Build RAG pipelines with txtai | Guide on retrieval augmented generation including how to create citations |
| Integrate LLM frameworks | Integrate llama.cpp, LiteLLM and custom generation frameworks |
| Generate knowledge with Semantic Graphs and RAG | Knowledge exploration and discovery with Semantic Graphs and RAG |
| Build knowledge graphs with LLMs | Build knowledge graphs with LLM-driven entity extraction |
| Advanced RAG with graph path traversal | Graph path traversal to collect complex sets of data for advanced RAG |
| Advanced RAG with guided generation | Retrieval Augmented and Guided Generation |
| RAG with llama.cpp and external API services | RAG with additional vector and LLM frameworks |
| How RAG with txtai works | Create RAG processes, API services and Docker instances |
| Speech to Speech RAG ▶️ | Full cycle speech to speech workflow with RAG |
| Generative Audio | Storytelling with generative audio workflows |
| Extractive QA with txtai | Introduction to extractive question-answering with txtai |
| Extractive QA with Elasticsearch | Run extractive question-answering queries with Elasticsearch |
| Extractive QA to build structured data | Build structured datasets using extractive question-answering |

Configuration-driven example

Pipelines are run with Python or configuration. Pipelines can be instantiated in configuration using the lower case name of the pipeline. Configuration-driven pipelines are run with workflows or the API.

config.yml

# Allow documents to be indexed
writable: True

# Content is required for the RAG pipeline
embeddings:
  content: True

rag:
  path: google/flan-t5-base
  template: |
    Answer the following question using the provided context.

    Question:
    {question}

    Context:
    {context}

workflow:
  search:
    tasks:
      - action: rag

Run with Workflows

Built-in tasks make using the RAG pipeline easier.

from txtai import Application

# Create and run pipeline with workflow
app = Application("config.yml")
app.add([
  "US tops 5 million confirmed virus cases",
  "Canada's last fully intact ice shelf has suddenly collapsed, " +
  "forming a Manhattan-sized iceberg",
  "Beijing mobilises invasion craft along coast as Taiwan tensions escalate",
  "The National Park Service warns against sacrificing slower friends " +
  "in a bear attack",
  "Maine man wins $1M from $25 lottery ticket",
  "Make huge profits without work, earn up to $100,000 a day"
])
app.index()

list(app.workflow("search", ["What was won?"]))

Run with API

CONFIG=config.yml uvicorn "txtai.api:app" &

curl \
  -X POST "http://localhost:8000/workflow" \
  -H "Content-Type: application/json" \
  -d '{"name": "search", "elements": ["What was won?"]}'
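
The same request can be built from Python. The following is a minimal sketch using only the standard library. The helper names `build_workflow_request` and `run_workflow` are hypothetical and not part of txtai. It assumes the API service started above is listening on `http://localhost:8000`.

```python
import json
from urllib import request


def build_workflow_request(name, elements, base_url="http://localhost:8000"):
    """Builds a POST request equivalent to the curl call above (hypothetical helper)."""
    payload = json.dumps({"name": name, "elements": elements}).encode("utf-8")
    return request.Request(
        f"{base_url}/workflow",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_workflow(name, elements, base_url="http://localhost:8000"):
    """Sends the request and parses the JSON response. Requires a running API service."""
    with request.urlopen(build_workflow_request(name, elements, base_url)) as response:
        return json.load(response)


# Build the request without sending it
req = build_workflow_request("search", ["What was won?"])
print(req.full_url, req.data.decode("utf-8"))
```

Call `run_workflow("search", ["What was won?"])` once the service is up to get the workflow results.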

Methods

Python documentation for the pipeline.

__init__(similarity, path, quantize=False, gpu=True, model=None, tokenizer=None, minscore=None, mintokens=None, context=None, task=None, output='default', template=None, separator=' ', system=None, **kwargs)

Builds a new RAG pipeline.

Parameters:

| Name | Description | Default |
|---|---|---|
| similarity | similarity instance (embeddings or similarity pipeline) | required |
| path | path to model, supports a LLM, Questions or custom pipeline | required |
| quantize | True if model should be quantized before inference, False otherwise. | False |
| gpu | if gpu inference should be used (only works if GPUs are available) | True |
| model | optional existing pipeline model to wrap | None |
| tokenizer | Tokenizer class | None |
| minscore | minimum score to include context match, defaults to None | None |
| mintokens | minimum number of tokens to include context match, defaults to None | None |
| context | topn context matches to include, defaults to 3 | None |
| task | model task (language-generation, sequence-sequence or question-answering), defaults to auto-detect | None |
| output | output format, 'default' returns (name, answer), 'flatten' returns answers and 'reference' returns (name, answer, reference) | 'default' |
| template | prompt template, it must have a parameter for {question} and {context}, defaults to "{question} {context}" | None |
| separator | context separator | ' ' |
| system | system prompt, defaults to None | None |
| kwargs | additional keyword arguments to pass to pipeline model | {} |
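
To illustrate how the `template` and `separator` parameters interact, the sketch below joins context matches and fills the placeholders. The `build_prompt` helper is hypothetical, and it assumes the template is filled with Python `str.format` style substitution, which this page does not show.

```python
# Default template documented above
DEFAULT_TEMPLATE = "{question} {context}"


def build_prompt(question, matches, template=None, separator=" "):
    """Hypothetical helper: joins context matches and fills the prompt template."""
    # Join the context matches into a single string using the separator
    context = separator.join(matches)

    # Assumption: placeholders are filled with str.format
    return (template or DEFAULT_TEMPLATE).format(question=question, context=context)


matches = [
    "Maine man wins $1M from $25 lottery ticket",
    "Make huge profits without work, earn up to $100,000 a day",
]

# Default template and separator
prompt = build_prompt("What was won?", matches)

# Custom template with one context match per line
custom = build_prompt(
    "What was won?",
    matches,
    template="Question: {question}\nContext: {context}",
    separator="\n",
)

print(prompt)
print(custom)
```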
Source code in txtai/pipeline/llm/rag.py
def __init__(
    self,
    similarity,
    path,
    quantize=False,
    gpu=True,
    model=None,
    tokenizer=None,
    minscore=None,
    mintokens=None,
    context=None,
    task=None,
    output="default",
    template=None,
    separator=" ",
    system=None,
    **kwargs,
):
    """
    Builds a new RAG pipeline.

    Args:
        similarity: similarity instance (embeddings or similarity pipeline)
        path: path to model, supports a LLM, Questions or custom pipeline
        quantize: True if model should be quantized before inference, False otherwise.
        gpu: if gpu inference should be used (only works if GPUs are available)
        model: optional existing pipeline model to wrap
        tokenizer: Tokenizer class
        minscore: minimum score to include context match, defaults to None
        mintokens: minimum number of tokens to include context match, defaults to None
        context: topn context matches to include, defaults to 3
        task: model task (language-generation, sequence-sequence or question-answering), defaults to auto-detect
        output: output format, 'default' returns (name, answer), 'flatten' returns answers and 'reference' returns (name, answer, reference)
        template: prompt template, it must have a parameter for {question} and {context}, defaults to "{question} {context}"
        separator: context separator
        system: system prompt, defaults to None
        kwargs: additional keyword arguments to pass to pipeline model
    """

    # Similarity instance
    self.similarity = similarity

    # Model can be a LLM, Questions or custom pipeline
    self.model = self.load(path, quantize, gpu, model, task, **kwargs)

    # Tokenizer class use default method if not set
    self.tokenizer = tokenizer if tokenizer else Tokenizer() if hasattr(self.similarity, "scoring") and self.similarity.isweighted() else None

    # Minimum score to include context match
    self.minscore = minscore if minscore is not None else 0.0

    # Minimum number of tokens to include context match
    self.mintokens = mintokens if mintokens is not None else 0.0

    # Top n context matches to include for context
    self.context = context if context else 3

    # Output format
    self.output = output

    # Prompt template
    self.template = template if template else "{question} {context}"

    # Context separator
    self.separator = separator

    # System prompt template
    self.system = system
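
The default handling in the constructor can be tried in isolation. The sketch below copies the expressions from the source above into a hypothetical `resolve_defaults` function. Note the difference between the `is not None` checks and the truthiness checks: a `context` of 0 falls back to 3, while a `minscore` of 0 is kept.

```python
def resolve_defaults(minscore=None, mintokens=None, context=None, template=None):
    """Hypothetical helper mirroring the default resolution in __init__."""
    return {
        # Explicit None check: a value of 0 is kept as-is
        "minscore": minscore if minscore is not None else 0.0,
        "mintokens": mintokens if mintokens is not None else 0.0,
        # Truthiness check: None and 0 both fall back to 3
        "context": context if context else 3,
        # Truthiness check: None and "" both fall back to the default template
        "template": template if template else "{question} {context}",
    }


defaults = resolve_defaults()
custom = resolve_defaults(minscore=0.5, context=0, template="")

print(defaults)
print(custom)
```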

__call__(queue, texts=None, **kwargs)

Finds answers to input questions. This method runs queries to find the top n best matches and uses that as the context. A model is then run against the context for each input question, with the answer returned.

Parameters:

| Name | Description | Default |
|---|---|---|
| queue | input question queue (name, query, question, snippet), can be list of tuples/dicts/strings or a single input element | required |
| texts | optional list of text for context, otherwise runs embeddings search | None |
| kwargs | additional keyword arguments to pass to pipeline model | {} |

Returns:

list of answers matching input format (tuple or dict) containing fields as specified by output format
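
The accepted input formats can be sketched as a standalone function. The `normalize` helper below is hypothetical. It reproduces the conversion steps from the method's source so the resulting tuples can be inspected without loading a model.

```python
FIELDS = ["name", "query", "question", "snippet"]


def normalize(queue):
    """Hypothetical helper: converts inputs to (name, query, question, snippet) tuples."""
    # Wrap a single input element in a list
    queue = queue if isinstance(queue, list) else [queue]

    # Dictionaries: read the four fields, missing keys become None
    if queue and isinstance(queue[0], dict):
        queue = [tuple(row.get(x) for x in FIELDS) for row in queue]

    # Strings: the text is used as both the query and the question
    if queue and isinstance(queue[0], str):
        queue = [(None, row, row, None) for row in queue]

    return queue


from_string = normalize("What was won?")
from_dict = normalize([{"name": "prize", "query": "lottery", "question": "What was won?"}])
from_tuple = normalize([("prize", "lottery", "What was won?", False)])

print(from_string)
print(from_dict)
print(from_tuple)
```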

Source code in txtai/pipeline/llm/rag.py
def __call__(self, queue, texts=None, **kwargs):
    """
    Finds answers to input questions. This method runs queries to find the top n best matches and uses that as the context.
    A model is then run against the context for each input question, with the answer returned.

    Args:
        queue: input question queue (name, query, question, snippet), can be list of tuples/dicts/strings or a single input element
        texts: optional list of text for context, otherwise runs embeddings search
        kwargs: additional keyword arguments to pass to pipeline model

    Returns:
        list of answers matching input format (tuple or dict) containing fields as specified by output format
    """

    # Save original queue format
    inputs = queue

    # Convert queue to list, if necessary
    queue = queue if isinstance(queue, list) else [queue]

    # Convert dictionary inputs to tuples
    if queue and isinstance(queue[0], dict):
        # Convert dict to tuple
        queue = [tuple(row.get(x) for x in ["name", "query", "question", "snippet"]) for row in queue]

    if queue and isinstance(queue[0], str):
        # Convert string questions to tuple
        queue = [(None, row, row, None) for row in queue]

    # Rank texts by similarity for each query
    results = self.query([query for _, query, _, _ in queue], texts)

    # Build question-context pairs
    names, queries, questions, contexts, topns, snippets = [], [], [], [], [], []
    for x, (name, query, question, snippet) in enumerate(queue):
        # Get top n best matching segments
        topn = sorted(results[x], key=lambda y: y[2], reverse=True)[: self.context]

        # Generate context using ordering from texts, if available, otherwise order by score
        context = self.separator.join(text for _, text, _ in (sorted(topn, key=lambda y: y[0]) if texts else topn))

        names.append(name)
        queries.append(query)
        questions.append(question)
        contexts.append(context)
        topns.append(topn)
        snippets.append(snippet)

    # Run pipeline and return answers
    answers = self.answers(questions, contexts, **kwargs)

    # Apply output formatting to answers and return
    return self.apply(inputs, names, queries, answers, topns, snippets) if isinstance(answers, list) else answers
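
The context-building step in the loop above can also be tried on its own. The sketch below uses a hypothetical `build_context` function and assumes each result is an `(id, text, score)` tuple, which is inferred from how the source indexes the results. It keeps the top n matches by score, then orders them by id when `texts` is provided and by score otherwise.

```python
def build_context(results, topn=3, separator=" ", texts=None):
    """Hypothetical helper mirroring the top n selection and context join."""
    # Keep the top n best matching segments by score
    best = sorted(results, key=lambda y: y[2], reverse=True)[:topn]

    # Use the ordering from texts (by id) if available, otherwise order by score
    ordered = sorted(best, key=lambda y: y[0]) if texts else best

    return separator.join(text for _, text, _ in ordered)


# Assumed result format: (id, text, score)
results = [(0, "alpha", 0.2), (1, "beta", 0.9), (2, "gamma", 0.5), (3, "delta", 0.7)]

by_score = build_context(results)
by_position = build_context(results, texts=["alpha", "beta", "gamma", "delta"])

print(by_score)
print(by_position)
```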