Skip to content

Fix: Handle run_in_parallel=False, simplify pending function call tra… #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
you to either omit `functions` for nodes, which is common for the end node,
or specify an empty function call list, if desired.

### Fixed

- Fixed an issue where if run_in_parallel=False was set for the LLM, the bot
would trigger N completions for each sequential function call. Now, Flows
uses Pipecat's internal function tracking to determine when there are more
edge functions to call.

## [0.0.17] - 2025-05-16

### Added
Expand Down
133 changes: 48 additions & 85 deletions src/pipecat_flows/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def __init__(
self.adapter = create_adapter(llm)
self.initialized = False
self._context_aggregator = context_aggregator
self._pending_function_calls = 0
self._pending_transition: Optional[Dict[str, Any]] = None
self._context_strategy = context_strategy or ContextStrategyConfig(
strategy=ContextStrategy.APPEND
)
Expand Down Expand Up @@ -260,96 +260,52 @@ async def _call_handler(self, handler: FunctionHandler, args: FlowArgs) -> FlowR
# Modern handler with args and flow_manager
return await handler(args, self)

async def _check_and_execute_transition(self) -> None:
"""Check if all functions are complete and execute transition if so."""
if not self._pending_transition:
return

# Check if all function calls are complete using Pipecat's state
assistant_aggregator = self._context_aggregator.assistant()
if not assistant_aggregator._function_calls_in_progress:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't usually like relying on private state, as implementation can change at any time. If we want Pipecat to be telling us whether function calls are still in progress, should we first expose that as part of the API before relying on it here?

Copy link
Contributor Author

@markbackman markbackman Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think we have to. Currently, only Pipecat—in its internal state—knows about which function calls are in progress. I'll expose this via Pipecat in a separate PR.

# All functions complete, execute transition
transition_info = self._pending_transition
self._pending_transition = None

if transition_info["transition_to"]:
await self.set_node(
transition_info["transition_to"], self.nodes[transition_info["transition_to"]]
)
elif transition_info["transition_callback"]:
callback = transition_info["transition_callback"]
sig = inspect.signature(callback)
if len(sig.parameters) == 2:
await callback(transition_info["arguments"], self)
else:
await callback(transition_info["arguments"], transition_info["result"], self)

async def _create_transition_func(
self,
name: str,
handler: Optional[Callable],
transition_to: Optional[str],
transition_callback: Optional[Callable] = None,
) -> Callable:
"""Create a transition function for the given name and handler.

Args:
name: Name of the function being registered
handler: Optional function to process data
transition_to: Optional node to transition to (static flows)
transition_callback: Optional callback for dynamic transitions

Returns:
Callable: Async function that handles the tool invocation

Raises:
ValueError: If both transition_to and transition_callback are specified
"""
"""Create a transition function for the given name and handler."""
if transition_to and transition_callback:
raise ValueError(
f"Function {name} cannot have both transition_to and transition_callback"
)

# Validate transition callback if provided
if transition_callback:
self._validate_transition_callback(name, transition_callback)

is_edge_function = bool(transition_to) or bool(transition_callback)

def decrease_pending_function_calls() -> None:
"""Decrease the pending function calls counter if greater than zero."""
if self._pending_function_calls > 0:
self._pending_function_calls -= 1
logger.debug(
f"Function call completed: {name} (remaining: {self._pending_function_calls})"
)

async def on_context_updated_edge(
args: Dict[str, Any], result: Any, result_callback: Callable
) -> None:
"""Handle context updates for edge functions with transitions."""
try:
decrease_pending_function_calls()

# Only process transition if this was the last pending call
if self._pending_function_calls == 0:
if transition_to: # Static flow
logger.debug(f"Static transition to: {transition_to}")
await self.set_node(transition_to, self.nodes[transition_to])
elif transition_callback: # Dynamic flow
logger.debug(f"Dynamic transition for: {name}")
# Check callback signature
sig = inspect.signature(transition_callback)
if len(sig.parameters) == 2:
# Old style: (args, flow_manager)
await transition_callback(args, self)
else:
# New style: (args, result, flow_manager)
await transition_callback(args, result, self)
# Reset counter after transition completes
self._pending_function_calls = 0
logger.debug("Reset pending function calls counter")
else:
logger.debug(
f"Skipping transition, {self._pending_function_calls} calls still pending"
)
except Exception as e:
logger.error(f"Error in transition: {str(e)}")
self._pending_function_calls = 0
await result_callback(
{"status": "error", "error": str(e)},
properties=None, # Clear properties to prevent further callbacks
)
raise # Re-raise to prevent further processing

async def on_context_updated_node() -> None:
"""Handle context updates for node functions without transitions."""
decrease_pending_function_calls()

async def transition_func(params: FunctionCallParams) -> None:
"""Inner function that handles the actual tool invocation."""
try:
# Track pending function call
self._pending_function_calls += 1
logger.debug(
f"Function call pending: {name} (total: {self._pending_function_calls})"
)
logger.debug(f"Function called: {name}")

# Execute handler if present
if handler:
Expand All @@ -359,25 +315,30 @@ async def transition_func(params: FunctionCallParams) -> None:
result = {"status": "acknowledged"}
logger.debug(f"Function called without handler: {name}")

# For edge functions, prevent LLM completion until transition (run_llm=False)
# For node functions, allow immediate completion (run_llm=True)
async def on_context_updated() -> None:
if is_edge_function:
await on_context_updated_edge(
params.arguments, result, params.result_callback
)
else:
await on_context_updated_node()
if is_edge_function:
# Store transition info
self._pending_transition = {
"transition_to": transition_to,
"transition_callback": transition_callback,
"function_name": name,
"arguments": params.arguments,
"result": result,
}

properties = FunctionCallResultProperties(
run_llm=False,
on_context_updated=self._check_and_execute_transition,
)
else:
properties = FunctionCallResultProperties(
run_llm=True,
on_context_updated=None,
)

properties = FunctionCallResultProperties(
run_llm=not is_edge_function,
on_context_updated=on_context_updated,
)
await params.result_callback(result, properties=properties)

except Exception as e:
logger.error(f"Error in transition function {name}: {str(e)}")
self._pending_function_calls = 0
error_result = {"status": "error", "error": str(e)}
await params.result_callback(error_result)

Expand Down Expand Up @@ -479,6 +440,8 @@ async def set_node(self, node_id: str, node_config: NodeConfig) -> None:
raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first")

try:
self._pending_transition = None

self._validate_node_config(node_id, node_config)
logger.debug(f"Setting node: {node_id}")

Expand Down