Tasks
Workflows execute tasks. Tasks are callable objects with a number of parameters to control the processing of data at a given step. While similar to pipelines, tasks encapsulate processing and don't perform significant transformations on their own. Tasks perform logic to prepare content for the underlying action(s).
A simple task is shown below.
Task(lambda x: [y * 2 for y in x])
The task above executes the function for each input element, doubling every value.
Tasks work well with pipelines, since pipelines are callable objects. The example below will summarize each input element.
summary = Summary()
Task(summary)
Tasks can operate independently but work best with workflows, as workflows add large-scale stream processing.
summary = Summary()
task = Task(summary)
task(["Very long text here"])
workflow = Workflow([task])
list(workflow(["Very long text here"]))
Tasks can also be created with configuration as part of a workflow.
workflow:
tasks:
- action: summary
__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)
Creates a new task. A task defines two things: the type of data it accepts and the action to execute for each data element. An action is a callable function or list of callable functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action | | action(s) to execute on each data element | None |
select | | filter(s) used to select data to process | None |
unpack | | if data elements should be unpacked or unwrapped from (id, data, tag) tuples | True |
column | | column index to select if element is a tuple, defaults to all | None |
merge | | merge mode for joining multi-action outputs, defaults to hstack | 'hstack' |
initialize | | action to execute before processing | None |
finalize | | action to execute after processing | None |
concurrency | | sets the concurrency method when an executor is available; valid values: "thread" for thread-based concurrency, "process" for process-based concurrency | None |
onetomany | | if one-to-many data transformations should be enabled, defaults to True | True |
kwargs | | additional keyword arguments | {} |
Source code in txtai/workflow/task/base.py
Multi-action task concurrency
The default processing mode is to run actions sequentially. Multiprocessing support is already built in at a number of levels: GPU models maximize GPU utilization, and concurrency is used even in CPU mode. But there are still use cases for task action concurrency, for example when the system has multiple GPUs, the task runs external sequential code, or the task performs a large number of I/O operations.
In addition to sequential processing, multi-action tasks can run either multithreaded or with multiple processes. The advantages of each approach are discussed below.
- multithreading - no overhead of creating separate processes or pickling data. But Python can only execute a single thread at a time due to the GIL, so this approach won't help with CPU-bound actions. This method works well with I/O bound actions and GPU actions.
- multiprocessing - separate subprocesses are created and data is exchanged via pickling. This method can fully utilize all CPU cores since each process runs independently. This method works well with CPU-bound actions.
More information on multiprocessing can be found in the Python documentation.
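The tradeoff can be sketched with the standard library's concurrent.futures. This is an illustrative assumption, not necessarily how txtai schedules actions internally:

```python
from concurrent.futures import ThreadPoolExecutor

# Two hypothetical actions for a multi-action task
actions = [lambda x: [y * 2 for y in x], lambda x: [y + 1 for y in x]]
inputs = [1, 2, 3]

# Thread-based concurrency: each action runs in its own thread.
# Swap in ProcessPoolExecutor for CPU-bound actions (requires picklable actions,
# so lambdas would need to become top-level functions).
with ThreadPoolExecutor(max_workers=len(actions)) as executor:
    outputs = list(executor.map(lambda action: action(inputs), actions))

print(outputs)  # [[2, 4, 6], [2, 3, 4]]
```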
Multi-action task merges
Multi-action tasks will generate parallel outputs for the input data. The task outputs can be merged together in a few different ways.
hstack(outputs)
Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.
Column-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Column Merge => [(a1, a2), (b1, b2), (c1, c2)]
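The column-wise merge above can be approximated in plain Python with zip. This is a sketch of the behavior, not txtai's implementation:

```python
def hstack(outputs):
    """Zips parallel action outputs column-wise into tuples."""
    return list(zip(*outputs))

# Outputs from two hypothetical actions over inputs [a, b, c]
outputs = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]
print(hstack(outputs))  # [('a1', 'a2'), ('b1', 'b2'), ('c1', 'c2')]
```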
Parameters:

Name | Type | Description | Default |
---|---|---|---|
outputs | | task outputs | required |

Returns:

Type | Description |
---|---|
| list of aggregated/zipped outputs as tuples (column-wise) |
Source code in txtai/workflow/task/base.py
vstack(outputs)
Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.
Row-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]
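The row-wise merge above can likewise be sketched in plain Python: zip the outputs column-wise, then flatten so each input yields multiple output rows. Again, this is an approximation, not txtai's implementation:

```python
def vstack(outputs):
    """Interleaves parallel action outputs row-wise into a flat one-to-many list."""
    return [value for row in zip(*outputs) for value in row]

outputs = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]
print(vstack(outputs))  # ['a1', 'a2', 'b1', 'b2', 'c1', 'c2']
```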
Parameters:

Name | Type | Description | Default |
---|---|---|---|
outputs | | task outputs | required |

Returns:

Type | Description |
---|---|
| list of aggregated/zipped outputs as one to many transforms (row-wise) |
Source code in txtai/workflow/task/base.py
concat(outputs)
Merges outputs column-wise and concats values together into a string. Returns a list of strings.
Concat merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]
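The concat merge builds on the column-wise merge, joining each tuple of values into a single string. A minimal sketch, assuming ". " as the separator shown in the example above:

```python
def concat(outputs):
    """Merges column-wise, then joins each tuple of values into one string."""
    return [". ".join(str(value) for value in row) for row in zip(*outputs)]

outputs = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]
print(concat(outputs))  # ['a1. a2', 'b1. b2', 'c1. c2']
```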
Parameters:

Name | Type | Description | Default |
---|---|---|---|
outputs | | task outputs | required |

Returns:

Type | Description |
---|---|
| list of concat outputs |
Source code in txtai/workflow/task/base.py
Extract task output columns
With column-wise merging, each output row will be a tuple of output values, one per task action. This can be fed as input to a downstream task, and that task can apply separate actions to each element.
A simple example:
workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
list(workflow([(2, 8)]))
For the example input tuple of (2, 8), the workflow will only select the first element (2) and run the task against that element.
workflow = Workflow([Task([lambda x: [y * 3 for y in x],
lambda x: [y - 1 for y in x]],
unpack=False, column={0:0, 1:1})])
list(workflow([(2, 8)]))
The example above applies a separate action to each input column. This simple construct can help build extremely powerful workflow graphs!
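The column routing above can be sketched in plain Python. This approximates the behavior of column={0: 0, 1: 1} with an hstack merge; it is not txtai's implementation:

```python
# Two hypothetical actions, mirroring the workflow example above
actions = [lambda x: [y * 3 for y in x], lambda x: [y - 1 for y in x]]

# Map each action index to the input column it should receive
columns = {0: 0, 1: 1}
inputs = [(2, 8)]

outputs = []
for index, action in enumerate(actions):
    # Route the configured input column to each action
    selected = [row[columns[index]] for row in inputs]
    outputs.append(action(selected))

# Column-wise (hstack) merge of the per-action outputs
merged = list(zip(*outputs))
print(merged)  # [(6, 7)]
```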