Skip to content

Workflow

workflow workflow

Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable object. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.

Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Transformers models typically work with smaller batches of data, workflows are well suited to feed a series of transformers pipelines.

An example of the most basic workflow:

workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
list(workflow([1, 2, 3]))

This example multiplies each input value by 2 and returns transformed elements via a generator.

Since workflows run as generators, output must be consumed for execution to occur. The following snippets show how output can be consumed.

# Small dataset where output fits in memory
list(workflow(elements))

# Large dataset
for output in workflow(elements):
    function(output)

# Large dataset where output is discarded
for _ in workflow(elements):
    pass

Workflows are run with Python or configuration. Examples of both methods are shown below.

Example

A full-featured example is shown below in Python. This workflow transcribes a set of audio files, translates the text into French and indexes the data.

from txtai.embeddings import Embeddings
from txtai.pipeline import Transcription, Translation
from txtai.workflow import FileTask, Task, Workflow

# Embeddings instance
embeddings = Embeddings({
    "path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
    "content": True
})

# Transcription instance
transcribe = Transcription()

# Translation instance
translate = Translation()

tasks = [
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr"))
]

# List of files to process
data = [
  "US_tops_5_million.wav",
  "Canadas_last_fully.wav",
  "Beijing_mobilises.wav",
  "The_National_Park.wav",
  "Maine_man_wins_1_mil.wav",
  "Make_huge_profits.wav"
]

# Workflow that translate text to French
workflow = Workflow(tasks)

# Index data
embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))

# Search
embeddings.search("wildlife", 1)

Configuration-driven example

Workflows can be defined using Python as shown above but they can also run with YAML configuration.

writable: true
embeddings:
  path: sentence-transformers/paraphrase-MiniLM-L3-v2
  content: true

# Transcribe audio to text
transcription:

# Translate text between languages
translation:

workflow:
  index:
    tasks:
      - action: transcription
        select: "\\.wav$"
        task: file
      - action: translation
        args: ["fr"]
      - action: index
# Create and run the workflow
from txtai.app import Application

# Create and run the workflow
app = Application("workflow.yml")
list(app.workflow("index", [
  "US_tops_5_million.wav",
  "Canadas_last_fully.wav",
  "Beijing_mobilises.wav",
  "The_National_Park.wav",
  "Maine_man_wins_1_mil.wav",
  "Make_huge_profits.wav"
]))

# Search
app.search("wildlife")

The code above executes a workflow defined in the file workflow.yml. The API is used to run the workflow locally, there is minimal overhead running workflows in this manner. It's a matter of preference.

See the following links for more information.

Methods

Workflows are callable objects. Workflows take an input of iterable data elements and output iterable data elements.

__init__(self, tasks, batch=100, workers=None, name=None) special

Creates a new workflow. Workflows are lists of tasks to execute.

Parameters:

Name Type Description Default
tasks

list of workflow tasks

required
batch

how many items to process at a time, defaults to 100

100
workers

number of concurrent workers

None
name

workflow name

None
Source code in txtai/workflow/base.py
def __init__(self, tasks, batch=100, workers=None, name=None):
    """
    Creates a new workflow. Workflows are lists of tasks to execute.

    Args:
        tasks: list of workflow tasks
        batch: how many items to process at a time, defaults to 100
        workers: number of concurrent workers
        name: workflow name
    """

    self.tasks = tasks
    self.batch = batch
    self.workers = workers
    self.name = name

    # Set default number of executor workers to max number of actions in a task
    self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers

__call__(self, elements) special

Executes a workflow for input elements. This method returns a generator that yields transformed data elements.

Parameters:

Name Type Description Default
elements

iterable data elements

required

Returns:

Type Description

generator that yields transformed data elements

Source code in txtai/workflow/base.py
def __call__(self, elements):
    """
    Executes a workflow for input elements. This method returns a generator that yields transformed
    data elements.

    Args:
        elements: iterable data elements

    Returns:
        generator that yields transformed data elements
    """

    # Create execute instance for this run
    with Execute(self.workers) as executor:
        # Run task initializers
        self.initialize()

        # Process elements in batches
        for batch in self.chunk(elements):
            yield from self.process(batch, executor)

        # Run task finalizers
        self.finalize()

schedule(self, cron, elements, iterations=None)

Schedules a workflow using a cron expression and elements.

Parameters:

Name Type Description Default
cron

cron expression

required
elements

iterable data elements passed to workflow each call

required
iterations

number of times to run workflow, defaults to run indefinitely

None
Source code in txtai/workflow/base.py
def schedule(self, cron, elements, iterations=None):
    """
    Schedules a workflow using a cron expression and elements.

    Args:
        cron: cron expression
        elements: iterable data elements passed to workflow each call
        iterations: number of times to run workflow, defaults to run indefinitely
    """

    # Check that croniter is installed
    if not CRONITER:
        raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')

    logger.info("'%s' scheduler started with schedule %s", self.name, cron)

    maxiterations = iterations
    while iterations is None or iterations > 0:
        # Schedule using localtime
        schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
        logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
        time.sleep(schedule.timestamp() - time.time())

        # Run workflow
        # pylint: disable=W0703
        try:
            for _ in self(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())

        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1

    logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)

More examples

See this link for a full list of workflow examples.