工作流
工作流是一个简单而强大的构造,它接受一个可调用对象并返回元素。工作流与管道配合良好,但可以与任何可调用对象一起使用。工作流是流式的,按批次处理数据,从而可以高效地处理大量数据。
鉴于管道是可调用对象,工作流可以高效地处理管道数据。大型语言模型通常处理较小批量的数据,工作流非常适合向一系列 transformers 管道馈送数据。
最基本工作流的示例
workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
list(workflow([1, 2, 3]))
此示例将每个输入值乘以 2,并通过生成器返回转换后的元素。
由于工作流作为生成器运行,因此必须消费其输出才能执行。以下代码片段展示了如何消费输出。
# Small dataset where output fits in memory
list(workflow(elements))
# Large dataset
for output in workflow(elements):
function(output)
# Large dataset where output is discarded
for _ in workflow(elements):
pass
工作流可以通过 Python 或配置运行。下面展示了这两种方法的示例。
示例
下面展示了一个功能齐全的 Python 示例。此工作流转录一组音频文件,将文本翻译成法语并索引数据。
from txtai import Embeddings
from txtai.pipeline import Transcription, Translation
from txtai.workflow import FileTask, Task, Workflow
# Embeddings instance
embeddings = Embeddings({
"path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
"content": True
})
# Transcription instance
transcribe = Transcription()
# Translation instance
translate = Translation()
tasks = [
FileTask(transcribe, r"\.wav$"),
Task(lambda x: translate(x, "fr"))
]
# List of files to process
data = [
"US_tops_5_million.wav",
"Canadas_last_fully.wav",
"Beijing_mobilises.wav",
"The_National_Park.wav",
"Maine_man_wins_1_mil.wav",
"Make_huge_profits.wav"
]
# Workflow that translate text to French
workflow = Workflow(tasks)
# Index data
embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))
# Search
embeddings.search("wildlife", 1)
配置驱动的示例
工作流也可以使用 YAML 配置定义。
writable: true
embeddings:
path: sentence-transformers/paraphrase-MiniLM-L3-v2
content: true
# Transcribe audio to text
transcription:
# Translate text between languages
translation:
workflow:
index:
tasks:
- action: transcription
select: "\\.wav$"
task: file
- action: translation
args: ["fr"]
- action: index
# Create and run the workflow
from txtai import Application
# Create and run the workflow
app = Application("workflow.yml")
list(app.workflow("index", [
"US_tops_5_million.wav",
"Canadas_last_fully.wav",
"Beijing_mobilises.wav",
"The_National_Park.wav",
"Maine_man_wins_1_mil.wav",
"Make_huge_profits.wav"
]))
# Search
app.search("wildlife")
上面的代码执行 `workflow.yml` 文件中定义的工作流。
LLM 工作流示例
工作流可以将多个 LLM 提示任务连接在一起。
llm:
path: google/flan-t5-xl
workflow:
llm:
tasks:
- task: template
template: |
Extract keywords for the following text.
{text}
action: llm
- task: template
template: |
Translate the following text into French.
{text}
action: llm
from txtai import Application
app = Application("workflow.yml")
list(app.workflow("llm", [
"""
txtai is an open-source platform for semantic search
and workflows powered by language models.
"""
]))
任何 txtai 管道/工作流任务都可以在工作流中与 LLM 连接。
llm:
path: google/flan-t5-xl
translation:
workflow:
llm:
tasks:
- task: template
template: |
Extract keywords for the following text.
{text}
action: llm
- action: translation
args:
- fr
有关更多信息,请参阅以下链接。
方法
工作流是可调用对象。工作流接受可迭代数据元素作为输入,并输出可迭代数据元素。
__init__(tasks, batch=100, workers=None, name=None, stream=None)
创建一个新的工作流。工作流是要执行的任务列表。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
tasks
|
工作流任务列表 |
必需 | |
batch
|
每次处理的项目数,默认为 100 |
100
|
|
workers
|
并行工作线程数 |
None
|
|
name
|
工作流名称 |
None
|
|
stream
|
工作流流处理器 |
None
|
源代码位于 txtai/workflow/base.py
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
|
__call__(elements)
对输入元素执行工作流。此方法返回一个生成器,用于产生转换后的数据元素。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
elements
|
可迭代数据元素 |
必需 |
返回值
类型 | 描述 |
---|---|
产生转换后数据元素的生成器 |
源代码位于 txtai/workflow/base.py
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
|
schedule(cron, elements, iterations=None)
使用 cron 表达式和元素计划(调度)一个工作流。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
cron
|
cron 表达式 |
必需 | |
elements
|
每次调用时传递给工作流的可迭代数据元素 |
必需 | |
iterations
|
工作流运行次数,默认为无限次运行 |
None
|
源代码位于 txtai/workflow/base.py
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
|
更多示例
有关工作流示例的完整列表,请参阅此链接。