
Workflow


Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable object. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.

Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Large language models typically work with smaller batches of data, which makes workflows well suited to feeding data to a series of transformers pipelines.
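
For instance, a workflow can chain multiple pipelines into a single callable. The snippet below is a minimal sketch, assuming the txtai Summary and Translation pipelines and illustrative input text: each document is summarized, then the summary is translated into French.

from txtai.pipeline import Summary, Translation
from txtai.workflow import Task, Workflow

# Pipelines are callable, so they plug directly into workflow tasks
summary = Summary()
translate = Translation()

# Summarize each document, then translate the summary to French
workflow = Workflow([Task(summary), Task(lambda x: translate(x, "fr"))])
list(workflow(["Long article text to condense and translate ..."]))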

An example of the most basic workflow:

from txtai.workflow import Task, Workflow

workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
list(workflow([1, 2, 3]))

This example multiplies each input value by 2 and returns the transformed elements via a generator.

Since workflows run as generators, their output must be consumed for execution to occur. The snippets below show how output can be consumed.

# Small dataset where output fits in memory
list(workflow(elements))

# Large dataset
for output in workflow(elements):
    function(output)

# Large dataset where output is discarded
for _ in workflow(elements):
    pass

Workflows can be run with Python or with configuration. Examples of both approaches are shown below.

Example

A full-featured Python example is shown below. This workflow transcribes a set of audio files, translates the text into French and indexes the data.

from txtai import Embeddings
from txtai.pipeline import Transcription, Translation
from txtai.workflow import FileTask, Task, Workflow

# Embeddings instance
embeddings = Embeddings({
    "path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
    "content": True
})

# Transcription instance
transcribe = Transcription()

# Translation instance
translate = Translation()

tasks = [
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr"))
]

# List of files to process
data = [
  "US_tops_5_million.wav",
  "Canadas_last_fully.wav",
  "Beijing_mobilises.wav",
  "The_National_Park.wav",
  "Maine_man_wins_1_mil.wav",
  "Make_huge_profits.wav"
]

# Workflow that transcribes audio and translates the text to French
workflow = Workflow(tasks)

# Index data
embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))

# Search
embeddings.search("wildlife", 1)

Configuration-driven example

Workflows can also be defined with YAML configuration.

writable: true
embeddings:
  path: sentence-transformers/paraphrase-MiniLM-L3-v2
  content: true

# Transcribe audio to text
transcription:

# Translate text between languages
translation:

workflow:
  index:
    tasks:
      - action: transcription
        select: "\\.wav$"
        task: file
      - action: translation
        args: ["fr"]
      - action: index

from txtai import Application

# Create and run the workflow
app = Application("workflow.yml")
list(app.workflow("index", [
  "US_tops_5_million.wav",
  "Canadas_last_fully.wav",
  "Beijing_mobilises.wav",
  "The_National_Park.wav",
  "Maine_man_wins_1_mil.wav",
  "Make_huge_profits.wav"
]))

# Search
app.search("wildlife")

The code above runs the workflow defined in the `workflow.yml` file.
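
The same configuration can also be served over HTTP with the txtai API. The following is an assumption-laden sketch: it presumes an API instance was started with this configuration (for example, CONFIG=workflow.yml uvicorn "txtai.api:app") and that the standard workflow endpoint is available on localhost:8000.

import requests

# Run the "index" workflow remotely; the payload mirrors the Python call above
response = requests.post(
    "http://localhost:8000/workflow",
    json={"name": "index", "elements": ["US_tops_5_million.wav"]}
)
print(response.json())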

LLM workflow example

Workflows can connect multiple LLM prompting tasks together.

llm:
  path: google/flan-t5-xl

workflow:
  llm:
    tasks:
      - task: template
        template: |
          Extract keywords for the following text.

          {text}
        action: llm
      - task: template
        template: |
          Translate the following text into French.

          {text}
        action: llm

from txtai import Application

app = Application("workflow.yml")
list(app.workflow("llm", [
  """
  txtai is an open-source platform for semantic search
  and workflows powered by language models.
  """
]))

Any txtai pipeline/workflow task can be connected to LLMs in a workflow.

llm:
  path: google/flan-t5-xl

translation:

workflow:
  llm:
    tasks:
      - task: template
        template: |
          Extract keywords for the following text.

          {text}
        action: llm
      - action: translation
        args:
          - fr
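
As with the earlier example, this workflow can be run through an Application instance. The sketch below assumes the configuration above is saved as workflow.yml; it extracts keywords with the LLM and then translates them to French.

from txtai import Application

# Run the combined LLM + translation workflow
app = Application("workflow.yml")
list(app.workflow("llm", [
  """
  txtai is an open-source platform for semantic search
  and workflows powered by language models.
  """
]))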

See the following links for more information.

Methods

Workflows are callable objects. Workflows take an iterable of data elements as input and output an iterable of data elements.

__init__(tasks, batch=100, workers=None, name=None, stream=None)

Creates a new workflow. Workflows are lists of tasks to execute.

Parameters

Name     Description                          Default
tasks    list of workflow tasks               required
batch    how many items to process at a time  100
workers  number of concurrent workers         None
name     workflow name                        None
stream   workflow stream processor            None
Source code in txtai/workflow/base.py
def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):
    """
    Creates a new workflow. Workflows are lists of tasks to execute.

    Args:
        tasks: list of workflow tasks
        batch: how many items to process at a time, defaults to 100
        workers: number of concurrent workers
        name: workflow name
        stream: workflow stream processor
    """

    self.tasks = tasks
    self.batch = batch
    self.workers = workers
    self.name = name
    self.stream = stream

    # Set default number of executor workers to max number of actions in a task
    self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers
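
For example, the batch size and worker count can be tuned when building a workflow. A minimal sketch with illustrative values for batch and workers:

from txtai.workflow import Task, Workflow

# Process 32 elements per batch with 2 concurrent workers
workflow = Workflow([Task(lambda x: [y * 2 for y in x])], batch=32, workers=2, name="double")
list(workflow(range(100)))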

__call__(elements)

Executes a workflow for input elements. This method returns a generator that yields transformed data elements.

Parameters

Name      Description             Default
elements  iterable data elements  required

Returns

generator that yields transformed data elements

Source code in txtai/workflow/base.py
def __call__(self, elements):
    """
    Executes a workflow for input elements. This method returns a generator that yields transformed
    data elements.

    Args:
        elements: iterable data elements

    Returns:
        generator that yields transformed data elements
    """

    # Create execute instance for this run
    with Execute(self.workers) as executor:
        # Run task initializers
        self.initialize()

        # Process elements with stream processor, if available
        elements = self.stream(elements) if self.stream else elements

        # Process elements in batches
        for batch in self.chunk(elements):
            yield from self.process(batch, executor)

        # Run task finalizers
        self.finalize()

schedule(cron, elements, iterations=None)

Schedules a workflow using a cron expression and elements.

Parameters

Name        Description                                                   Default
cron        cron expression                                               required
elements    iterable data elements passed to the workflow on each call    required
iterations  number of times to run the workflow (indefinite if not set)   None
Source code in txtai/workflow/base.py
def schedule(self, cron, elements, iterations=None):
    """
    Schedules a workflow using a cron expression and elements.

    Args:
        cron: cron expression
        elements: iterable data elements passed to workflow each call
        iterations: number of times to run workflow, defaults to run indefinitely
    """

    # Check that croniter is installed
    if not CRONITER:
        raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')

    logger.info("'%s' scheduler started with schedule %s", self.name, cron)

    maxiterations = iterations
    while iterations is None or iterations > 0:
        # Schedule using localtime
        schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
        logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
        time.sleep(schedule.timestamp() - time.time())

        # Run workflow
        # pylint: disable=W0703
        try:
            for _ in self(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())

        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1

    logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)
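
For example, the sketch below schedules a workflow to run at the top of every hour for three iterations; the cron expression and elements are illustrative. Note that scheduling requires the croniter dependency (the "workflow" extra).

from txtai.workflow import Task, Workflow

workflow = Workflow([Task(lambda x: [y * 2 for y in x])])

# Blocks and runs at the top of every hour, three times, then stops
workflow.schedule("0 * * * *", [1, 2, 3], iterations=3)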

More examples

See this link for a comprehensive list of workflow examples.