Tutorial: Simplifying Pipeline Inputs with Multiplexer
Last Updated: January 15, 2025
- Level: Intermediate
- Time to complete: 10 minutes
- Components Used: Multiplexer, InMemoryDocumentStore, HuggingFaceAPIDocumentEmbedder, HuggingFaceAPITextEmbedder, InMemoryEmbeddingRetriever, PromptBuilder, HuggingFaceAPIGenerator and AnswerBuilder
- Prerequisites: You must have a Hugging Face API Key and be familiar with creating pipelines
- Goal: After completing this tutorial, you’ll have learned how to use a Multiplexer to simplify the inputs that Pipeline.run() gets
As of version 2.2.0, Multiplexer has been deprecated in Haystack and will be completely removed as of v2.4.0. We recommend using BranchJoiner instead. For more details about this deprecation, check out the Haystack 2.2.0 release notes on GitHub.
This tutorial uses the latest version of Haystack 2.x (haystack-ai). For more information on Haystack 2.0, read the Haystack 2.0 announcement or visit the Haystack Documentation.
Overview
If you’ve ever built a Haystack pipeline with more than 3-4 components, you probably noticed that the number of inputs to pass to the run() method of the pipeline keeps growing. New components take some of their input from the other components of a pipeline, but many of them also require additional input from the user. As a result, the data input of Pipeline.run() grows and becomes very repetitive.
One component can help you manage this repetition more effectively: the Multiplexer.
In this tutorial, you will learn how to drastically simplify the Pipeline.run() call of a RAG pipeline using a Multiplexer.
Setup
Prepare the Colab Environment
Install Haystack
Install Haystack with pip:
%%bash
pip install haystack-ai "huggingface_hub>=0.23.0"
Enable Telemetry
Knowing you’re using this tutorial helps us decide where to invest our efforts to build a better product, but you can always opt out by commenting out the following line. See Telemetry for more details.
from haystack.telemetry import tutorial_running
tutorial_running(37)
Enter a Hugging Face API key
Set a Hugging Face API key:
import os
from getpass import getpass
if "HF_API_TOKEN" not in os.environ:
    os.environ["HF_API_TOKEN"] = getpass("Enter Hugging Face token:")
Indexing Documents with a Pipeline
Create a pipeline to store the small example dataset in the InMemoryDocumentStore together with its embeddings. You will use HuggingFaceAPIDocumentEmbedder to generate embeddings for your Documents and DocumentWriter to write them to the document store.
After adding these components to your pipeline, connect them and run the pipeline.
If you’d like to learn about preprocessing files before you index them to your document store, follow the Preprocessing Different File Types tutorial.
from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import HuggingFaceAPIDocumentEmbedder
documents = [
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome."),
    Document(content="My name is Giorgio and I live in Milan."),
    Document(content="My name is Giorgio and I lived in many cities, but I settled in Naples eventually."),
]
document_store = InMemoryDocumentStore()
indexing_pipeline = Pipeline()
indexing_pipeline.add_component(
    instance=HuggingFaceAPIDocumentEmbedder(
        api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
    ),
    name="doc_embedder",
)
indexing_pipeline.add_component(instance=DocumentWriter(document_store=document_store), name="doc_writer")
indexing_pipeline.connect("doc_embedder.documents", "doc_writer.documents")
indexing_pipeline.run({"doc_embedder": {"documents": documents}})
Building a RAG Pipeline
Build a basic retrieval-augmented generation (RAG) pipeline with HuggingFaceAPITextEmbedder, InMemoryEmbeddingRetriever, PromptBuilder and HuggingFaceAPIGenerator. Additionally, add AnswerBuilder to enrich the generated answer with meta info and the query input.
For a step-by-step guide to creating a RAG pipeline with Haystack, follow the Creating Your First QA Pipeline with Retrieval-Augmentation tutorial.
from haystack.components.embedders import HuggingFaceAPITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.generators import HuggingFaceAPIGenerator
template = """
<|user|>
Answer the question based on the given context.
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{ question }}</s>
<|assistant|>
Answer:
"""
pipe = Pipeline()
pipe.add_component(
    "embedder",
    HuggingFaceAPITextEmbedder(
        api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
    ),
)
pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component(
    "llm",
    HuggingFaceAPIGenerator(api_type="serverless_inference_api", api_params={"model": "HuggingFaceH4/zephyr-7b-beta"}),
)
pipe.add_component("answer_builder", AnswerBuilder())
pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
Running the Pipeline
Pass the query to embedder, prompt_builder and answer_builder, and run the pipeline:
query = "Where does Mark live?"
pipe.run({"embedder": {"text": query}, "prompt_builder": {"question": query}, "answer_builder": {"query": query}})
In this basic RAG pipeline, the components that require a query to operate are embedder, prompt_builder, and answer_builder. However, as you extend the pipeline with additional components like Retrievers and Rankers, the number of components needing a query keeps increasing. This leads to repetitive and increasingly complex Pipeline.run() calls. In such cases, using a Multiplexer can help simplify and declutter Pipeline.run().
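To make the repetition concrete, the following minimal sketch builds the data argument by hand in plain Python. The helper build_run_inputs and the QUERY_SOCKETS mapping are hypothetical names introduced here for illustration and are not part of Haystack; the component and input names are the ones used in the pipeline above, and "ranker" is an assumed extra component.

```python
from typing import Dict

# Hypothetical mapping (not a Haystack API): component name -> name of the
# input on that component that expects the user's query.
QUERY_SOCKETS: Dict[str, str] = {
    "embedder": "text",
    "prompt_builder": "question",
    "answer_builder": "query",
}


def build_run_inputs(query: str, sockets: Dict[str, str]) -> Dict[str, Dict[str, str]]:
    """Repeat the same query once for every component that needs it."""
    return {component: {input_name: query} for component, input_name in sockets.items()}


run_inputs = build_run_inputs("Where does Mark live?", QUERY_SOCKETS)
print(run_inputs)

# Every extra query-consuming component adds one more entry to the dictionary.
# "ranker" is an illustrative component name, not one defined in this tutorial.
extended = build_run_inputs("Where does Mark live?", {**QUERY_SOCKETS, "ranker": "query"})
print(len(run_inputs), len(extended))
```

The first dictionary has the same shape as the one passed to pipe.run() above: one entry per component, each carrying an identical copy of the query.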
Introducing a Multiplexer
The Multiplexer is a component that can accept multiple input connections and then distributes the first value it receives to all components connected to its output. In this setting, you can use this component by connecting it to the other pipeline components that expect a query at runtime.
Now, initialize the Multiplexer with the expected input type (in this case, str, since the query is a string):
from haystack.components.others import Multiplexer
multiplexer = Multiplexer(str)
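As a rough mental model of the fan-out described in this section, here is a toy sketch in plain Python. ToyMultiplexer is a hypothetical class written for this illustration; it is not Haystack's implementation and ignores how a real pipeline schedules components. It only mirrors the behaviour described here: take the first value received, check it against the declared type, and hand it to every connected input.

```python
from typing import Any, Dict, List, Tuple


class ToyMultiplexer:
    """Toy stand-in (not the Haystack class) for a typed fan-out component."""

    def __init__(self, type_: type) -> None:
        self.type_ = type_
        self.receivers: List[Tuple[str, str]] = []

    def connect(self, component: str, input_name: str) -> None:
        """Register a (component, input) pair that should receive the value."""
        self.receivers.append((component, input_name))

    def run(self, values: List[Any]) -> Dict[str, Dict[str, Any]]:
        """Distribute the first received value to every registered receiver."""
        if not values:
            raise ValueError("expected at least one value")
        first = values[0]
        if not isinstance(first, self.type_):
            raise TypeError(f"expected {self.type_.__name__}, got {type(first).__name__}")
        return {component: {input_name: first} for component, input_name in self.receivers}


toy = ToyMultiplexer(str)
toy.connect("embedder", "text")
toy.connect("prompt_builder", "question")
toy.connect("answer_builder", "query")
fanned_out = toy.run(["Where does Mark live?"])
print(fanned_out)
```

The caller supplies the query once, and the component is responsible for copying it to each receiver.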
Adding the Multiplexer to the Pipeline
Create the same RAG pipeline, but this time with the Multiplexer. Add the Multiplexer to the pipeline and connect it to all the components that need the query as an input:
from haystack.components.embedders import HuggingFaceAPITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.generators import HuggingFaceAPIGenerator
template = """
<|user|>
Answer the question based on the given context.
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{ question }}</s>
<|assistant|>
Answer:
"""
pipe = Pipeline()
pipe.add_component("multiplexer", multiplexer)
pipe.add_component(
    "embedder",
    HuggingFaceAPITextEmbedder(
        api_type="serverless_inference_api", api_params={"model": "sentence-transformers/all-MiniLM-L6-v2"}
    ),
)
pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component(
    "llm",
    HuggingFaceAPIGenerator(api_type="serverless_inference_api", api_params={"model": "HuggingFaceH4/zephyr-7b-beta"}),
)
pipe.add_component("answer_builder", AnswerBuilder())
# Connect the Multiplexer to all the components that need the query
pipe.connect("multiplexer.value", "embedder.text")
pipe.connect("multiplexer.value", "prompt_builder.question")
pipe.connect("multiplexer.value", "answer_builder.query")
pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
Running the Pipeline with a Multiplexer
Run the pipeline that you updated with a Multiplexer. This time, instead of passing the query to embedder, prompt_builder and answer_builder separately, you only need to pass it to the multiplexer. As a result, you will get the same answer.
pipe.run({"multiplexer": {"value": "Where does Mark live?"}})
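Because the query now enters the pipeline at a single point, the call is easy to wrap in a small helper. The sketch below is one possible way to do that, not a Haystack feature; ask and RecordingPipeline are hypothetical names introduced for illustration, and the stand-in pipeline exists only so the snippet runs without any API calls. With the real pipeline, you would pass pipe instead.

```python
from typing import Any, Dict, List


def ask(pipeline: Any, query: str, entry: str = "multiplexer") -> Dict[str, Any]:
    """Send the query to the single entry component and return the pipeline output."""
    return pipeline.run({entry: {"value": query}})


class RecordingPipeline:
    """Hypothetical stand-in that records what run() receives instead of calling a model."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Dict[str, Any]]] = []

    def run(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        self.calls.append(data)
        return {}  # placeholder result; a real pipeline returns component outputs


stub = RecordingPipeline()
ask(stub, "Where does Mark live?")
print(stub.calls)
```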
What’s next
Congratulations! You’ve simplified your pipeline run with a Multiplexer!
If you liked this tutorial, there’s more to learn about Haystack 2.0:
- Creating a Hybrid Retrieval Pipeline
- Building Fallbacks to Websearch with Conditional Routing
- Model-Based Evaluation of RAG Pipelines
To stay up to date on the latest Haystack developments, you can sign up for our newsletter or join the Haystack Discord community.
Thanks for reading!