
Run Tasks Concurrently Within a Custom Component


The execution logic of Haystack Pipelines is synchronous. Components, even if they belong to parallel branches, run one after the other.

This has several advantages, including ease of debugging and the ability to handle complex workflows. In many applications, this execution logic works well.

Sometimes you may want to run some components concurrently. This can be useful for components that perform I/O-bound tasks, where most of the time is spent waiting for input/output operations, such as a response from an LLM API client or a database client.

In this cookbook, we show how you can wrap multiple components into one that will run them concurrently in different threads.
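To illustrate why threads help with I/O-bound work, here is a minimal sketch that uses only the standard library. The function `fake_io_call` and the client names are hypothetical stand-ins: `time.sleep` simulates waiting for a remote service, and nothing in it is Haystack-specific.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(name: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for an API or database call: it only waits."""
    time.sleep(delay)  # the thread is idle here, as it would be while waiting on a network response
    return f"{name} done"


names = ["llm_client", "db_client", "search_client"]

# Sequential: the total time is roughly the sum of the delays.
start = time.perf_counter()
sequential_results = [fake_io_call(name) for name in names]
sequential_time = time.perf_counter() - start

# Concurrent: each call waits in its own thread, so the total time is roughly the longest delay.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(names)) as executor:
    concurrent_results = list(executor.map(fake_io_call, names))
concurrent_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

Both runs return the same results in the same order; only the waiting overlaps. The same idea is applied below to Haystack components.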

Preparation

! pip install haystack-ai cohere-haystack elasticsearch-haystack
import os
from rich import print
import time
# this is only needed in Jupyter, where an asyncio event loop is already running

import nest_asyncio
nest_asyncio.apply()
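The reason for this patch can be reproduced outside Jupyter. The sketch below, meant for a plain Python process where `nest_asyncio` has not been applied, calls `asyncio.run` from inside a loop that is already running; the coroutine names `inner` and `outer` are purely illustrative.

```python
import asyncio


async def inner() -> int:
    return 42


async def outer() -> str:
    # We are inside a running event loop here, much like code in a Jupyter cell.
    coro = inner()
    try:
        asyncio.run(coro)  # a nested asyncio.run is rejected by default
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


message = asyncio.run(outer())
print(message)
```

Without the patch, the nested call raises a `RuntimeError`. With `nest_asyncio.apply()` in effect, the nested call is allowed, which is what lets the components below call `asyncio.run` from a notebook.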

Concurrent Generators

Use case: we want to send the same prompt to different generators and aggregate the results.

os.environ["OPENAI_API_KEY"]="your OpenAI API key"
os.environ["COHERE_API_KEY"]="your Cohere API key"
# an example Document to summarize

from haystack import Document


text="""
The giant panda (Ailuropoda melanoleuca), also known as the panda bear or simply panda, is a bear species endemic to China. It is characterised by its black-and-white coat and rotund body. The name "giant panda" is sometimes used to distinguish it from the red panda, a neighboring musteloid. Adult individuals average 100 to 115 kg (220 to 254 lb), and are typically 1.2 to 1.9 m (3 ft 11 in to 6 ft 3 in) long. The species is sexually dimorphic, as males are typically 10 to 20% larger. The fur is white, with black patches around the eyes, ears, legs and shoulders. A thumb is visible on the bear's forepaw, which helps in holding bamboo in place for feeding. Giant pandas have adapted larger molars and expanded temporal fossa to meet their dietary requirements.

The giant panda is exclusively found in six mountainous regions in a few provinces. It is also found in elevations of up to 3,000 m (9,800 ft). Its diet consists almost entirely of bamboo, making the bear mostly herbivorous, despite being classified in the order Carnivora. The shoot is an important energy source, as it contains starch and is 32% protein, hence pandas evolved the ability to effectively digest starch. They are solitary, only gathering in times of mating. Females rear cubs for an average of 18 to 24 months. Potential predators of sub-adult pandas include leopards. Giant pandas heavily rely on olfactory communication to communicate with one another; scent marks are used as chemical cues and on landmarks like rocks or trees. Giant pandas live long lives, with the oldest known individual dying at 38.

As a result of farming, deforestation, and other development, the giant panda has been driven out of the lowland areas where it once lived, and it is a conservation-reliant vulnerable species. A 2007 report showed 239 pandas living in captivity inside China and another 27 outside the country. Some reports also show that the number of giant pandas in the wild is on the rise. By March 2015, the wild giant panda population had increased to 1,864 individuals. In 2016, it was reclassified on the IUCN Red List from "endangered" to "vulnerable", affirming decade-long efforts to save the panda. In July 2021, Chinese authorities also reclassified the giant panda as vulnerable. The giant panda has often served as China's national symbol, appeared on Chinese Gold Panda coins since 1982 and as one of the five Fuwa mascots of the 2008 Summer Olympics held in Beijing.
"""

documents = [Document(content=text)]

Baseline: Pipeline with Generators running one after another

from haystack import Pipeline

from haystack.components.generators import OpenAIGenerator
from haystack_integrations.components.generators.cohere import CohereGenerator
from haystack.components.builders import PromptBuilder

documents = [Document(content=text)]


template = """Write a short summary of the following text.
Text:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
"""

pipeline = Pipeline()
pipeline.add_component("prompt_builder", PromptBuilder(template))
pipeline.add_component("first_generator", OpenAIGenerator())
pipeline.add_component("second_generator", CohereGenerator())

pipeline.connect("prompt_builder", "first_generator")
pipeline.connect("prompt_builder", "second_generator")

start = time.time()
n=5
for i in range(n):
  result = pipeline.run({"prompt_builder": {"documents": documents}})
end = time.time()
print(result)
print(f"Time taken for {n} non concurrent calls: {end - start} seconds")
WARNING:haystack_integrations.components.generators.cohere.generator:The 'generate' API is marked as Legacy and is no longer maintained by Cohere. We recommend to use the CohereChatGenerator instead.
{
    'second_generator': {
        'replies': [
            ' The giant panda is a black and white bear species endemic to China. Scientists have struggled to 
classify the species due to its unique, exclusive diet of bamboo. The giant panda lives in mountainous regions, is 
mostly herbivorous, and is a solitary animal. Due to deforestation and development, the giant panda is now a 
vulnerable species, and relies on conservation efforts for its survival. '
        ],
        'meta': [{'finish_reason': 'COMPLETE'}]
    },
    'first_generator': {
        'replies': [
            'The giant panda is a bear species endemic to China, known for its black-and-white coat and reliance on
bamboo in its diet. Despite being classified as carnivores, they are mostly herbivorous. Giant pandas live in 
mountainous regions and are solitary animals, only coming together during mating. They are a conservation-reliant 
vulnerable species due to farming, deforestation, and other development. Efforts to save the giant panda have been 
successful, with the wild population increasing in recent years. It is also a symbol of China and has been featured
on Chinese coins and Olympic mascots.'
        ],
        'meta': [
            {
                'model': 'gpt-3.5-turbo-0125',
                'index': 0,
                'finish_reason': 'stop',
                'usage': {'completion_tokens': 117, 'prompt_tokens': 585, 'total_tokens': 702}
            }
        ]
    }
}
Time taken for 5 non concurrent calls: 28.85969638824463 seconds

Optimisation: Pipeline with Generators running concurrently

We wrap multiple generators into a single component that internally runs them concurrently in different threads. To simplify thread orchestration, we use the to_thread function from asyncio. The component itself stays synchronous, exposing the usual run method.

ConcurrentGenerators component

  • This component expects a list of generators and names as initialization parameters.

  • The _arun method is an async method, where concurrent execution takes place. It creates a thread for each generator and waits for all threads to finish. Then the results are collected, aggregated, and returned.

  • The run method is synchronous (as expected by the Haystack pipeline) and calls the _arun method using asyncio.run.

Learn about creating custom components in our documentation.
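Before the full Haystack component, the pattern from the bullets above can be sketched with the standard library alone. `ConcurrentRunner`, `slow_upper`, and `slow_reverse` are hypothetical names introduced only for this illustration; the workers are plain blocking functions rather than real generators. `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time
from typing import Any, Callable, Dict, List


class ConcurrentRunner:
    """Hypothetical, Haystack-free analogue of the wrapper described above."""

    def __init__(self, workers: List[Callable[..., Any]], names: List[str]):
        self.workers = workers
        self.names = names

    async def _arun(self, **kwargs: Any) -> Dict[str, Any]:
        # asyncio.to_thread runs each blocking callable in a worker thread;
        # asyncio.gather waits for all of them and preserves the input order.
        results = await asyncio.gather(
            *[asyncio.to_thread(worker, **kwargs) for worker in self.workers]
        )
        return dict(zip(self.names, results))

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        # Synchronous entry point, so callers need no knowledge of asyncio.
        return asyncio.run(self._arun(**kwargs))


def slow_upper(prompt: str) -> str:
    time.sleep(0.2)  # simulated waiting on a remote service
    return prompt.upper()


def slow_reverse(prompt: str) -> str:
    time.sleep(0.2)  # simulated waiting on a remote service
    return prompt[::-1]


runner = ConcurrentRunner([slow_upper, slow_reverse], names=["upper", "reverse"])
start = time.perf_counter()
output = runner.run(prompt="panda")
elapsed = time.perf_counter() - start
print(output, f"{elapsed:.2f}s")
```

Because the two workers wait in separate threads, the elapsed time should be close to one delay rather than two. The component below follows the same shape and adds warm-up and serialization.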

from haystack.core.component import Component
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from typing import List, Dict, Any, Optional
from haystack import component, default_from_dict, default_to_dict
from haystack.utils.type_serialization import deserialize_type
import asyncio


@component
class ConcurrentGenerators:
    def __init__(self, generators: List[Component], names: Optional[List[str]] = None):
        self.generators = generators
        if names is None:
            names = [f"generator_{i}" for i in range(len(generators))]
        self.names = names

        # we set the output types here so that the results are not too nested
        output_types = {k: Dict[str, Any] for k in names}
        component.set_output_types(self, **output_types)

    def warm_up(self):
        """Warm up the generators."""
        for generator in self.generators:
            if hasattr(generator, "warm_up"):
                generator.warm_up()

    async def _arun(self, **kwargs):
        """
        Asynchronous method to run the generators concurrently.
        """

        # the generators run in separate threads
        results = await asyncio.gather(
           *[asyncio.to_thread(generator.run, **kwargs) for generator in self.generators]
        )

        organized_results = {}
        for generator_name, res_ in zip(self.names, results):
            organized_results[generator_name] = res_
        return organized_results

    def run(self, prompt: str):
        """
        Synchronous run method that can be integrated into a classic synchronous pipeline.
        """
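        results = asyncio.run(self._arun(prompt=prompt))
        # Note: everything is nested under a single "results" key; returning `results`
        # directly would match the per-generator output types declared in __init__.
        return {"results": results}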


    def to_dict(self):
        generators = [generator.to_dict() for generator in self.generators]
        return default_to_dict(self, generators=generators, names=self.names)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConcurrentGenerators":
        init_params = data.get("init_parameters", {})

        # Deserialize the generators
        generators = []
        serialized_generators = init_params["generators"]
        for serialized_generator in serialized_generators:
            generator_class = deserialize_type(serialized_generator["type"])
            generator = generator_class.from_dict(serialized_generator)
            generators.append(generator)

        data["init_parameters"]["generators"] = generators
        return default_from_dict(cls, data)

Pipeline with ConcurrentGenerators component

pipeline = Pipeline()
pipeline.add_component("prompt_builder", PromptBuilder(template))
pipeline.add_component("concurrent_generators", ConcurrentGenerators(
    generators=[OpenAIGenerator(), CohereGenerator()],
    names=["openai", "cohere"]))

pipeline.connect("prompt_builder", "concurrent_generators")

start = time.time()
n=5
for i in range(n):
  result = pipeline.run({"prompt_builder": {"documents": documents}})
end = time.time()
print(result)
print(f"Time taken for {n} concurrent calls: {end - start} seconds")
WARNING:haystack_integrations.components.generators.cohere.generator:The 'generate' API is marked as Legacy and is no longer maintained by Cohere. We recommend to use the CohereChatGenerator instead.
{
    'concurrent_generators': {
        'results': {
            'openai': {
                'replies': [
                    'The text provides information about the giant panda, a bear species endemic to China known for
its black-and-white coat and rotund body. The species primarily feeds on bamboo and is classified as herbivorous 
despite being in the order Carnivora. Due to farming, deforestation, and development, the giant panda is a 
conservation-reliant vulnerable species. Efforts have been made to save the panda, resulting in an increase in the 
wild population. The giant panda is a national symbol of China and has been featured on various coins and mascots, 
including the 2008 Beijing Summer Olympics.'
                ],
                'meta': [
                    {
                        'model': 'gpt-3.5-turbo-0125',
                        'index': 0,
                        'finish_reason': 'stop',
                        'usage': {'completion_tokens': 119, 'prompt_tokens': 585, 'total_tokens': 704}
                    }
                ]
            },
            'cohere': {
                'replies': [
                    " The giant panda is a vulnerable species found only in China. Despite its dietary 
classification, the giant panda is herbivorous, feeding mostly on bamboo. They are heavy climbers and typically 
live alone or in pairs. \nThe giant panda has been China's national symbol and over the years, its 
conservation-reliant status has been a focal point for Chinese authorities. The giant panda was reclassified as 
vulnerable in 2016, and again in 2021. "
                ],
                'meta': [{'finish_reason': 'COMPLETE'}]
            }
        }
    }
}
Time taken for 5 concurrent calls: 17.31975769996643 seconds

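Nice! Our approach does the trick: the five concurrent calls took about 17.3 seconds, compared with about 28.9 seconds for the baseline.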

Concurrent Retrievers

Use case: we want to send a user query to different retrievers concurrently. For example, we could send a user query to a keyword-based retriever and a semantic one, and join both sets of results.

⚠️ Some Document Stores have Hybrid Retrievers that internally send a single batch query to the DB. This solution is generally more efficient than the following approach.

We first generate some Documents with random content and embeddings. The documents are then written to an ElasticsearchDocumentStore.

from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

from haystack import Document
import numpy as np
import random

contents = ["The capital of Germany is Berlin", "The capital of France is Paris", "The capital of Spain is Madrid", "The capital of Italy is Rome"]

documents =[]
for i in range(1_000):
    doc = Document(
        content=random.choice(contents)+f" {i}",
        embedding=np.random.rand(768).tolist()
    )
    documents.append(doc)


document_store = ElasticsearchDocumentStore(cloud_id="your Elastic Cloud ID",
                                            api_key="your Elastic API key")
document_store.write_documents(documents)

print(document_store.count_documents())
print(document_store.filter_documents()[:6])
1000
[
    Document(id=c02b93cdfbc7afa9d8475f20500e340d43e1dbbd0b8e17458817d886f5ff3d68, content: 'The capital of Italy is
Rome 0', score: 1.0, embedding: vector of size 768),
    Document(id=49679c985b4ab56bddebd77190836cfe772eb537121f87dc5ec67313931a3db0, content: 'The capital of Italy is
Rome 1', score: 1.0, embedding: vector of size 768),
    Document(id=76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5, content: 'The capital of Spain is
Madrid 2', score: 1.0, embedding: vector of size 768),
    Document(id=e44a482bfde5acf6636f9046f28634175dcd3f8021a7a7297e3aeb81f3aeef23, content: 'The capital of France 
is Paris 3', score: 1.0, embedding: vector of size 768),
    Document(id=3c14dad24490e08e8dad5b9ac53a7180181eb80f1656d31b08a3dc314a3983be, content: 'The capital of Italy is
Rome 4', score: 1.0, embedding: vector of size 768),
    Document(id=51aaaae5e07b07b3b53b07bf2cd159432ce0c4b2178a79f41999b6cb7c1f2af1, content: 'The capital of Italy is
Rome 5', score: 1.0, embedding: vector of size 768)
]

Baseline: Pipeline with Retrievers running one after another

from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchBM25Retriever, ElasticsearchEmbeddingRetriever
from haystack import Pipeline
from haystack.components.joiners import DocumentJoiner



embedding_retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
bm25_retriever = ElasticsearchBM25Retriever(document_store=document_store)
document_joiner = DocumentJoiner(join_mode="merge")

hybrid_retrieval = Pipeline()
hybrid_retrieval.add_component("embedding_retriever", embedding_retriever)
hybrid_retrieval.add_component("bm25_retriever", bm25_retriever)
hybrid_retrieval.add_component("document_joiner", document_joiner)

hybrid_retrieval.connect("bm25_retriever", "document_joiner")
hybrid_retrieval.connect("embedding_retriever", "document_joiner")

query = "Madrid"
query_embedding = [0.1] * 768


start = time.time()
n = 100
for i in range(n):
    result = hybrid_retrieval.run(
        {"bm25_retriever": {"query": query}, "embedding_retriever": {"query_embedding": query_embedding}}
    )

end = time.time()
print(result)
print(f"Time taken for {n} non concurrent calls: {end - start} seconds")
{
    'document_joiner': {
        'documents': [
            Document(id=76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5, content: 'The capital of 
Spain is Madrid 2', score: 0.6654567, embedding: vector of size 768),
            Document(id=81a55c519cd78fbe2eedf0d632cf2e256301ad42201b8be9b134eed7d5f8add6, content: 'The capital of 
Spain is Madrid 10', score: 0.6654567, embedding: vector of size 768),
            Document(id=ea817c00aee30178ebbfeb70f6e2a8853931c20ea47a1977227531ba175fc336, content: 'The capital of 
Spain is Madrid 12', score: 0.6654567, embedding: vector of size 768),
            Document(id=c43c8845ec4831d3321f94db7b256c86d1c4a4a3b077dd7acb0e826c65be7a20, content: 'The capital of 
Spain is Madrid 13', score: 0.6654567, embedding: vector of size 768),
            Document(id=b1da49017ec07aeed322e43c714e2f06e8048990af53c6b240c4a910f8588d75, content: 'The capital of 
Spain is Madrid 14', score: 0.6654567, embedding: vector of size 768),
            Document(id=e463016f308ca7446c7b1c2086fe0f5a7faf7fd23d1a036a1b4d7b384ca2ee0b, content: 'The capital of 
Spain is Madrid 16', score: 0.6654567, embedding: vector of size 768),
            Document(id=97d3185e7aee8f0c5fda8f10a4664ce9d41169d80cba822004016a7772df5a1f, content: 'The capital of 
Spain is Madrid 18', score: 0.6654567, embedding: vector of size 768),
            Document(id=5c41d72973050af64e932b8930982158cb2447bcba45d83c95f9b2a6bfa448f6, content: 'The capital of 
Spain is Madrid 22', score: 0.6654567, embedding: vector of size 768),
            Document(id=18b8a234a362a7f23ec85ca2490ca076f3a685ac2f54dcc51fc0dc08ce01ec48, content: 'The capital of 
Spain is Madrid 28', score: 0.6654567, embedding: vector of size 768),
            Document(id=894416844ee654aadc7b65a59e31d305ab2ff8f163d92cc8d55cc83087ab7cce, content: 'The capital of 
Spain is Madrid 37', score: 0.6654567, embedding: vector of size 768),
            Document(id=fb2394dc1f7ad21a80b143c423de21a4ce53ce785de1236331a28615dd3ba3e2, content: 'The capital of 
Spain is Madrid 194', score: 0.47077772, embedding: vector of size 768),
            Document(id=f8de88083f17f5011d81b5ac47366c3a31bb2a00d11fd67d0e9244f8579401c8, content: 'The capital of 
Italy is Rome 141', score: 0.47066435, embedding: vector of size 768),
            Document(id=a5b9809f86b2260ff622077d366ac2072d9d2f9c6603db62111578cbc53738c6, content: 'The capital of 
Spain is Madrid 538', score: 0.470319925, embedding: vector of size 768),
            Document(id=b3331a607a68e5cdf00857b31d5e48ad2dd1b3d3d63a32748938d381e6540146, content: 'The capital of 
Spain is Madrid 118', score: 0.470275225, embedding: vector of size 768),
            Document(id=3f510b7d9f4b8e8837396ecad9457ae169d38396d5385bff6eb6dd5fbcb3c75e, content: 'The capital of 
Italy is Rome 603', score: 0.4701038, embedding: vector of size 768),
            Document(id=66118fdd45b5b2f3fff2aa13d28b706efe4d3d239d11d7a335485d1f02da2eaf, content: 'The capital of 
Italy is Rome 612', score: 0.4700622, embedding: vector of size 768),
            Document(id=651529b836953ffd40a8620b2a34390fd627fbf3880ea740233a2f1974cfdda9, content: 'The capital of 
Italy is Rome 489', score: 0.47003627, embedding: vector of size 768),
            Document(id=6713ff1da7fece0bf2b16526aedd7f4044cf75a9be4928a85fade9c2446cded2, content: 'The capital of 
Germany is Berlin 735', score: 0.469990015, embedding: vector of size 768),
            Document(id=56cdccc03a89a1140bc3d58afb1fd945460e9ddb78858b893f96c3da9f3f6318, content: 'The capital of 
France is Paris 726', score: 0.4698249, embedding: vector of size 768),
            Document(id=1bfa297e32fb9d38ec2ecc59409937b910ce3cc9dab110ca40964fe00ce5dd5f, content: 'The capital of 
Spain is Madrid 977', score: 0.46978408, embedding: vector of size 768)
        ]
    }
}
Time taken for 100 non concurrent calls: 21.74193024635315 seconds

Optimisation: Pipeline with Retrievers running concurrently

We wrap multiple retrievers into a single component that internally runs them concurrently in different threads. To simplify thread orchestration, we use the to_thread function from asyncio. The component itself stays synchronous, exposing the usual run method.

ConcurrentRetrievers component

  • This component expects a list of retrievers and names as initialization parameters.

  • The _arun method is an async method, where concurrent execution takes place. It creates a thread for each retriever and waits for all threads to finish. Then the results are collected, aggregated, and returned.

  • The run method is synchronous (as expected by the Haystack pipeline) and calls the _arun method using asyncio.run.

  • Note: since different types of retrievers accept different query parameters in their run method (query, query_embedding, sparse_query_embedding), inspect.signature is used to determine if the retriever accepts a specific parameter.
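The signature check in the last bullet can be tried in isolation. In this minimal sketch, `keyword_search`, `vector_search`, and `select_params` are hypothetical helpers, not Haystack APIs. Unlike the component below, which always forwards `top_k` and `filters`, this variant filters every argument against the signature.

```python
import inspect
from typing import Any, Callable, Dict, List, Optional


def keyword_search(query: str, top_k: Optional[int] = None) -> str:
    return f"keyword search for {query!r}"


def vector_search(query_embedding: List[float], top_k: Optional[int] = None) -> str:
    return f"vector search with {len(query_embedding)} dimensions"


def select_params(func: Callable[..., Any], **kwargs: Any) -> Dict[str, Any]:
    """Keep only the keyword arguments that appear in func's signature."""
    accepted = inspect.signature(func).parameters
    return {name: value for name, value in kwargs.items() if name in accepted}


inputs = {"query": "Madrid", "query_embedding": [0.1] * 768, "top_k": 5}

for search in (keyword_search, vector_search):
    params = select_params(search, **inputs)
    print(search.__name__, sorted(params), "->", search(**params))
```

Each function receives only the arguments it declares, so one set of inputs can be fanned out to callables with different signatures.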

from haystack import Pipeline
from haystack.components.joiners import DocumentJoiner
from haystack.dataclasses import SparseEmbedding
from haystack import component, Document
from haystack.core.component import Component
from typing import List, Optional, Dict, Any
import asyncio
import inspect


@component
class ConcurrentRetrievers:
    def __init__(self, retrievers: List[Component], names: Optional[List[str]] = None):
        self.retrievers = retrievers
        if names is None:
            names = [f"retriever_{i}" for i in range(len(retrievers))]
        self.names = names

        output_types = {k: List[Document] for k in names}
        component.set_output_types(self, **output_types)

    async def _arun(self, **kwargs):
        """
        Asynchronous method to run the retrievers concurrently.
        """

        coroutines = []
        for retriever in self.retrievers:
            retriever_params = inspect.signature(getattr(retriever, "run")).parameters
            selected_params = {"top_k": kwargs.get("top_k"), "filters": kwargs.get("filters")}
            # each retriever accepts different parameters (keyword/BM25 retriever, embedding retriever, hybrid retriever)
            for query_param in ["query", "query_embedding", "sparse_query_embedding"]:
                if query_param in retriever_params:
                    selected_params[query_param] = kwargs.get(query_param)
            # the retrievers run in separate threads
            coroutines.append(asyncio.to_thread(retriever.run, **selected_params))

        results = await asyncio.gather(*coroutines)

        organized_results = {}
        for retriever_name, res_ in zip(self.names, results):
            organized_results[retriever_name] = res_["documents"]
        return organized_results

    def run(
        self,
        query: Optional[str] = None,
        query_embedding: Optional[List[float]] = None,
        sparse_query_embedding: Optional[SparseEmbedding] = None,
        filters: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None,
    ):
        """
        Synchronous run method that can be integrated into a classic synchronous pipeline.
        """
        results = asyncio.run(
            self._arun(
                query=query,
                query_embedding=query_embedding,
                sparse_query_embedding=sparse_query_embedding,
                filters=filters,
                top_k=top_k,
            )
        )
        return results

# serialization/deserialization methods can be implemented like
# in ConcurrentGenerators component...

Pipeline with ConcurrentRetrievers component

embedding_retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
bm25_retriever = ElasticsearchBM25Retriever(document_store=document_store)
document_joiner = DocumentJoiner(join_mode="merge")

hybrid_retrieval = Pipeline()
hybrid_retrieval.add_component(
    "concurrent_retrievers",
    ConcurrentRetrievers(
        retrievers=[embedding_retriever, bm25_retriever],
        names=["embedding_retriever", "bm25_retriever"],
    ),
)
hybrid_retrieval.add_component("document_joiner", document_joiner)

hybrid_retrieval.connect("concurrent_retrievers.embedding_retriever", "document_joiner")
hybrid_retrieval.connect("concurrent_retrievers.bm25_retriever", "document_joiner")

query = "Madrid"
query_embedding = [0.1] * 768

start = time.time()
n = 100
for i in range(n):
    result = hybrid_retrieval.run(
        {"concurrent_retrievers": {"query": query, "query_embedding": query_embedding}}
    )

end = time.time()
print(result)
print(f"Time taken for {n} concurrent calls: {end - start} seconds")
{
    'document_joiner': {
        'documents': [
            Document(id=76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5, content: 'The capital of 
Spain is Madrid 2', score: 0.6654567, embedding: vector of size 768),
            Document(id=81a55c519cd78fbe2eedf0d632cf2e256301ad42201b8be9b134eed7d5f8add6, content: 'The capital of 
Spain is Madrid 10', score: 0.6654567, embedding: vector of size 768),
            Document(id=ea817c00aee30178ebbfeb70f6e2a8853931c20ea47a1977227531ba175fc336, content: 'The capital of 
Spain is Madrid 12', score: 0.6654567, embedding: vector of size 768),
            Document(id=c43c8845ec4831d3321f94db7b256c86d1c4a4a3b077dd7acb0e826c65be7a20, content: 'The capital of 
Spain is Madrid 13', score: 0.6654567, embedding: vector of size 768),
            Document(id=b1da49017ec07aeed322e43c714e2f06e8048990af53c6b240c4a910f8588d75, content: 'The capital of 
Spain is Madrid 14', score: 0.6654567, embedding: vector of size 768),
            Document(id=e463016f308ca7446c7b1c2086fe0f5a7faf7fd23d1a036a1b4d7b384ca2ee0b, content: 'The capital of 
Spain is Madrid 16', score: 0.6654567, embedding: vector of size 768),
            Document(id=97d3185e7aee8f0c5fda8f10a4664ce9d41169d80cba822004016a7772df5a1f, content: 'The capital of 
Spain is Madrid 18', score: 0.6654567, embedding: vector of size 768),
            Document(id=5c41d72973050af64e932b8930982158cb2447bcba45d83c95f9b2a6bfa448f6, content: 'The capital of 
Spain is Madrid 22', score: 0.6654567, embedding: vector of size 768),
            Document(id=18b8a234a362a7f23ec85ca2490ca076f3a685ac2f54dcc51fc0dc08ce01ec48, content: 'The capital of 
Spain is Madrid 28', score: 0.6654567, embedding: vector of size 768),
            Document(id=894416844ee654aadc7b65a59e31d305ab2ff8f163d92cc8d55cc83087ab7cce, content: 'The capital of 
Spain is Madrid 37', score: 0.6654567, embedding: vector of size 768),
            Document(id=fb2394dc1f7ad21a80b143c423de21a4ce53ce785de1236331a28615dd3ba3e2, content: 'The capital of 
Spain is Madrid 194', score: 0.47077772, embedding: vector of size 768),
            Document(id=f8de88083f17f5011d81b5ac47366c3a31bb2a00d11fd67d0e9244f8579401c8, content: 'The capital of 
Italy is Rome 141', score: 0.47066435, embedding: vector of size 768),
            Document(id=a5b9809f86b2260ff622077d366ac2072d9d2f9c6603db62111578cbc53738c6, content: 'The capital of 
Spain is Madrid 538', score: 0.470319925, embedding: vector of size 768),
            Document(id=b3331a607a68e5cdf00857b31d5e48ad2dd1b3d3d63a32748938d381e6540146, content: 'The capital of 
Spain is Madrid 118', score: 0.470275225, embedding: vector of size 768),
            Document(id=3f510b7d9f4b8e8837396ecad9457ae169d38396d5385bff6eb6dd5fbcb3c75e, content: 'The capital of 
Italy is Rome 603', score: 0.4701038, embedding: vector of size 768),
            Document(id=66118fdd45b5b2f3fff2aa13d28b706efe4d3d239d11d7a335485d1f02da2eaf, content: 'The capital of 
Italy is Rome 612', score: 0.4700622, embedding: vector of size 768),
            Document(id=651529b836953ffd40a8620b2a34390fd627fbf3880ea740233a2f1974cfdda9, content: 'The capital of 
Italy is Rome 489', score: 0.47003627, embedding: vector of size 768),
            Document(id=6713ff1da7fece0bf2b16526aedd7f4044cf75a9be4928a85fade9c2446cded2, content: 'The capital of 
Germany is Berlin 735', score: 0.469990015, embedding: vector of size 768),
            Document(id=56cdccc03a89a1140bc3d58afb1fd945460e9ddb78858b893f96c3da9f3f6318, content: 'The capital of 
France is Paris 726', score: 0.4698249, embedding: vector of size 768),
            Document(id=1bfa297e32fb9d38ec2ecc59409937b910ce3cc9dab110ca40964fe00ce5dd5f, content: 'The capital of 
Spain is Madrid 977', score: 0.46978408, embedding: vector of size 768)
        ]
    }
}
Time taken for 100 concurrent calls: 11.726269483566284 seconds

Conclusions

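  • The proposed approach proved effective for I/O-bound tasks: in the runs above, total time dropped from about 29 to 17 seconds for the generators and from about 22 to 12 seconds for the retrievers.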
  • A similar approach can be implemented using multiprocessing for CPU-bound tasks, such as local ML inference.
  • When implementing components like these, special attention must be paid to input and output to make them truly usable in pipelines.

Notebook by Stefano Fiorucci