Skip to content

Fix: Handle run_in_parallel=False, simplify pending function call tra… #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Fixed an issue where if `run_in_parallel=False` was set for the LLM, the bot
would trigger N completions for each sequential function call. Now, Flows
uses Pipecat's internal function tracking to determine when there are more
edge functions to call.

- Overhauled `pre_actions` and `post_actions` timing logic, making their timing more predictable and
eliminating some bugs. For example, now `tts_say` actions will always run after the bot response,
when used in `post_actions`.
Expand Down
192 changes: 86 additions & 106 deletions src/pipecat_flows/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def __init__(
self.adapter = create_adapter(llm)
self.initialized = False
self._context_aggregator = context_aggregator
self._pending_function_calls = 0
self._pending_transition: Optional[Dict[str, Any]] = None
self._context_strategy = context_strategy or ContextStrategyConfig(
strategy=ContextStrategy.APPEND
)
Expand Down Expand Up @@ -329,87 +329,10 @@ async def _create_transition_func(
if transition_callback:
self._validate_transition_callback(name, transition_callback)

def decrease_pending_function_calls() -> None:
"""Decrease the pending function calls counter if greater than zero."""
if self._pending_function_calls > 0:
self._pending_function_calls -= 1
logger.debug(
f"Function call completed: {name} (remaining: {self._pending_function_calls})"
)

async def on_context_updated_edge(
next_node: Optional[NodeConfig | str],
args: Optional[Dict[str, Any]],
result: Optional[Any],
result_callback: Callable,
) -> None:
"""
Handle context updates for edge functions with transitions.

If `next_node` is provided:
- Ignore `args` and `result` and just transition to it.

Otherwise, if `transition_to` is available:
- Use it to look up the next node.

Otherwise, if `transition_callback` is provided:
- Call it with `args` and `result` to determine the next node.
"""
try:
decrease_pending_function_calls()

# Only process transition if this was the last pending call
if self._pending_function_calls == 0:
if next_node: # Function-returned next node (as opposed to next node specified by transition_*)
if isinstance(next_node, str): # Static flow
node_name = next_node
node = self.nodes[next_node]
else: # Dynamic flow
node_name = get_or_generate_node_name(next_node)
node = next_node
logger.debug(f"Transition to function-returned node: {node_name}")
await self._set_node(node_name, node)
elif transition_to: # Static flow
logger.debug(f"Static transition to: {transition_to}")
await self._set_node(transition_to, self.nodes[transition_to])
elif transition_callback: # Dynamic flow
logger.debug(f"Dynamic transition for: {name}")
# Check callback signature
sig = inspect.signature(transition_callback)
if len(sig.parameters) == 2:
# Old style: (args, flow_manager)
await transition_callback(args, self)
else:
# New style: (args, result, flow_manager)
await transition_callback(args, result, self)
# Reset counter after transition completes
self._pending_function_calls = 0
logger.debug("Reset pending function calls counter")
else:
logger.debug(
f"Skipping transition, {self._pending_function_calls} calls still pending"
)
except Exception as e:
logger.error(f"Error in transition: {str(e)}")
self._pending_function_calls = 0
await result_callback(
{"status": "error", "error": str(e)},
properties=None, # Clear properties to prevent further callbacks
)
raise # Re-raise to prevent further processing

async def on_context_updated_node() -> None:
"""Handle context updates for node functions without transitions."""
decrease_pending_function_calls()

async def transition_func(params: FunctionCallParams) -> None:
"""Inner function that handles the actual tool invocation."""
try:
# Track pending function call
self._pending_function_calls += 1
logger.debug(
f"Function call pending: {name} (total: {self._pending_function_calls})"
)
logger.debug(f"Function called: {name}")

# Execute handler if present
is_transition_only_function = False
Expand Down Expand Up @@ -439,47 +362,98 @@ async def transition_func(params: FunctionCallParams) -> None:
result = acknowledged_result
next_node = None
is_transition_only_function = True

logger.debug(
f"{'Transition-only function called for' if is_transition_only_function else 'Function handler completed for'} {name}"
)

# For edge functions, prevent LLM completion until transition (run_llm=False)
# For node functions, allow immediate completion (run_llm=True)
has_explicit_transition = bool(transition_to) or bool(transition_callback)

async def on_context_updated() -> None:
if next_node:
await on_context_updated_edge(
next_node=next_node,
args=None,
result=None,
result_callback=params.result_callback,
)
elif has_explicit_transition:
await on_context_updated_edge(
next_node=None,
args=params.arguments,
result=result,
result_callback=params.result_callback,
)
else:
await on_context_updated_node()

is_edge_function = bool(next_node) or has_explicit_transition
properties = FunctionCallResultProperties(
run_llm=not is_edge_function,
on_context_updated=on_context_updated,
# Determine if this is an edge function
is_edge_function = (
bool(next_node) or bool(transition_to) or bool(transition_callback)
)

if is_edge_function:
# Store transition info for coordinated execution
transition_info = {
"next_node": next_node,
"transition_to": transition_to,
"transition_callback": transition_callback,
"function_name": name,
"arguments": params.arguments,
"result": result,
}
self._pending_transition = transition_info

properties = FunctionCallResultProperties(
run_llm=False, # Don't run LLM until transition completes
on_context_updated=self._check_and_execute_transition,
)
else:
# Node function - run LLM immediately
properties = FunctionCallResultProperties(
run_llm=True,
on_context_updated=None,
)

await params.result_callback(result, properties=properties)

except Exception as e:
logger.error(f"Error in transition function {name}: {str(e)}")
self._pending_function_calls = 0
error_result = {"status": "error", "error": str(e)}
await params.result_callback(error_result)

return transition_func

async def _check_and_execute_transition(self) -> None:
"""Check if all functions are complete and execute transition if so."""
if not self._pending_transition:
return

# Check if all function calls are complete using Pipecat's state
assistant_aggregator = self._context_aggregator.assistant()
if not assistant_aggregator.has_function_calls_in_progress:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to bump the minimum pipecat-ai version requirement in pyproject.toml to pick up this functionality.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! We should hold off on merging this PR until the pipecat-ai release is cut.

# All functions complete, execute transition
transition_info = self._pending_transition
self._pending_transition = None

await self._execute_transition(transition_info)

async def _execute_transition(self, transition_info: Dict[str, Any]) -> None:
"""Execute the stored transition."""
next_node = transition_info.get("next_node")
transition_to = transition_info.get("transition_to")
transition_callback = transition_info.get("transition_callback")
function_name = transition_info.get("function_name")
arguments = transition_info.get("arguments")
result = transition_info.get("result")

try:
if next_node: # Function-returned next node (consolidated function)
if isinstance(next_node, str): # Static flow
node_name = next_node
node = self.nodes[next_node]
else: # Dynamic flow
node_name = get_or_generate_node_name(next_node)
node = next_node
logger.debug(f"Transition to function-returned node: {node_name}")
await self._set_node(node_name, node)
elif transition_to: # Static flow (deprecated)
logger.debug(f"Static transition to: {transition_to}")
await self._set_node(transition_to, self.nodes[transition_to])
elif transition_callback: # Dynamic flow (deprecated)
logger.debug(f"Dynamic transition for: {function_name}")
# Check callback signature
sig = inspect.signature(transition_callback)
if len(sig.parameters) == 2:
# Old style: (args, flow_manager)
await transition_callback(arguments, self)
else:
# New style: (args, result, flow_manager)
await transition_callback(arguments, result, self)
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, looks like here you removed some duplicate logic that had been invoking result_callback with an error, right? It's logic that we already have in. transiition_func(), right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think this is the right call. transition_func() is responsible for the actual function call. That leaves _execute_transition to transition nodes. It's possible to have a successful function calls and a failed transition. Given that, I think this makes sense.

logger.error(f"Error executing transition: {str(e)}")
raise

def _lookup_function(self, func_name: str) -> Callable:
"""Look up a function by name in the main module.

Expand Down Expand Up @@ -614,6 +588,12 @@ async def _set_node(self, node_id: str, node_config: NodeConfig) -> None:
raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first")

try:
# Clear any pending transition state when starting a new node
# This ensures clean state regardless of how we arrived here:
# - Normal transition flow (already cleared in _check_and_execute_transition)
# - Direct calls to set_node/set_node_from_config
self._pending_transition = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we clear _pending_transition here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asking because it seems like we already clear it in _execute_transition().

Copy link
Contributor Author

@markbackman markbackman Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still call set_node directly, which is probably less common as something that would happen outside of the context of a transition callback (or call set_node_from_config). It seems safer (maybe for now only) to reset in both places. I don't feel strongly about it, but that's why I added it. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. You want it so that if somehow the developer directly invoked set_node() from outside the context of an ongoing tool-call-based transition, then it would effectively cancel it. Is my understanding right?

If so, might be worth a comment above this line to provide rationale.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. I think that makes sense, as an explicitly called function should take precedent. I'll add a comment.


self._validate_node_config(node_id, node_config)
logger.debug(f"Setting node: {node_id}")

Expand Down
Loading