跳至内容

方法

API

基类: Application

基础 API 模板。API 是一个扩展的 txtai 应用程序,增加了将 API 实例集群在一起的能力。

下游应用程序可以扩展此基础模板来添加/修改功能。

源代码位于 txtai/api/base.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class API(Application):
    """
    Base API template. The API is an extended txtai application, adding the ability to cluster API instances together.

    Downstream applications can extend this base template to add/modify functionality.
    """

    def __init__(self, config, loaddata=True):
        """
        Creates an API instance.

        Args:
            config: application configuration
            loaddata: passed through to the Application constructor
        """

        super().__init__(config, loaddata)

        # Embeddings cluster - created only when configuration has a "cluster" section
        self.cluster = Cluster(self.config["cluster"]) if self.config.get("cluster") else None

    # pylint: disable=W0221
    def search(self, query, limit=None, weights=None, index=None, parameters=None, graph=False, request=None):
        """
        Runs a search. When invoked via the API, parameters are read from the request query string;
        when invoked directly, the method arguments are used.

        Args:
            query: query text
            limit: maximum results
            weights: weights parameter
            index: index name, if applicable
            parameters: additional index parameters, dict or JSON-encoded string
            graph: graph flag
            request: origin web request, if applicable

        Returns:
            search results
        """

        # Request parameters take precedence over method arguments
        params = request.query_params if request and hasattr(request, "query_params") else None
        if params is not None:
            limit = params.get("limit")
            weights = params.get("weights")
            index = params.get("index")
            parameters = params.get("parameters")
            graph = params.get("graph")

        # Bound the limit and parse weights into a float
        limit = self.limit(limit)
        weights = self.weights(weights)

        # Parameters arrive JSON-encoded over HTTP
        if parameters and isinstance(parameters, str):
            parameters = json.loads(parameters)

        # Route to the cluster when one is configured, otherwise run locally
        target = self.cluster if self.cluster else super()
        return target.search(query, limit, weights, index, parameters, graph)

    def batchsearch(self, queries, limit=None, weights=None, index=None, parameters=None, graph=False):
        """
        Runs a batch search.

        Args:
            queries: list of queries
            limit: maximum results
            weights: weights parameter
            index: index name, if applicable
            parameters: additional index parameters
            graph: graph flag

        Returns:
            search results per query
        """

        if not self.cluster:
            return super().batchsearch(queries, limit, weights, index, parameters, graph)

        # Cluster invocations bound the limit here
        return self.cluster.batchsearch(queries, self.limit(limit), weights, index, parameters, graph)

    def add(self, documents):
        """
        Adds a batch of documents for indexing.

        Downstream applications can override this method to also store full documents in an external system.

        Args:
            documents: list of {id: value, text: value}

        Returns:
            unmodified input documents
        """

        # Cluster takes precedence over the local application
        target = self.cluster if self.cluster else super()
        target.add(documents)

        return documents

    def index(self):
        """
        Builds an embeddings index for previously batched documents.
        """

        (self.cluster if self.cluster else super()).index()

    def upsert(self):
        """
        Runs an embeddings upsert operation for previously batched documents.
        """

        (self.cluster if self.cluster else super()).upsert()

    def delete(self, ids):
        """
        Deletes from an embeddings index. Returns list of ids deleted.

        Args:
            ids: list of ids to delete

        Returns:
            ids deleted
        """

        target = self.cluster if self.cluster else super()
        return target.delete(ids)

    def reindex(self, config, function=None):
        """
        Recreates this embeddings index using config. This method only works if document content storage is enabled.

        Args:
            config: new config
            function: optional function to prepare content for indexing
        """

        (self.cluster if self.cluster else super()).reindex(config, function)

    def count(self):
        """
        Total number of elements in this embeddings index.

        Returns:
            number of elements in embeddings index
        """

        target = self.cluster if self.cluster else super()
        return target.count()

    def limit(self, limit):
        """
        Parses the number of results to return from the request. Allows range of 1-250, with a default of 10.

        Args:
            limit: limit parameter

        Returns:
            bounded limit
        """

        value = int(limit) if limit else 10

        # Clamp to the inclusive range [1, 250]
        return min(max(value, 1), 250)

    def weights(self, weights):
        """
        Parses the weights parameter from the request.

        Args:
            weights: weights parameter

        Returns:
            weights as a float, or the input unchanged when empty
        """

        if not weights:
            return weights

        return float(weights)

add(documents)

添加一批文档用于索引。

下游应用程序可以覆盖此方法,以便也将完整文档存储在外部系统中。

参数

名称 类型 描述 默认值
documents

list of {id: value, text: value}

必填

返回值

类型 描述

未修改的输入文档

源代码位于 txtai/api/base.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def add(self, documents):
    """
    Adds a batch of documents for indexing.

    Downstream applications can override this method to also store full documents in an external system.

    Args:
        documents: list of {id: value, text: value}

    Returns:
        unmodified input documents
    """

    # Cluster takes precedence over the local application
    target = self.cluster if self.cluster else super()
    target.add(documents)

    return documents

addobject(data, uid, field)

构建一批对象文档的辅助方法。

参数

名称 类型 描述 默认值
data

对象内容

必填
uid

可选的对应 uid 列表

必填
field

可选要设置的字段

必填

返回值

类型 描述

documents

源代码位于 txtai/app/base.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def addobject(self, data, uid, field):
    """
    Helper method that builds a batch of object documents.

    Args:
        data: object content
        uid: optional list of corresponding uids
        field: optional field to set

    Returns:
        documents
    """

    # Raise error if index is not writable
    if not self.config.get("writable"):
        raise ReadOnlyError("Attempting to add documents to a read-only index (writable != True)")

    rows = []
    for position, value in enumerate(data):
        # Shape of each row depends on which of field/uid are provided
        if field:
            rows.append({"id": uid[position], field: value} if uid else {field: value})
        elif uid:
            rows.append((uid[position], value))
        else:
            rows.append(value)

    return self.add(rows)

agent(name, *args, **kwargs)

执行一个代理。

参数

名称 类型 描述 默认值
name

代理名称

必填
args

代理位置参数

()
kwargs

代理关键字参数

{}
源代码位于 txtai/app/base.py
784
785
786
787
788
789
790
791
792
793
794
795
796
797
def agent(self, name, *args, **kwargs):
    """
    Executes an agent.

    Args:
        name: agent name
        args: agent positional arguments
        kwargs: agent keyword arguments

    Returns:
        agent result, or None when no agent matches name
    """

    if name not in self.agents:
        return None

    return self.agents[name](*args, **kwargs)

batchexplain(queries, texts=None, limit=10)

解释文本中每个输入 token 对于查询列表的重要性。

参数

名称 类型 描述 默认值
queries

查询文本

必填
texts

可选的文本列表,否则运行搜索查询

None
limit

如果 texts 为 None 时的可选限制

10

返回值

类型 描述

每条输入文本和每个查询的字典列表,其中较高的 token 分数表示相对于查询更高的重要性

源代码位于 txtai/app/base.py
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
def batchexplain(self, queries, texts=None, limit=10):
    """
    Explains the importance of each input token in text for a list of queries.

    Args:
        queries: queries text
        texts: optional list of text, otherwise runs search queries
        limit: optional limit if texts is None

    Returns:
        list of dict per input text per query where a higher token score represents higher importance relative to the query
    """

    if not self.embeddings:
        return None

    # Serialize access to the embeddings index
    with self.lock:
        return self.embeddings.batchexplain(queries, texts, limit)

batchsimilarity(queries, texts)

计算查询列表和文本列表之间的相似度。返回一个按每个查询的最高分数排序的 {id: value, score: value} 列表,其中 id 是文本列表中的索引。

参数

名称 类型 描述 默认值
queries

查询文本

必填
texts

文本列表

必填

返回值

类型 描述

每个查询的 {id: value, score: value} 列表

源代码位于 txtai/app/base.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
def batchsimilarity(self, queries, texts):
    """
    Computes the similarity between list of queries and list of text. Returns a list
    of {id: value, score: value} sorted by highest score per query, where id is the
    index in texts.

    Args:
        queries: queries text
        texts: list of text

    Returns:
        list of {id: value, score: value} per query
    """

    def asdicts(results):
        # Normalize (id, score) tuples into dicts with native float scores
        return [[{"id": uid, "score": float(score)} for uid, score in row] for row in results]

    # Similarity pipeline takes precedence, embeddings model is the fallback
    if "similarity" in self.pipelines:
        return asdicts(self.pipelines["similarity"](queries, texts))

    if self.embeddings:
        return asdicts(self.embeddings.batchsimilarity(queries, texts))

    return None

batchtransform(texts, category=None, index=None)

将文本列表转换为嵌入数组。

参数

名称 类型 描述 默认值
texts

文本列表

必填
category

基于指令的嵌入的类别

None
index

索引名称(如果适用)

None

返回值

类型 描述

嵌入数组

源代码位于 txtai/app/base.py
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
def batchtransform(self, texts, category=None, index=None):
    """
    Transforms list of text into embeddings arrays.

    Args:
        texts: list of text
        category: category for instruction-based embeddings
        index: index name, if applicable

    Returns:
        embeddings arrays
    """

    if not self.embeddings:
        return None

    # Convert each embeddings vector to native floats
    return [[float(value) for value in row] for row in self.embeddings.batchtransform(texts, category, index)]

count()

此嵌入索引中的元素总数。

返回值

类型 描述

嵌入索引中的元素数量

源代码位于 txtai/api/base.py
121
122
123
124
125
126
127
128
129
130
131
132
def count(self):
    """
    Total number of elements in this embeddings index.

    Returns:
        number of elements in embeddings index
    """

    # Cluster takes precedence over the local application
    target = self.cluster if self.cluster else super()
    return target.count()

createagents()

创建代理。

源代码位于 txtai/app/base.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def createagents(self):
    """
    Create agents.
    """

    # Agent definitions
    self.agents = {}

    # Build each configured agent
    for name, settings in self.config.get("agent", {}).items():
        # Work on a copy so the original config is untouched
        settings = settings.copy()

        # Resolve LLM
        settings["llm"] = self.function("llm")

        # Resolve tool targets to functions
        for tool in settings.get("tools", []):
            if isinstance(tool, dict) and "target" in tool:
                tool["target"] = self.function(tool["target"])

        self.agents[name] = Agent(**settings)

createpipelines()

创建流水线。

源代码位于 txtai/app/base.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def createpipelines(self):
    """
    Create pipelines.
    """

    # Pipeline definitions
    self.pipelines = {}

    # Built-in pipeline names plus custom "module.class" names from config
    names = list(PipelineFactory.list().keys())
    names.extend(key for key in self.config if "." in key)

    # Stable sort: pipelines that depend on others move to the end, in dependency order
    dependent = ["similarity", "extractor", "rag"]
    names = sorted(names, key=lambda name: dependent.index(name) + 1 if name in dependent else 0)

    # Instantiate each configured pipeline
    for name in names:
        if name not in self.config:
            continue

        config = self.config[name] if self.config[name] else {}

        # Add application reference, if requested
        if "application" in config:
            config["application"] = self

        # Custom pipeline parameters
        if name in ("extractor", "rag"):
            # Placeholder, will be set to embeddings index once initialized
            config.setdefault("similarity", None)

            # Resolve reference pipelines
            if config.get("similarity") in self.pipelines:
                config["similarity"] = self.pipelines[config["similarity"]]

            if config.get("path") in self.pipelines:
                config["path"] = self.pipelines[config["path"]]

        elif name == "similarity" and "path" not in config and "labels" in self.pipelines:
            config["model"] = self.pipelines["labels"]

        self.pipelines[name] = PipelineFactory.create(config, name)

delete(ids)

从嵌入索引中删除。返回已删除的 id 列表。

参数

名称 类型 描述 默认值
ids

要删除的 id 列表

必填

返回值

类型 描述

已删除的 id

源代码位于 txtai/api/base.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def delete(self, ids):
    """
    Deletes from an embeddings index. Returns list of ids deleted.

    Args:
        ids: list of ids to delete

    Returns:
        ids deleted
    """

    # Cluster takes precedence over the local application
    target = self.cluster if self.cluster else super()
    return target.delete(ids)

explain(query, texts=None, limit=10)

解释文本中每个输入 token 对于一个查询的重要性。

参数

名称 类型 描述 默认值
query

查询文本

必填
texts

可选的文本列表,否则运行搜索查询

None
limit

如果 texts 为 None 时的可选限制

10

返回值

类型 描述

每条输入文本的字典列表,其中较高的 token 分数表示相对于查询更高的重要性

源代码位于 txtai/app/base.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
def explain(self, query, texts=None, limit=10):
    """
    Explains the importance of each input token in text for a query.

    Args:
        query: query text
        texts: optional list of text, otherwise runs search query
        limit: optional limit if texts is None

    Returns:
        list of dict per input text where a higher token score represents higher importance relative to the query
    """

    if not self.embeddings:
        return None

    # Serialize access to the embeddings index
    with self.lock:
        return self.embeddings.explain(query, texts, limit)

extract(queue, texts=None)

提取输入问题的答案。

参数

名称 类型 描述 默认值
queue

list of {name: value, query: value, question: value, snippet: value}

必填
texts

可选的文本列表

None

返回值

类型 描述

list of {name: value, answer: value}

源代码位于 txtai/app/base.py
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
def extract(self, queue, texts=None):
    """
    Extracts answers to input questions.

    Args:
        queue: list of {name: value, query: value, question: value, snippet: value}
        texts: optional list of text

    Returns:
        list of {name: value, answer: value}
    """

    # Requires both an embeddings index and an extractor pipeline
    if not self.embeddings or "extractor" not in self.pipelines:
        return None

    # Run extractor and return results as dicts
    return self.pipelines["extractor"](queue, texts)

index()

为之前分批的文档构建嵌入索引。

源代码位于 txtai/api/base.py
71
72
73
74
75
76
77
78
79
def index(self):
    """
    Builds an embeddings index for previously batched documents.
    """

    # Cluster takes precedence over the local application
    if self.cluster:
        self.cluster.index()
        return

    super().index()

label(text, labels)

使用标签列表对文本应用零样本分类器。返回一个按最高分数排序的 {id: value, score: value} 列表,其中 id 是标签列表中的索引。

参数

名称 类型 描述 默认值
text

text|list

必填
labels

标签列表

必填

返回值

类型 描述

每个文本元素的 {id: value, score: value} 列表

源代码位于 txtai/app/base.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
def label(self, text, labels):
    """
    Applies a zero shot classifier to text using a list of labels. Returns a list of
    {id: value, score: value} sorted by highest score, where id is the index in labels.

    Args:
        text: text|list
        labels: list of labels

    Returns:
        list of {id: value, score: value} per text element
    """

    if "labels" not in self.pipelines:
        return None

    results = self.pipelines["labels"](text, labels)

    # Single string input yields a flat list, list input yields a list per element
    if isinstance(text, str):
        return [{"id": uid, "score": float(score)} for uid, score in results]

    return [[{"id": uid, "score": float(score)} for uid, score in result] for result in results]

pipeline(name, *args, **kwargs)

通用流水线执行方法。

参数

名称 类型 描述 默认值
name

流水线名称

必填
args

流水线位置参数

()
kwargs

流水线关键字参数

{}

返回值

类型 描述

流水线结果

源代码位于 txtai/app/base.py
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
def pipeline(self, name, *args, **kwargs):
    """
    Generic pipeline execution method.

    Args:
        name: pipeline name
        args: pipeline positional arguments
        kwargs: pipeline keyword arguments

    Returns:
        pipeline results
    """

    # Backwards compatible with previous pipeline function arguments: a single tuple argument is unpacked
    if args and len(args) == 1 and isinstance(args[0], tuple):
        args = args[0]

    if name not in self.pipelines:
        return None

    return self.pipelines[name](*args, **kwargs)

reindex(config, function=None)

使用 config 重建此嵌入索引。此方法仅在启用文档内容存储时有效。

参数

名称 类型 描述 默认值
config

新配置

必填
function

可选的用于准备索引内容的函数

None
源代码位于 txtai/api/base.py
107
108
109
110
111
112
113
114
115
116
117
118
119
def reindex(self, config, function=None):
    """
    Recreates this embeddings index using config. This method only works if document content storage is enabled.

    Args:
        config: new config
        function: optional function to prepare content for indexing
    """

    # Cluster takes precedence over the local application
    target = self.cluster if self.cluster else super()
    target.reindex(config, function)

similarity(query, texts)

计算查询和文本列表之间的相似度。返回一个按最高分数排序的 {id: value, score: value} 列表,其中 id 是文本列表中的索引。

参数

名称 类型 描述 默认值
query

查询文本

必填
texts

文本列表

必填

返回值

类型 描述

list of {id: value, score: value}

源代码位于 txtai/app/base.py
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
def similarity(self, query, texts):
    """
    Computes the similarity between query and list of text. Returns a list of
    {id: value, score: value} sorted by highest score, where id is the index
    in texts.

    Args:
        query: query text
        texts: list of text

    Returns:
        list of {id: value, score: value}
    """

    def asdicts(results):
        # Normalize (id, score) tuples into dicts with native float scores
        return [{"id": uid, "score": float(score)} for uid, score in results]

    # Similarity pipeline takes precedence, embeddings model is the fallback
    if "similarity" in self.pipelines:
        return asdicts(self.pipelines["similarity"](query, texts))

    if self.embeddings:
        return asdicts(self.embeddings.similarity(query, texts))

    return None

transform(text, category=None, index=None)

将文本转换为嵌入数组。

参数

名称 类型 描述 默认值
text

输入文本

必填
category

基于指令的嵌入的类别

None
index

索引名称(如果适用)

None

返回值

类型 描述

嵌入数组

源代码位于 txtai/app/base.py
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
def transform(self, text, category=None, index=None):
    """
    Transforms text into embeddings arrays.

    Args:
        text: input text
        category: category for instruction-based embeddings
        index: index name, if applicable

    Returns:
        embeddings array
    """

    if not self.embeddings:
        return None

    # Convert the embeddings vector to native floats
    return [float(value) for value in self.embeddings.transform(text, category, index)]

upsert()

对之前分批的文档运行嵌入 upsert 操作。

源代码位于 txtai/api/base.py
81
82
83
84
85
86
87
88
89
def upsert(self):
    """
    Runs an embeddings upsert operation for previously batched documents.
    """

    # Cluster takes precedence over the local application
    if self.cluster:
        self.cluster.upsert()
        return

    super().upsert()

wait()

关闭线程池并等待完成。

源代码位于 txtai/app/base.py
799
800
801
802
803
804
805
806
807
def wait(self):
    """
    Closes threadpool and waits for completion.
    """

    if not self.pool:
        return

    # Close, wait for outstanding work, then drop the reference
    self.pool.close()
    self.pool.join()
    self.pool = None

workflow(name, elements)

执行一个工作流。

参数

名称 类型 描述 默认值
name

工作流名称

必填
elements

要处理的元素

必填

返回值

类型 描述

已处理的元素

源代码位于 txtai/app/base.py
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
def workflow(self, name, elements):
    """
    Executes a workflow.

    Args:
        name: workflow name
        elements: elements to process

    Returns:
        processed elements
    """

    def totuple(element):
        # Lists are converted to tuples, everything else passes through
        return tuple(element) if isinstance(element, list) else element

    # Sized inputs return a list, unsized inputs stream through a generator
    sized = hasattr(elements, "__len__") and hasattr(elements, "__getitem__")
    elements = [totuple(element) for element in elements] if sized else (totuple(element) for element in elements)

    # Execute workflow
    return self.workflows[name](elements)