跳过内容

文件任务

task task

文件任务验证文件是否存在。它处理文件路径和本地文件 URL。请注意,此任务适用于本地文件。

示例

以下是一个使用此任务作为工作流一部分的简单示例。

from txtai.workflow import FileTask, Workflow

workflow = Workflow([FileTask()])
workflow(["/path/to/file", "file:///path/to/file"])

配置驱动示例

此任务也可以通过工作流配置创建。

workflow:
  tasks:
    - task: file

方法

此任务的 Python 文档。

__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)

创建一个新任务。任务定义了两种方法:接受的数据类型以及针对每个数据元素执行的操作。操作是一个可调用函数或可调用函数列表。

参数

名称 类型 描述 默认值
action

对每个数据元素执行的操作

None
select

用于选择要处理的数据的过滤器

None
unpack

是否应从 (id, data, tag) 元组中解包或展开数据元素

True
column

如果元素是元组,要选择的列索引,默认为全部

None
merge

用于连接多操作输出的合并模式,默认为 hstack

'hstack'
initialize

处理前执行的操作

None
finalize

处理后执行的操作

None
concurrency

设置 execute 实例可用时的并发方法,有效值:“thread”表示基于线程的并发,“process”表示基于进程的并发

None
onetomany

是否应启用一对多数据转换,默认为 True

True
kwargs

附加关键字参数

{}
源代码位于 txtai/workflow/task/base.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")