任务
工作流执行任务。任务是可调用对象,带有许多参数来控制给定步骤的数据处理。虽然与流水线相似,但任务封装了处理过程,本身不执行重大转换。任务执行逻辑来为底层动作准备内容。
下面展示一个简单的任务。
Task(lambda x: [y * 2 for y in x])
上面的任务对所有输入元素执行上面的函数。
任务与流水线配合良好,因为流水线是可调用对象。下面的例子将对每个输入元素进行摘要。
summary = Summary()
Task(summary)
任务可以独立运行,但与工作流配合最佳,因为工作流增加了大规模流处理能力。
summary = Summary()
task = Task(summary)
task(["Very long text here"])
workflow = Workflow([task])
list(workflow(["Very long text here"]))
任务也可以作为工作流的一部分通过配置创建。
workflow:
tasks:
- action: summary
__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)
创建一个新任务。任务定义了两个方法:它接受的数据类型和对每个数据元素执行的动作。动作可以是可调用函数或可调用函数列表。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
action
|
对每个数据元素执行的动作 |
None
|
|
select
|
用于选择要处理的数据的过滤器 |
None
|
|
unpack
|
是否应该将数据元素从 (id, data, tag) 元组中解包或展开 |
True
|
|
column
|
如果元素是元组,要选择的列索引,默认为全部 |
None
|
|
merge
|
用于连接多动作输出的合并模式,默认为 hstack |
'hstack'
|
|
initialize
|
在处理前执行的动作 |
None
|
|
finalize
|
在处理后执行的动作 |
None
|
|
concurrency
|
设置执行实例可用时的并发方法,有效值:"thread" 表示基于线程的并发,"process" 表示基于进程的并发 |
None
|
|
onetomany
|
是否启用一对多数据转换,默认为 True |
True
|
|
kwargs
|
额外的关键字参数 |
{}
|
源代码位于 txtai/workflow/task/base.py
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
|
多动作任务并发
默认的处理模式是顺序执行动作。多进程支持已在多个层面内置。例如,任何 GPU 模型都将最大限度地提高 GPU 利用率,即使在 CPU 模式下,也利用了并发。但任务动作并发仍有一些用例。例如,如果系统有多个 GPU,任务运行外部顺序代码,或者任务有大量 I/O 任务。
除了顺序处理之外,多动作任务可以以多线程或多进程方式运行。下面讨论每种方法的优势。
-
多线程 - 没有创建独立进程或数据序列化的开销。但由于 GIL 的存在,Python 只能执行一个线程,因此这种方法对 CPU 密集型动作没有帮助。这种方法适用于 I/O 密集型动作和 GPU 动作。
-
多进程 - 创建独立的子进程,并通过序列化交换数据。由于每个进程独立运行,这种方法可以充分利用所有 CPU 核心。这种方法适用于 CPU 密集型动作。
有关多进程的更多信息可以在 Python 文档中找到。
多动作任务合并
多动作任务将为输入数据生成并行输出。任务输出可以通过几种不同的方式合并。
hstack(outputs)
按列合并输出。返回一个元组列表,这将解释为一对一转换。
按列合并示例 (2 个动作)
输入: [a, b, c]
输出 => [[a1, b1, c1], [a2, b2, c2]]
按列合并 => [(a1, a2), (b1, b2), (c1, c2)]
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
outputs
|
任务输出 |
必需 |
返回
类型 | 描述 |
---|---|
聚合/打包后的输出列表,作为元组(按列) |
源代码位于 txtai/workflow/task/base.py
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 |
|
vstack(outputs)
按行合并输出。返回一个列表的列表,这将解释为一对多转换。
按行合并示例 (2 个动作)
输入: [a, b, c]
输出 => [[a1, b1, c1], [a2, b2, c2]]
按行合并 => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
outputs
|
任务输出 |
必需 |
返回
类型 | 描述 |
---|---|
聚合/打包后的输出列表,作为一对多转换(按行) |
源代码位于 txtai/workflow/task/base.py
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 |
|
concat(outputs)
按列合并输出,并将值串联成一个字符串。返回一个字符串列表。
串联合并示例 (2 个动作)
输入: [a, b, c]
输出 => [[a1, b1, c1], [a2, b2, c2]]
串联合并 => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
outputs
|
任务输出 |
必需 |
返回
类型 | 描述 |
---|---|
串联输出列表 |
源代码位于 txtai/workflow/task/base.py
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 |
|
提取任务输出列
通过按列合并,每个输出行将是每个任务动作的输出值组成的元组。这可以作为输入馈送到下游任务,并且该任务可以有单独的任务处理每个元素。
一个简单示例
workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
list(workflow([(2, 8)]))
对于输入的示例元组 (2, 2),工作流将仅选择第一个元素 (2) 并针对该元素运行任务。
workflow = Workflow([Task([lambda x: [y * 3 for y in x],
lambda x: [y - 1 for y in x]],
unpack=False, column={0:0, 1:1})])
list(workflow([(2, 8)]))
上面的示例对每个输入列应用了一个单独的动作。这种简单的结构可以帮助构建极其强大的工作流图!