Skip to content

Retrieve Task

task task

The Retrieve Task connects to a url and downloads the content locally. This task is helpful when working with actions that require data to be available locally.

Example

The following shows a simple example using this task as part of a workflow.

from txtai.workflow import RetrieveTask, Workflow

workflow = Workflow([RetrieveTask(directory="/tmp")])
workflow(["https://file.to.download", "/local/file/to/copy"])

Configuration-driven example

This task can also be created with workflow configuration.

workflow:
  tasks:
    - task: retrieve
      directory: /tmp

Methods

Python documentation for the task.

__init__(self, action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs) special

Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")

register(self, directory=None)

Adds retrieve parameters to task.

Parameters:

Name Type Description Default
directory

local directory used to store retrieved files

None
Source code in txtai/workflow/task/retrieve.py
def register(self, directory=None):
    """
    Adds retrieve parameters to task.

    Args:
        directory: local directory used to store retrieved files
    """

    # pylint: disable=W0201
    # Create default temporary directory if not specified
    if not directory:
        # Save tempdir to prevent content from being deleted until this task is out of scope
        # pylint: disable=R1732
        self.tempdir = tempfile.TemporaryDirectory()
        directory = self.tempdir.name

    # Create output directory if necessary
    if not os.path.exists(directory):
        os.makedirs(directory)

    self.directory = directory