Skip to content

Fix: Handle run_in_parallel=False, simplify pending function call tra… #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

markbackman
Copy link
Contributor

@markbackman markbackman commented Jun 13, 2025

…cking

#145 but on top of the pk/function-simplification-exploration branch. This now relies on Pipecat to do the pending function call tracking and removes the counter and tracking logic from Flows, which is a nice simplification.

DO NOT MERGE YET.

Requires an updated pipecat-ai version due to: pipecat-ai/pipecat#2006.

Copy link

vercel bot commented Jun 13, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Updated (UTC)
pipecat-flows ✅ Ready (Inspect) Visit Preview Jun 16, 2025 8:38pm

@markbackman markbackman requested a review from kompfner June 13, 2025 16:29
@markbackman markbackman force-pushed the mb/function-simplified-handle-parallel-calls branch from f0490bf to 462b7d6 Compare June 13, 2025 16:52
Base automatically changed from pk/function-simplification-exploration to main June 13, 2025 21:10
@markbackman markbackman force-pushed the mb/function-simplified-handle-parallel-calls branch from 462b7d6 to 3ac577b Compare June 14, 2025 01:47
@markbackman
Copy link
Contributor Author

@kompfner this is ready for review.

logger.debug(
f"{'Transition-only function called for' if is_transition_only_function else 'Function handler completed for'} {name}"
)

# For edge functions, prevent LLM completion until transition (run_llm=False)
# For node functions, allow immediate completion (run_llm=True)
# Determine if this is an edge function
has_explicit_transition = bool(transition_to) or bool(transition_callback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: the only reason we were storing has_explicit_transition before was to help determine how to invoke on_context_updated_edge() (though, looking back on it, I don't think it was necessary). If we don't need this var anymore, we can just get rid of it and just do:

is_edge_function = bool(next_node) or bool(transition_to) or bool(transition_callback)

Otherwise the reader might think it's significant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this 🙇

else:
# New style: (args, result, flow_manager)
await transition_callback(arguments, result, self)
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, looks like here you removed some duplicate logic that had been invoking result_callback with an error, right? It's logic that we already have in. transiition_func(), right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think this is the right call. transition_func() is responsible for the actual function call. That leaves _execute_transition to transition nodes. It's possible to have a successful function calls and a failed transition. Given that, I think this makes sense.

@@ -614,6 +587,8 @@ async def _set_node(self, node_id: str, node_config: NodeConfig) -> None:
raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first")

try:
self._pending_transition = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we clear _pending_transition here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asking because it seems like we already clear it in _execute_transition().

Copy link
Contributor Author

@markbackman markbackman Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still call set_node directly, which is probably less common as something that would happen outside of the context of a transition callback (or call set_node_from_config). It seems safer (maybe for now only) to reset in both places. I don't feel strongly about it, but that's why I added it. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. You want it so that if somehow the developer directly invoked set_node() from outside the context of an ongoing tool-call-based transition, then it would effectively cancel it. Is my understanding right?

If so, might be worth a comment above this line to provide rationale.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. I think that makes sense, as an explicitly called function should take precedent. I'll add a comment.

],
}

await flow_manager._set_node("test", node_config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't update all the _set_node calls in my last PR (replacing them with set_node_from_config(), but I probably ought to have...I think we should prefer to not add more private method calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice, you not only updated this line, but a bunch of others as well. Going the extra mile—thanks!

Copy link
Contributor

@kompfner kompfner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of minor nits, but otherwise looks great!


# Check if all function calls are complete using Pipecat's state
assistant_aggregator = self._context_aggregator.assistant()
if not assistant_aggregator.has_function_calls_in_progress:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to bump the minimum pipecat-ai version requirement in pyproject.toml to pick up this functionality.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! We should hold off on merging this PR until the pipecat-ai release is cut.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants