
Service Task


The Service Task extracts content from an HTTP service.

Example

The following shows a simple example that uses this task as part of a workflow.

from txtai.workflow import ServiceTask, Workflow

workflow = Workflow([ServiceTask(url="https://service.url/action")])
workflow(["parameter"])

Configuration-driven example

This task can also be created with workflow configuration.

workflow:
  tasks:
    - task: service
      url: https://service.url/action

Methods

Python documentation for this task.

__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)

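Creates a new task. A task defines two methods: the type of data it accepts and the action to execute for each data element. The action is a callable function or a list of callable functions.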

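Parameters

| Name | Description | Default |
|---|---|---|
| action | Action(s) to execute on each data element | None |
| select | Filter(s) used to select the data to process | None |
| unpack | Whether data elements should be unpacked (unwrapped) from (id, data, tag) tuples | True |
| column | Column index to select if an element is a tuple; defaults to all columns | None |
| merge | Merge mode used to join multi-action outputs; defaults to hstack | 'hstack' |
| initialize | Action to execute before processing | None |
| finalize | Action to execute after processing | None |
| concurrency | Concurrency method to use when an executor instance is available; valid values are "thread" for thread-based concurrency and "process" for process-based concurrency | None |
| onetomany | Whether one-to-many data transformations are enabled; defaults to True | True |
| kwargs | Additional keyword arguments | {} |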
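The `unpack`, `column` and default `hstack` merge parameters are easiest to see on plain data. The following is a minimal, self-contained sketch, not txtai's implementation: the helpers `unpack`, `select_column` and `hstack` are hypothetical, and the rule that any 3-tuple is an (id, data, tag) element is an assumption. The actual behavior lives in txtai/workflow/task/base.py.

```python
def unpack(element):
    """Return the data portion of an (id, data, tag) tuple, else the element unchanged."""
    # Hypothetical rule: any 3-tuple is treated as (id, data, tag)
    if isinstance(element, tuple) and len(element) == 3:
        return element[1]
    return element


def select_column(element, column=None):
    """Pick one column from a tuple element; column=None keeps every column."""
    if column is not None and isinstance(element, tuple):
        return element[column]
    return element


def hstack(outputs):
    """Join per-action output lists into one tuple per input element."""
    return list(zip(*outputs))


elements = [(0, "hello", None), (1, "world", None)]
data = [unpack(element) for element in elements]

# Two actions applied to the same data, producing one output list per action
actions = [str.upper, len]
outputs = [[action(value) for value in data] for action in actions]

print(hstack(outputs))  # [('HELLO', 5), ('WORLD', 5)]
print(select_column(("text", "label"), 1))  # label
```

With `hstack`, each input element keeps a single row that holds every action's result side by side, which is why it suits a list of actions applied to the same data.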
Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
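The constructor above hands any extra keyword arguments to an optional `register` method and rejects them when no such method exists. The sketch below reproduces only that pattern, using two hypothetical classes, `BaseTask` and `UrlTask`; it is not txtai code and leaves out every other constructor parameter.

```python
class BaseTask:
    """Hypothetical stand-in for a task base class with optional registration."""

    def __init__(self, action=None, **kwargs):
        # Standardize into a list of actions, mirroring the constructor above
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]
        self.action = action

        # Subclasses that define register() consume the extra keyword arguments
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            names = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


class UrlTask(BaseTask):
    """Hypothetical subclass that accepts a url through register()."""

    def register(self, url=None):
        self.url = url


task = UrlTask(action=str.upper, url="https://service.url/action")
print(task.url)  # https://service.url/action

try:
    BaseTask(url="https://service.url/action")
except TypeError as error:
    print(error)  # __init__() got unexpected keyword arguments: 'url'
```

This is how a subclass such as ServiceTask can accept `url`, `method` and similar options without the base constructor listing them.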

register(url=None, method=None, params=None, batch=True, extract=None)

Adds service parameters to the task. Checks whether the required dependencies are installed.

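Parameters

| Name | Description | Default |
|---|---|---|
| url | URL to connect to | None |
| method | HTTP method, GET or POST | None |
| params | Default query parameters | None |
| batch | If True, all elements are passed in a single batch request; otherwise one service call is executed per element | True |
| extract | List of sections to extract from the response | None |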
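The `batch` flag decides how many HTTP calls a list of elements produces. The sketch below only plans the calls instead of sending them, so it runs offline. The function `plan_requests`, the `key` query parameter and the comma-joining of batched elements are all hypothetical; this page does not say how ServiceTask maps elements onto `params`, so read it as an illustration of batch versus per-element calls only.

```python
from urllib.parse import urlencode


def plan_requests(url, elements, method="GET", params=None, batch=True, key="q"):
    """Return (method, url, body) tuples describing the service calls to make."""
    # batch=True -> one group holding every element; otherwise one group per element
    groups = [elements] if batch else [[element] for element in elements]
    method = (method or "GET").upper()

    calls = []
    for group in groups:
        # Hypothetical mapping: default params plus the elements under `key`
        query = dict(params or {})
        query[key] = ",".join(group)
        if method == "POST":
            # POST sends the parameters form-encoded in the request body
            calls.append((method, url, urlencode(query)))
        else:
            # GET appends the parameters to the URL as a query string
            calls.append((method, f"{url}?{urlencode(query)}", None))
    return calls


elements = ["alpha", "beta"]
print(len(plan_requests("https://service.url/action", elements)))  # 1
print(len(plan_requests("https://service.url/action", elements, batch=False)))  # 2
```

Batching trades fewer round trips for a larger request, which only works if the service accepts multiple inputs in one call.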
Source code in txtai/workflow/task/service.py
def register(self, url=None, method=None, params=None, batch=True, extract=None):
    """
    Adds service parameters to task. Checks if required dependencies are installed.

    Args:
        url: url to connect to
        method: http method, GET or POST
        params: default query parameters
        batch: if True, all elements are passed in a single batch request, otherwise a service call is executed per element
        extract: list of sections to extract from response
    """

    if not XML_TO_DICT:
        raise ImportError('ServiceTask is not available - install "workflow" extra to enable')

    # pylint: disable=W0201
    # Save URL, method and parameter defaults
    self.url = url
    self.method = method
    self.params = params

    # If True, all elements are passed in a single batch request, otherwise a service call is executed per element
    self.batch = batch

    # Save sections to extract. Supports both a single string and a hierarchical list of sections.
    self.extract = extract
    if self.extract:
        self.extract = [self.extract] if isinstance(self.extract, str) else self.extract
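`extract` accepts either a single section name or a list of names, and a single string is wrapped in a list, as the last lines above show. The sketch below assumes the list is a path walked from the root of an already-parsed response (for example a dictionary produced from XML or JSON), and that a list met along the way is traversed item by item. The function `extract_sections` and that traversal rule are assumptions for illustration, not txtai's parsing code.

```python
def extract_sections(response, extract):
    """Walk a parsed response following the section names in `extract`."""
    # Normalize a single section name into a one-element list
    if isinstance(extract, str):
        extract = [extract]

    data = response
    for section in extract or []:
        if isinstance(data, list):
            # Assumed rule: apply the section to every item of a list
            data = [item.get(section) for item in data]
        else:
            data = data.get(section)
    return data


response = {
    "feed": {
        "entry": [
            {"title": "First", "id": "1"},
            {"title": "Second", "id": "2"},
        ]
    }
}

print(extract_sections(response, ["feed", "entry", "title"]))  # ['First', 'Second']
```

Passing `None` returns the response unchanged, which matches leaving `extract` unset.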