Workflow
Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable object. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.
Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Large language models typically work with smaller batches of data, workflows are well suited to feed a series of transformers pipelines.
An example of the most basic workflow:
workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
list(workflow([1, 2, 3]))
This example multiplies each input value by 2 and returns transformed elements via a generator.
Since workflows run as generators, output must be consumed for execution to occur. The following snippets show how output can be consumed.
# Small dataset where output fits in memory
list(workflow(elements))
# Large dataset
for output in workflow(elements):
function(output)
# Large dataset where output is discarded
for _ in workflow(elements):
pass
Workflows are run with Python or configuration. Examples of both methods are shown below.
Example
A full-featured example is shown below in Python. This workflow transcribes a set of audio files, translates the text into French and indexes the data.
from txtai import Embeddings
from txtai.pipeline import Transcription, Translation
from txtai.workflow import FileTask, Task, Workflow
# Embeddings instance
embeddings = Embeddings({
"path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
"content": True
})
# Transcription instance
transcribe = Transcription()
# Translation instance
translate = Translation()
tasks = [
FileTask(transcribe, r"\.wav$"),
Task(lambda x: translate(x, "fr"))
]
# List of files to process
data = [
"US_tops_5_million.wav",
"Canadas_last_fully.wav",
"Beijing_mobilises.wav",
"The_National_Park.wav",
"Maine_man_wins_1_mil.wav",
"Make_huge_profits.wav"
]
# Workflow that translate text to French
workflow = Workflow(tasks)
# Index data
embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))
# Search
embeddings.search("wildlife", 1)
Configuration-driven example
Workflows can also be defined with YAML configuration.
writable: true
embeddings:
path: sentence-transformers/paraphrase-MiniLM-L3-v2
content: true
# Transcribe audio to text
transcription:
# Translate text between languages
translation:
workflow:
index:
tasks:
- action: transcription
select: "\\.wav$"
task: file
- action: translation
args: ["fr"]
- action: index
# Create and run the workflow
from txtai import Application
# Create and run the workflow
app = Application("workflow.yml")
list(app.workflow("index", [
"US_tops_5_million.wav",
"Canadas_last_fully.wav",
"Beijing_mobilises.wav",
"The_National_Park.wav",
"Maine_man_wins_1_mil.wav",
"Make_huge_profits.wav"
]))
# Search
app.search("wildlife")
The code above executes a workflow defined in the file `workflow.yml.
LLM workflow example
Workflows can connect multiple LLM prompting tasks together.
llm:
path: google/flan-t5-xl
workflow:
llm:
tasks:
- task: template
template: |
Extract keywords for the following text.
{text}
action: llm
- task: template
template: |
Translate the following text into French.
{text}
action: llm
from txtai import Application
app = Application("workflow.yml")
list(app.workflow("llm", [
"""
txtai is an open-source platform for semantic search
and workflows powered by language models.
"""
]))
Any txtai pipeline/workflow task can be connected in workflows with LLMs.
llm:
path: google/flan-t5-xl
translation:
workflow:
llm:
tasks:
- task: template
template: |
Extract keywords for the following text.
{text}
action: llm
- action: translation
args:
- fr
See the following links for more information.
Methods
Workflows are callable objects. Workflows take an input of iterable data elements and output iterable data elements.
__init__(tasks, batch=100, workers=None, name=None, stream=None)
Creates a new workflow. Workflows are lists of tasks to execute.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks |
list of workflow tasks |
required | |
batch |
how many items to process at a time, defaults to 100 |
100
|
|
workers |
number of concurrent workers |
None
|
|
name |
workflow name |
None
|
|
stream |
workflow stream processor |
None
|
Source code in txtai/workflow/base.py
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
|
__call__(elements)
Executes a workflow for input elements. This method returns a generator that yields transformed data elements.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
elements |
iterable data elements |
required |
Returns:
Type | Description |
---|---|
generator that yields transformed data elements |
Source code in txtai/workflow/base.py
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
|
schedule(cron, elements, iterations=None)
Schedules a workflow using a cron expression and elements.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cron |
cron expression |
required | |
elements |
iterable data elements passed to workflow each call |
required | |
iterations |
number of times to run workflow, defaults to run indefinitely |
None
|
Source code in txtai/workflow/base.py
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
|
More examples
See this link for a full list of workflow examples.