跳到内容

任务

task task

工作流执行任务。任务是可调用对象,带有许多参数来控制给定步骤的数据处理。虽然与流水线相似,但任务封装了处理过程,本身不执行重大转换。任务执行逻辑来为底层动作准备内容。

下面展示一个简单的任务。

Task(lambda x: [y * 2 for y in x])

上面的任务对所有输入元素执行上面的函数。

任务与流水线配合良好,因为流水线是可调用对象。下面的例子将对每个输入元素进行摘要。

summary = Summary()
Task(summary)

任务可以独立运行,但与工作流配合最佳,因为工作流增加了大规模流处理能力。

summary = Summary()
task = Task(summary)
task(["Very long text here"])

workflow = Workflow([task])
list(workflow(["Very long text here"]))

任务也可以作为工作流的一部分通过配置创建。

workflow:
  tasks:
    - action: summary 

__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)

创建一个新任务。任务定义了两个方法:它接受的数据类型和对每个数据元素执行的动作。动作可以是可调用函数或可调用函数列表。

参数

名称 类型 描述 默认值
action

对每个数据元素执行的动作

None
select

用于选择要处理的数据的过滤器

None
unpack

是否应该将数据元素从 (id, data, tag) 元组中解包或展开

True
column

如果元素是元组,要选择的列索引,默认为全部

None
merge

用于连接多动作输出的合并模式,默认为 hstack

'hstack'
initialize

在处理前执行的动作

None
finalize

在处理后执行的动作

None
concurrency

设置执行实例可用时的并发方法,有效值:"thread" 表示基于线程的并发,"process" 表示基于进程的并发

None
onetomany

是否启用一对多数据转换,默认为 True

True
kwargs

额外的关键字参数

{}
源代码位于 txtai/workflow/task/base.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")

多动作任务并发

默认的处理模式是顺序执行动作。多进程支持已在多个层面内置。例如,任何 GPU 模型都将最大限度地提高 GPU 利用率,即使在 CPU 模式下,也利用了并发。但任务动作并发仍有一些用例。例如,如果系统有多个 GPU,任务运行外部顺序代码,或者任务有大量 I/O 任务。

除了顺序处理之外,多动作任务可以以多线程或多进程方式运行。下面讨论每种方法的优势。

  • 多线程 - 没有创建独立进程或数据序列化的开销。但由于 GIL 的存在,Python 只能执行一个线程,因此这种方法对 CPU 密集型动作没有帮助。这种方法适用于 I/O 密集型动作和 GPU 动作。

  • 多进程 - 创建独立的子进程,并通过序列化交换数据。由于每个进程独立运行,这种方法可以充分利用所有 CPU 核心。这种方法适用于 CPU 密集型动作。

有关多进程的更多信息可以在 Python 文档中找到。

多动作任务合并

多动作任务将为输入数据生成并行输出。任务输出可以通过几种不同的方式合并。

hstack(outputs)

按列合并输出。返回一个元组列表,这将解释为一对一转换。

按列合并示例 (2 个动作)

输入: [a, b, c]

输出 => [[a1, b1, c1], [a2, b2, c2]]

按列合并 => [(a1, a2), (b1, b2), (c1, c2)]

参数

名称 类型 描述 默认值
outputs

任务输出

必需

返回

类型 描述

聚合/打包后的输出列表,作为元组(按列)

源代码位于 txtai/workflow/task/base.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def hstack(self, outputs):
    """
    Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.

    Column-wise merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Column Merge => [(a1, a2), (b1, b2), (c1, c2)]

    Args:
        outputs: task outputs

    Returns:
        list of aggregated/zipped outputs as tuples (column-wise)
    """

    # If all outputs are numpy arrays, use native method
    if all(isinstance(output, np.ndarray) for output in outputs):
        return np.stack(outputs, axis=1)

    # If all outputs are torch tensors, use native method
    # pylint: disable=E1101
    if all(torch.is_tensor(output) for output in outputs):
        return torch.stack(outputs, axis=1)

    return list(zip(*outputs))

vstack(outputs)

按行合并输出。返回一个列表的列表,这将解释为一对多转换。

按行合并示例 (2 个动作)

输入: [a, b, c]

输出 => [[a1, b1, c1], [a2, b2, c2]]

按行合并 => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]

参数

名称 类型 描述 默认值
outputs

任务输出

必需

返回

类型 描述

聚合/打包后的输出列表,作为一对多转换(按行)

源代码位于 txtai/workflow/task/base.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def vstack(self, outputs):
    """
    Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.

    Row-wise merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]

    Args:
        outputs: task outputs

    Returns:
        list of aggregated/zipped outputs as one to many transforms (row-wise)
    """

    # If all outputs are numpy arrays, use native method
    if all(isinstance(output, np.ndarray) for output in outputs):
        return np.concatenate(np.stack(outputs, axis=1))

    # If all outputs are torch tensors, use native method
    # pylint: disable=E1101
    if all(torch.is_tensor(output) for output in outputs):
        return torch.cat(tuple(torch.stack(outputs, axis=1)))

    # Flatten into lists of outputs per input row. Wrap as one to many transformation.
    merge = []
    for x in zip(*outputs):
        combine = []
        for y in x:
            if isinstance(y, list):
                combine.extend(y)
            else:
                combine.append(y)

        merge.append(OneToMany(combine))

    return merge

concat(outputs)

按列合并输出,并将值串联成一个字符串。返回一个字符串列表。

串联合并示例 (2 个动作)

输入: [a, b, c]

输出 => [[a1, b1, c1], [a2, b2, c2]]

串联合并 => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]

参数

名称 类型 描述 默认值
outputs

任务输出

必需

返回

类型 描述

串联输出列表

源代码位于 txtai/workflow/task/base.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def concat(self, outputs):
    """
    Merges outputs column-wise and concats values together into a string. Returns a list of strings.

    Concat merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]

    Args:
        outputs: task outputs

    Returns:
        list of concat outputs
    """

    return [". ".join([str(y) for y in x if y]) for x in self.hstack(outputs)]

提取任务输出列

通过按列合并,每个输出行将是每个任务动作的输出值组成的元组。这可以作为输入馈送到下游任务,并且该任务可以有单独的任务处理每个元素。

一个简单示例

workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
list(workflow([(2, 8)]))

对于输入的示例元组 (2, 2),工作流将仅选择第一个元素 (2) 并针对该元素运行任务。

workflow = Workflow([Task([lambda x: [y * 3 for y in x], 
                           lambda x: [y - 1 for y in x]],
                           unpack=False, column={0:0, 1:1})])
list(workflow([(2, 8)]))

上面的示例对每个输入列应用了一个单独的动作。这种简单的结构可以帮助构建极其强大的工作流图!