
Storage Task


The Storage Task expands a local directory or a cloud storage bucket into a list of URLs to process.
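To make the expansion concrete, here is a minimal standard-library sketch of the local case only. It is not txtai's implementation: the helper name `expand_local_directory` and the choice of `file://` URIs as the output format are assumptions made for illustration.

```python
import tempfile
from pathlib import Path


def expand_local_directory(directory):
    """Hypothetical helper: list the files directly inside `directory` as file:// URIs."""
    root = Path(directory).resolve()
    # Sorted for deterministic output; only regular files are kept, subdirectories are skipped
    return [path.as_uri() for path in sorted(root.iterdir()) if path.is_file()]


if __name__ == "__main__":
    # Build a throwaway directory with two files and one subdirectory
    with tempfile.TemporaryDirectory() as tmp:
        for name in ("a.txt", "b.txt"):
            (Path(tmp) / name).write_text("example")
        (Path(tmp) / "nested").mkdir()

        for url in expand_local_directory(tmp):
            print(url)
```

Each input location becomes many output elements, which is why a task like this produces more elements than it receives.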

Example

The following is a simple example of using this task as part of a workflow.

from txtai.workflow import StorageTask, Workflow

workflow = Workflow([StorageTask()])
workflow(["s3://path/to/bucket", "local://local/directory"])
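The example passes URLs of the form `provider://path`. The sketch below shows one way such a string could be split into a provider name and a path, with anything that does not look like a storage URL returned as `None`. The regular expression and the function name `split_storage_url` are assumptions for illustration, not txtai's parsing code.

```python
import re

# Hypothetical pattern: a word-character scheme, then "://", then the rest of the URL
STORAGE_URL = re.compile(r"^(\w+)://(.*)$")


def split_storage_url(element):
    """Return (provider, path) for provider://path strings, or None for anything else."""
    if not isinstance(element, str):
        return None
    match = STORAGE_URL.match(element)
    if not match:
        return None
    provider, path = match.groups()
    # Normalize the provider name so "S3://..." and "s3://..." are treated alike
    return provider.lower(), path


for element in ["s3://path/to/bucket", "local://local/directory", "plain text"]:
    print(element, "->", split_storage_url(element))
```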

Configuration-driven example

This task can also be created through workflow configuration.

workflow:
  tasks:
    - task: storage

Methods

Python documentation for the task.

__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)

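Creates a new task. A task defines two things: the type of data it accepts and the action to execute on each data element. An action is a callable function or a list of callable functions.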
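The constructor source on this page wraps a single action into a list. The sketch below mirrors that normalization and then combines the outputs of several actions. For simplicity each action is applied element by element, and the "hstack" merge is assumed to mean one tuple per input element holding each action's result; `normalize_actions` and `run_actions` are hypothetical names, not txtai functions.

```python
def normalize_actions(action):
    """Standardize `action` into a list, mirroring the logic in Task.__init__."""
    if not action:
        return []
    if not isinstance(action, list):
        return [action]
    return action


def run_actions(action, elements):
    """Hypothetical runner: apply every action to the batch, then hstack the results."""
    actions = normalize_actions(action)
    outputs = [[fn(x) for x in elements] for fn in actions]

    # A single action needs no merging
    if len(outputs) == 1:
        return outputs[0]

    # Assumed hstack semantics: pair up the i-th output of every action
    return list(zip(*outputs))


print(run_actions(str.upper, ["a", "b"]))          # ['A', 'B']
print(run_actions([str.upper, len], ["ab", "c"]))  # [('AB', 2), ('C', 1)]
```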

Parameters

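| Name | Description | Default |
|------|-------------|---------|
| action | Action(s) to execute on each data element | None |
| select | Filter(s) used to select the data to process | None |
| unpack | Whether data elements should be unpacked (unwrapped) from (id, data, tag) tuples | True |
| column | Column index to select if an element is a tuple; defaults to all columns | None |
| merge | Merge mode for joining multi-action outputs; defaults to hstack | 'hstack' |
| initialize | Action to execute before processing | None |
| finalize | Action to execute after processing | None |
| concurrency | Concurrency method used when an execute instance is available; valid values are "thread" for thread-based concurrency and "process" for process-based concurrency | None |
| onetomany | Whether one-to-many data transformations are enabled; defaults to True | True |
| kwargs | Additional keyword arguments, passed to the task's register method if it defines one; otherwise a TypeError is raised | {} |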
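The `unpack` and `column` parameters both concern elements that arrive as tuples. The sketch below illustrates the idea with two hypothetical helpers: with unpacking on, the data field is pulled out of an `(id, data, tag)` tuple, and a `column` index then narrows a tuple-valued element to one position. Treating any 3-tuple as `(id, data, tag)` is a simplifying assumption; this is not txtai's code.

```python
def unpack_element(element, unpack=True):
    """Hypothetical: return the data field of an (id, data, tag) tuple when unpack is enabled."""
    if unpack and isinstance(element, tuple) and len(element) == 3:
        return element[1]
    return element


def select_column(element, column=None):
    """Hypothetical: pick one index from a tuple element; column=None keeps all columns."""
    if column is not None and isinstance(element, tuple):
        return element[column]
    return element


record = ("doc1", ("title", "body text"), "tag-a")
data = unpack_element(record)
print(data)                                            # ('title', 'body text')
print(select_column(data, 1))                          # body text
print(unpack_element(record, unpack=False) == record)  # True
```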
Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
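The last lines of the constructor mean that extra keyword arguments are accepted only by subclasses that define a `register` method. The self-contained sketch below reproduces that pattern with a stripped-down `MiniTask` class and a hypothetical `BucketTask` subclass whose `region` keyword stands in for a real task's settings; neither class is part of txtai.

```python
class MiniTask:
    """Stripped-down stand-in for the Task constructor's keyword-argument handling."""

    def __init__(self, action=None, **kwargs):
        # Standardize into a list of actions, as in the real constructor
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]
        self.action = action

        # Subclasses with a register method consume the extra keyword arguments
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            names = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


class BucketTask(MiniTask):
    """Hypothetical subclass: accepts a `region` keyword through register()."""

    def register(self, region=None):
        self.region = region


task = BucketTask(region="us-east-1")
print(task.region)  # us-east-1

try:
    MiniTask(region="us-east-1")
except TypeError as error:
    print(error)  # __init__() got unexpected keyword arguments: 'region'
```

An unknown keyword passed to `BucketTask` still fails, because Python itself rejects it when calling `register`.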