Skip to content

Conversation

@sjrl
Copy link
Contributor

@sjrl sjrl commented Jan 23, 2026

Related Issues

Proposed Changes:

Natively support connecting multiple outputs directly to another component without needing the user to define a Joiner component.

For example, a user may want to connect multiple output edges of the FileTypeRouter to all go to the same converter (e.g. ImageConverter). Before you would have to define a ListJoiner as an in-between component. The changes introduced in this PR would allow you to directly connect all outputs from the FileTypeRouter to the ImageConverter.

Here is an example

from haystack import Pipeline
from haystack.components.converters import HTMLToDocument, TextFileToDocument
from haystack.components.routers import FileTypeRouter
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ByteStream
from haystack.document_stores.in_memory import InMemoryDocumentStore

sources = [
    ByteStream.from_string(text="Text file content", mime_type="text/plain", meta={"file_type": "txt"}),
    ByteStream.from_string(
        text="\n<html><body>Some content</body></html>\n", mime_type="text/html", meta={"file_type": "html"},
    ),
]

doc_store = InMemoryDocumentStore()
pipe = Pipeline()

pipe.add_component("router", FileTypeRouter(mime_types=["text/plain", "text/html"]))
pipe.add_component("txt_converter", TextFileToDocument())
pipe.add_component("html_converter", HTMLToDocument())
pipe.add_component("writer", DocumentWriter(doc_store))

pipe.connect("router.text/plain", "txt_converter.sources")
pipe.connect("router.text/html", "html_converter.sources")
# The DocumentWriter accepts documents from both converters without needing a DocumentJoiner
pipe.connect("txt_converter.documents", "writer.documents")
pipe.connect("html_converter.documents", "writer.documents")

result = pipe.run({"router": {"sources": sources}})
# result["writer"]["documents_written"] == 2

How did you test it?

  • Added two new pipeline run tests

Notes for the reviewer

Checklist

  • I have read the contributors guidelines and the code of conduct.
  • I have updated the related issue with new insights and changes.
  • I have added unit tests and updated the docstrings.
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.
  • I have documented my code.
  • I have added a release note file, following the contributors guidelines.
  • I have run pre-commit hooks and fixed any issue.

@vercel
Copy link

vercel bot commented Jan 23, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Review Updated (UTC)
haystack-docs Ignored Ignored Preview Jan 28, 2026 1:16pm

Request Review

@coveralls
Copy link
Collaborator

coveralls commented Jan 23, 2026

Pull Request Test Coverage Report for Build 21361994530

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • 58 unchanged lines in 3 files lost coverage.
  • Overall coverage decreased (-0.02%) to 92.431%

Files with Coverage Reduction New Missed Lines %
core/component/types.py 3 92.68%
core/pipeline/pipeline.py 3 93.16%
core/pipeline/base.py 52 86.68%
Totals Coverage Status
Change from base Build 21361849247: -0.02%
Covered Lines: 14763
Relevant Lines: 15972

💛 - Coveralls

@sjrl sjrl marked this pull request as ready for review January 28, 2026 08:10
@sjrl sjrl requested a review from a team as a code owner January 28, 2026 08:10
@sjrl sjrl requested review from julian-risch and removed request for a team January 28, 2026 08:10
@sjrl
Copy link
Contributor Author

sjrl commented Jan 28, 2026

@anakin87 you might also find this interesting to review given your work on the OutputAdapter issue

…add3da.yaml

Co-authored-by: Stefano Fiorucci <stefanofiorucci@gmail.com>
Comment on lines +583 to +591
# We automatically set the receiver socket as variadic if:
# - it has at least one sender already connected
# - it's not already variadic
# - its origin type is list
if not receiver_socket.is_variadic and _safe_get_origin(receiver_socket.type) == list:
receiver_socket.is_lazy_variadic = True
# We also disable wrapping inputs into list so the sender outputs matches the type of the receiver
# socket.
receiver_socket.wrap_input_in_list = False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about making this it's own function so I could reuse it but it felt too short and unnecessarily obfusicated the logic of what was going on so I decided to leave as is

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we automatically set the receiver socket as variadic could that change the execution order of components in a pipeline in some edge cases? If we set it to variadic we need to postpone execution until we can be sure that there won't be any other inputs, right? Can we be sure that because of that change the component is not executed later than before ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that should all be fine. I'm utilizing the same logic that already exists. Since the execution queue is not filled until the pipeline is run this receiver is just treated like a prexisting variadic component. I added a behavioural test that copied a complicated joiner scenario with branches that didn't always fire and it passes.

Copy link
Contributor Author

@sjrl sjrl Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for changing the order of this receiver component compared to before. I don't think that would really be an issue since before, a joiner component would have lived right before the receiver so it wouldn't execute until the joiner fired anyway.

@@ -580,14 +579,26 @@ def connect(self, sender: str, receiver: str) -> "PipelineBase": # noqa: PLR091
# This is already connected, nothing to do
return self

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some tests that cover changes in this file to test/core/pipeline/test_pipeline_base.py?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added see this comment and this comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

If you merge main, Coveralls message should reflect the new coverage.

Comment on lines +914 to +923
if socket.senders and socket_name in component_inputs:
# We automatically set the receiver socket as lazy variadic if:
# - it has at least one sender already connected
# - it's not already variadic
# - its origin type is list
if not socket.is_variadic and _safe_get_origin(socket.type) == list:
socket.is_lazy_variadic = True
# We also disable wrapping inputs into list so the sender outputs matches the type of the
# receiver socket.
socket.wrap_input_in_list = False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we make a component auto variadic if a user input is a second connection to a component.

@anakin87
Copy link
Member

I'd also explain this new feature in docs.

@sjrl
Copy link
Contributor Author

sjrl commented Jan 28, 2026

I'd also explain this new feature in docs.

Good idea, do you know where in the docs this would ideally go?

@anakin87
Copy link
Member

I'd also explain this new feature in docs.

Good idea, do you know where in the docs this would ideally go?

I'm not sure. Somewhere in https://docs.haystack.deepset.ai/docs/pipelines or in child pages.
(I'd expect then to expand that section with features I'm planning to implement.)

@sjrl
Copy link
Contributor Author

sjrl commented Jan 28, 2026

Writing it here so I don't forget, but a follow up idea would be to getting this to also work for components that have Any as their input type. E.g. ChatPromptBuilders. Where it would be cool that if both senders have the same type and the senders origin type are list then we could also do this auto lazy variadic update. An example I'm thinking of is sending two lists of docs from different sources to one input on a ChatPromptBuilder.

Comment on lines +1582 to +1589
class TestPipelineConnect:
def test_connect(self):
comp1 = component_class("Comp1", output_types={"value": int})()
comp2 = component_class("Comp2", input_types={"value": int})()
pipe = PipelineBase()
pipe.add_component("comp1", comp1)
pipe.add_component("comp2", comp2)
assert pipe.connect("comp1.value", "comp2.value") is pipe
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I collected all connect tests under this class

Comment on lines +1833 to +1838
def test_connect_auto_variadic(self):
@component
class ListAcceptor:
@component.output_types(result=list[int])
def run(self, numbers: list[int]) -> dict[str, list[int]]:
return {"result": numbers}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the new test I added to test the new auto-variadic behavior on pipeline connection

Comment on lines +1857 to +1862
class TestValidateInput:
def test_validate_input_wrong_comp_name(self):
pipe = PipelineBase()
pipe.add_component("comp", FakeComponent())
with pytest.raises(ValueError, match="Component named 'wrong_comp_name' not found in the pipeline."):
pipe.validate_input(data={"wrong_comp_name": {"input_": "test"}})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function looked to be completely untested at least from unit tests so I added quite few under this class

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@sjrl
Copy link
Contributor Author

sjrl commented Jan 28, 2026

I'd also explain this new feature in docs.

Good idea, do you know where in the docs this would ideally go?

I'm not sure. Somewhere in https://docs.haystack.deepset.ai/docs/pipelines or in child pages. (I'd expect then to expand that section with features I'm planning to implement.)

Is it fine if I do this in a separate PR? I can create a follow up issue

@sjrl sjrl requested a review from anakin87 January 28, 2026 13:19
@anakin87
Copy link
Member

I'd also explain this new feature in docs.

Good idea, do you know where in the docs this would ideally go?

I'm not sure. Somewhere in https://docs.haystack.deepset.ai/docs/pipelines or in child pages. (I'd expect then to expand that section with features I'm planning to implement.)

Is it fine if I do this in a separate PR? I can create a follow up issue

Ok!

Copy link
Member

@anakin87 anakin87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

I'd like @julian-risch to take a look too.

Copy link
Member

@julian-risch julian-risch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me too. I just would like to briefly discuss if this PR can change the order of execution of different components. Would be an edge case. Or maybe you already thought about it?

@sjrl sjrl requested review from anakin87 and julian-risch January 29, 2026 10:26
Copy link
Member

@anakin87 anakin87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@sjrl sjrl merged commit 41c6404 into main Jan 29, 2026
22 checks passed
@sjrl sjrl deleted the auto-variadic-input branch January 29, 2026 10:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants