From 8bec65d1c5bb2c6dc877d5ff6d89c6c8b445b4bd Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Fri, 13 Jun 2025 11:25:47 -0400 Subject: [PATCH] Fix: Handle run_in_parallel=False, simplify pending function call tracking --- CHANGELOG.md | 7 ++ src/pipecat_flows/manager.py | 133 +++++++++++++---------------------- 2 files changed, 55 insertions(+), 85 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 411b2ea..114a9ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 you to either omit `functions` for nodes, which is common for the end node, or specify an empty function call list, if desired. +### Fixed + +- Fixed an issue where if run_in_parallel=False was set for the LLM, the bot + would trigger N completions for each sequential function call. Now, Flows + uses Pipecat's internal function tracking to determine when there are more + edge functions to call. + ## [0.0.17] - 2025-05-16 ### Added diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index eb061eb..4d537c4 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -114,7 +114,7 @@ def __init__( self.adapter = create_adapter(llm) self.initialized = False self._context_aggregator = context_aggregator - self._pending_function_calls = 0 + self._pending_transition: Optional[Dict[str, Any]] = None self._context_strategy = context_strategy or ContextStrategyConfig( strategy=ContextStrategy.APPEND ) @@ -260,6 +260,30 @@ async def _call_handler(self, handler: FunctionHandler, args: FlowArgs) -> FlowR # Modern handler with args and flow_manager return await handler(args, self) + async def _check_and_execute_transition(self) -> None: + """Check if all functions are complete and execute transition if so.""" + if not self._pending_transition: + return + + # Check if all function calls are complete using Pipecat's state + assistant_aggregator = self._context_aggregator.assistant() + if not assistant_aggregator._function_calls_in_progress: + # All functions complete, execute transition + transition_info = self._pending_transition + self._pending_transition = None + + if transition_info["transition_to"]: + await self.set_node( + transition_info["transition_to"], self.nodes[transition_info["transition_to"]] + ) + elif transition_info["transition_callback"]: + callback = transition_info["transition_callback"] + sig = inspect.signature(callback) + if len(sig.parameters) == 2: + await callback(transition_info["arguments"], self) + else: + await callback(transition_info["arguments"], transition_info["result"], self) + async def _create_transition_func( self, name: str, @@ -267,89 +291,21 @@ async def _create_transition_func( transition_to: Optional[str], transition_callback: Optional[Callable] = None, ) -> Callable: - """Create a transition function for the given name and handler. - - Args: - name: Name of the function being registered - handler: Optional function to process data - transition_to: Optional node to transition to (static flows) - transition_callback: Optional callback for dynamic transitions - - Returns: - Callable: Async function that handles the tool invocation - - Raises: - ValueError: If both transition_to and transition_callback are specified - """ + """Create a transition function for the given name and handler.""" if transition_to and transition_callback: raise ValueError( f"Function {name} cannot have both transition_to and transition_callback" ) - # Validate transition callback if provided if transition_callback: self._validate_transition_callback(name, transition_callback) is_edge_function = bool(transition_to) or bool(transition_callback) - def decrease_pending_function_calls() -> None: - """Decrease the pending function calls counter if greater than zero.""" - if self._pending_function_calls > 0: - self._pending_function_calls -= 1 - logger.debug( - f"Function call completed: {name} (remaining: {self._pending_function_calls})" - ) - - async def on_context_updated_edge( - args: Dict[str, Any], result: Any, result_callback: Callable - ) -> None: - """Handle context updates for edge functions with transitions.""" - try: - decrease_pending_function_calls() - - # Only process transition if this was the last pending call - if self._pending_function_calls == 0: - if transition_to: # Static flow - logger.debug(f"Static transition to: {transition_to}") - await self.set_node(transition_to, self.nodes[transition_to]) - elif transition_callback: # Dynamic flow - logger.debug(f"Dynamic transition for: {name}") - # Check callback signature - sig = inspect.signature(transition_callback) - if len(sig.parameters) == 2: - # Old style: (args, flow_manager) - await transition_callback(args, self) - else: - # New style: (args, result, flow_manager) - await transition_callback(args, result, self) - # Reset counter after transition completes - self._pending_function_calls = 0 - logger.debug("Reset pending function calls counter") - else: - logger.debug( - f"Skipping transition, {self._pending_function_calls} calls still pending" - ) - except Exception as e: - logger.error(f"Error in transition: {str(e)}") - self._pending_function_calls = 0 - await result_callback( - {"status": "error", "error": str(e)}, - properties=None, # Clear properties to prevent further callbacks - ) - raise # Re-raise to prevent further processing - - async def on_context_updated_node() -> None: - """Handle context updates for node functions without transitions.""" - decrease_pending_function_calls() - async def transition_func(params: FunctionCallParams) -> None: """Inner function that handles the actual tool invocation.""" try: - # Track pending function call - self._pending_function_calls += 1 - logger.debug( - f"Function call pending: {name} (total: {self._pending_function_calls})" - ) + logger.debug(f"Function called: {name}") # Execute handler if present if handler: @@ -359,25 +315,30 @@ async def transition_func(params: FunctionCallParams) -> None: result = {"status": "acknowledged"} logger.debug(f"Function called without handler: {name}") - # For edge functions, prevent LLM completion until transition (run_llm=False) - # For node functions, allow immediate completion (run_llm=True) - async def on_context_updated() -> None: - if is_edge_function: - await on_context_updated_edge( - params.arguments, result, params.result_callback - ) - else: - await on_context_updated_node() + if is_edge_function: + # Store transition info + self._pending_transition = { + "transition_to": transition_to, + "transition_callback": transition_callback, + "function_name": name, + "arguments": params.arguments, + "result": result, + } + + properties = FunctionCallResultProperties( + run_llm=False, + on_context_updated=self._check_and_execute_transition, + ) + else: + properties = FunctionCallResultProperties( + run_llm=True, + on_context_updated=None, + ) - properties = FunctionCallResultProperties( - run_llm=not is_edge_function, - on_context_updated=on_context_updated, - ) await params.result_callback(result, properties=properties) except Exception as e: logger.error(f"Error in transition function {name}: {str(e)}") - self._pending_function_calls = 0 error_result = {"status": "error", "error": str(e)} await params.result_callback(error_result) @@ -479,6 +440,8 @@ async def set_node(self, node_id: str, node_config: NodeConfig) -> None: raise FlowTransitionError(f"{self.__class__.__name__} must be initialized first") try: + self._pending_transition = None + self._validate_node_config(node_id, node_config) logger.debug(f"Setting node: {node_id}")