Tasks

Workflows execute tasks. Tasks are callable objects with a number of parameters that control how data is processed at a given step. While similar to pipelines, tasks encapsulate processing and don't perform significant transformations on their own. Instead, tasks run the logic that prepares content for the underlying action(s).

A simple task is shown below.

from txtai.workflow import Task
Task(lambda x: [y * 2 for y in x])

The task above applies the lambda function to all input elements.
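
To make the batching explicit, here is a minimal plain-Python sketch that does not use txtai. The name `double` is a hypothetical stand-in for the lambda above. It shows that the action receives the whole list of input elements and returns one result per element.

```python
# Hypothetical stand-in for the lambda passed to Task above
def double(batch):
    # The action receives a list of elements and returns one result per element
    return [y * 2 for y in batch]

result = double([1, 2, 3])
print(result)
```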

Tasks work well with pipelines, since pipelines are callable objects. The example below will summarize each input element.

summary = Summary()
Task(summary)

Tasks can operate independently but work best with workflows, as workflows add large-scale stream processing.

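from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow

# Run the task on its own
summary = Summary()
task = Task(summary)
task(["Very long text here"])

# Run the same task as part of a workflow
workflow = Workflow([task])
list(workflow(["Very long text here"]))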

Tasks can also be created with configuration as part of a workflow.

workflow:
  tasks:
    - action: summary 

Creates a new task. A task defines two things: the type of data it accepts and the action to execute for each data element. The action is a callable function or a list of callable functions.

Parameters:

  • action - action(s) to execute on each data element (default: None)

  • select - filter(s) used to select data to process (default: None)

  • unpack - if data elements should be unpacked or unwrapped from (id, data, tag) tuples (default: True)

  • column - column index to select if element is a tuple, defaults to all (default: None)

  • merge - merge mode for joining multi-action outputs, defaults to hstack (default: 'hstack')

  • initialize - action to execute before processing (default: None)

  • finalize - action to execute after processing (default: None)

  • concurrency - sets concurrency method when an execute instance is available; valid values: "thread" for thread-based concurrency, "process" for process-based concurrency (default: None)

  • onetomany - if one-to-many data transformations should be enabled, defaults to True (default: True)

  • kwargs - additional keyword arguments (default: {})

Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
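
The two housekeeping steps in the constructor above can be tried in isolation. The following is a standalone sketch that mirrors that logic outside of txtai. The helper names `normalize_actions` and `reject_unknown` and the keyword `batchsize` are hypothetical and exist only for illustration.

```python
def normalize_actions(action=None):
    """Mirrors the standardization step: always return a list of actions."""
    if not action:
        return []
    if not isinstance(action, list):
        return [action]
    return action


def reject_unknown(**kwargs):
    """Mirrors the error raised when extra keyword arguments have no register method to consume them."""
    if kwargs:
        names = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


single = normalize_actions(len)        # one callable is wrapped in a list
many = normalize_actions([len, str])   # a list is kept as-is
empty = normalize_actions()            # no action becomes an empty list

try:
    reject_unknown(batchsize=10)       # hypothetical unsupported keyword
    message = None
except TypeError as error:
    message = str(error)

print(single, many, empty, message)
```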

Multi-action task concurrency

The default processing mode is to run actions sequentially. Multiprocessing support is already built in at a number of levels. For example, GPU models will maximize GPU utilization, and concurrency is used even in CPU mode. But there are still use cases for task action concurrency, such as when the system has multiple GPUs, the task runs external sequential code, or the task performs a large number of I/O operations.

In addition to sequential processing, multi-action tasks can run either multithreaded or with multiple processes. The advantages of each approach are discussed below.

  • multithreading - no overhead of creating separate processes or pickling data. But Python can only execute a single thread at a time due to the GIL, so this approach won't help with CPU-bound actions. This method works well with I/O-bound actions and GPU actions.

  • multiprocessing - separate subprocesses are created and data is exchanged via pickling. This method can fully utilize all CPU cores since each process runs independently. This method works well with CPU-bound actions.

More information on multiprocessing can be found in the Python documentation.
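
As a rough illustration of the thread-based option, the sketch below runs two actions over the same inputs, first sequentially and then in a thread pool from the standard library. It is not txtai's implementation. The actions `slow_upper` and `slow_length` are hypothetical and use a short sleep to simulate I/O-bound work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_upper(batch):
    # Simulates an I/O-bound action (e.g. a network call) with a short sleep
    time.sleep(0.05)
    return [x.upper() for x in batch]


def slow_length(batch):
    # Second simulated I/O-bound action
    time.sleep(0.05)
    return [len(x) for x in batch]


actions = [slow_upper, slow_length]
inputs = ["a", "bb", "ccc"]

# Sequential: each action runs one after another on the same inputs
sequential = [action(inputs) for action in actions]

# Threaded: all actions are submitted at once; map preserves the action order
with ThreadPoolExecutor(max_workers=len(actions)) as executor:
    threaded = list(executor.map(lambda action: action(inputs), actions))

print(threaded)
```

Swapping in `ProcessPoolExecutor` would move the work to separate processes, but the callables and data must then be picklable, so module-level functions are needed rather than lambdas.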

Multi-action task merges

Multi-action tasks generate parallel outputs for the input data, one list of outputs per action. These outputs can be merged in one of three ways: column-wise (hstack), row-wise (vstack) or concatenated into strings (concat).

hstack

Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.

Column-wise merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Column Merge => [(a1, a2), (b1, b2), (c1, c2)]

Parameters:

  • outputs - task outputs (required)

Returns:

  • list of aggregated/zipped outputs as tuples (column-wise)

Source code in txtai/workflow/task/base.py
def hstack(self, outputs):
    """
    Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.

    Column-wise merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Column Merge => [(a1, a2), (b1, b2), (c1, c2)]

    Args:
        outputs: task outputs

    Returns:
        list of aggregated/zipped outputs as tuples (column-wise)
    """

    # If all outputs are numpy arrays, use native method
    if all(isinstance(output, np.ndarray) for output in outputs):
        return np.stack(outputs, axis=1)

    # If all outputs are torch tensors, use native method
    # pylint: disable=E1101
    if all(torch.is_tensor(output) for output in outputs):
        return torch.stack(outputs, axis=1)

    return list(zip(*outputs))
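
For plain Python lists, the column-wise merge comes down to the final `zip` call. A minimal sketch with placeholder string values standing in for real action outputs:

```python
# Outputs from two hypothetical actions over inputs [a, b, c]
outputs = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]

# Column-wise merge: one tuple per input element, one value per action
merged = list(zip(*outputs))
print(merged)
```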

vstack

Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.

Row-wise merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]

Parameters:

  • outputs - task outputs (required)

Returns:

  • list of aggregated/zipped outputs as one to many transforms (row-wise)

Source code in txtai/workflow/task/base.py
def vstack(self, outputs):
    """
    Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.

    Row-wise merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]

    Args:
        outputs: task outputs

    Returns:
        list of aggregated/zipped outputs as one to many transforms (row-wise)
    """

    # If all outputs are numpy arrays, use native method
    if all(isinstance(output, np.ndarray) for output in outputs):
        return np.concatenate(np.stack(outputs, axis=1))

    # If all outputs are torch tensors, use native method
    # pylint: disable=E1101
    if all(torch.is_tensor(output) for output in outputs):
        return torch.cat(tuple(torch.stack(outputs, axis=1)))

    # Flatten into lists of outputs per input row. Wrap as one to many transformation.
    merge = []
    for x in zip(*outputs):
        combine = []
        for y in x:
            if isinstance(y, list):
                combine.extend(y)
            else:
                combine.append(y)

        merge.append(OneToMany(combine))

    return merge
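
The list branch of the row-wise merge can be sketched without txtai as follows. Plain lists stand in for the `OneToMany` wrapper, and the last step shows the flattened result that the docstring describes. Where that flattening happens inside the library is an assumption here.

```python
# Outputs from two hypothetical actions; the second returns a list for input a
outputs = [["a1", "b1", "c1"], [["a2", "a3"], "b2", "c2"]]

rows = []
for values in zip(*outputs):
    combined = []
    for value in values:
        # List outputs are spliced in, scalar outputs are appended
        if isinstance(value, list):
            combined.extend(value)
        else:
            combined.append(value)
    rows.append(combined)

# One-to-many interpretation: each row expands into multiple output elements
flattened = [value for row in rows for value in row]
print(rows)
print(flattened)
```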

concat

Merges outputs column-wise and concats values together into a string. Returns a list of strings.

Concat merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]

Parameters:

  • outputs - task outputs (required)

Returns:

  • list of concat outputs

Source code in txtai/workflow/task/base.py
def concat(self, outputs):
    """
    Merges outputs column-wise and concats values together into a string. Returns a list of strings.

    Concat merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]

    Args:
        outputs: task outputs

    Returns:
        list of concat outputs
    """

    return [". ".join([str(y) for y in x if y]) for x in self.hstack(outputs)]
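
One detail of the expression above is easy to miss: the `if y` filter drops every falsy value, not just `None`. The sketch below applies the same expression to plain lists, with `zip` standing in for `hstack`, and uses made-up values to show an empty string and a `0` being skipped.

```python
# Outputs from two hypothetical actions; note the empty string and the 0
outputs = [["a1", "", "c1"], ["a2", "b2", 0]]

# Same join as concat, using zip in place of hstack for plain lists
merged = [". ".join(str(y) for y in x if y) for x in zip(*outputs)]
print(merged)
```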

Extract task output columns

With column-wise merging, each output row is a tuple holding one output value per task action. These tuples can be fed as input to a downstream task, and that task can have separate actions work with each element.

A simple example:

workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
list(workflow([(2, 8)]))

For the example input tuple of (2, 8), the workflow will only select the first element (2) and run the task against that element.

workflow = Workflow([Task([lambda x: [y * 3 for y in x], 
                           lambda x: [y - 1 for y in x]],
                           unpack=False, column={0:0, 1:1})])
list(workflow([(2, 8)]))

The example above applies a separate action to each input column. This simple construct can help build extremely powerful workflow graphs!
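
The column selection in both examples can be traced by hand with a plain-Python sketch. This is not txtai code. The helper `select_column` is hypothetical, and reading the dictionary as a mapping from action index to column index is an assumption based on the example above.

```python
def select_column(elements, column):
    # Pick a single tuple index from each element; non-tuples pass through
    return [x[column] if isinstance(x, tuple) else x for x in elements]


elements = [(2, 8)]
triple = lambda x: [y * 3 for y in x]
minus_one = lambda x: [y - 1 for y in x]

# column=0: the single action only sees the first value of each tuple
first = triple(select_column(elements, 0))

# column={0: 0, 1: 1}: assumed to map action index -> column index
columns = {0: 0, 1: 1}
actions = [triple, minus_one]
outputs = [action(select_column(elements, columns[i])) for i, action in enumerate(actions)]

# Default hstack-style merge of the per-action outputs
merged = list(zip(*outputs))
print(first, merged)
```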