Skip to content

Mb/func simple handle run in parallel #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
b5f12e6
[WIP] Initial progress on “direct functions”: the ability to directly…
kompfner May 21, 2025
643fd8f
[WIP] More progress on “direct functions”
kompfner May 21, 2025
59c2a33
[WIP] More progress on “direct functions”
kompfner May 21, 2025
c991f8d
[WIP] More progress on “direct functions”
kompfner May 21, 2025
e02fc99
[WIP] More progress on “direct functions”
kompfner May 21, 2025
d4496b5
[WIP] More progress on “direct functions”
kompfner May 21, 2025
44d579f
[WIP] More progress on “direct functions”
kompfner May 21, 2025
244e618
[WIP] More progress on “direct functions”: union with `None` should n…
kompfner May 21, 2025
a09ca72
[WIP] More progress on “direct functions”: read parameter description…
kompfner May 21, 2025
205b133
[WIP] More progress on “direct functions”
kompfner May 22, 2025
d663e8b
[WIP] More progress on “direct functions”
kompfner May 22, 2025
6399351
[WIP] More progress on “direct functions”: direct function calls are …
kompfner May 22, 2025
7cecece
[WIP] More progress on “direct functions”
kompfner May 22, 2025
d1b3bd7
[WIP] More progress on “direct functions”: fix broken unit test
kompfner May 22, 2025
b165475
[WIP] More progress on “direct functions”
kompfner May 23, 2025
1ea7add
[WIP] More progress on “direct functions”: split apart the concepts o…
kompfner May 23, 2025
31f9206
[WIP] More progress on “direct functions”: fix direct functions tests
kompfner May 23, 2025
74492d1
[WIP] More progress on “direct functions”: support returning from a u…
kompfner May 23, 2025
2feb060
[WIP] More progress on “direct functions”: add `FlowsDirectFunction` …
kompfner May 23, 2025
45fa831
[WIP] More progress on “direct functions”: some validation
kompfner May 23, 2025
c6ecfb4
[WIP] More progress on “direct functions”: some validation
kompfner May 23, 2025
4b35aa1
[WIP] More progress on “direct functions”: add tests for validation
kompfner May 23, 2025
d21d895
[WIP] More progress on “direct functions”: fix up and add tests for T…
kompfner May 23, 2025
00ccc4c
[WIP] More progress on “direct functions”: make `flow_manager` the fi…
kompfner May 28, 2025
a2425aa
[WIP] More progress on “direct functions”: add additional test for ch…
kompfner May 28, 2025
ca2427c
[WIP] More progress on “direct functions”: add test for `FlowsDirectF…
kompfner May 28, 2025
90fa1c3
[WIP] More progress on “direct functions”: return more realistic `Flo…
kompfner May 28, 2025
59b9850
[WIP] More progress on “direct functions”: support static flows
kompfner May 28, 2025
b983500
[WIP] More progress on “direct functions”: some typing fixes
kompfner May 29, 2025
daef6fa
[WIP] More progress on “direct functions”: some typing fixes
kompfner May 29, 2025
c317da7
[WIP] More progress on “direct functions”: add support for direct fun…
kompfner May 29, 2025
7046793
[WIP] More progress on “direct functions”: rename "unified" functions…
kompfner May 29, 2025
04965ee
[WIP] More progress on “direct functions”: remove comment
kompfner May 29, 2025
1f69ce7
[WIP] More progress on “direct functions”: some minor cleanup
kompfner May 29, 2025
6361de5
[WIP] More progress on “direct functions”: changelog
kompfner May 29, 2025
bd43742
[WIP] More progress on “direct functions”: test tweak to better exerc…
kompfner May 29, 2025
33abc74
[WIP] More progress on “direct functions”: add optional `name` field …
kompfner May 30, 2025
8f2e1ed
[WIP] More progress on “direct functions”: deprecate `transition_to` …
kompfner May 30, 2025
538cf83
[WIP] More progress on “direct functions”: update CHANGELOG
kompfner May 30, 2025
4907c5d
[WIP] More progress on “direct functions”: `initialize()` should take…
kompfner May 30, 2025
758602a
[WIP] More progress on “direct functions”: deprecate `set_node()`
kompfner May 30, 2025
e34c386
Revert "[WIP] More progress on “direct functions”: deprecate `set_nod…
kompfner May 30, 2025
b910b31
[WIP] More progress on “direct functions”: update examples to use new…
kompfner May 30, 2025
ea5d9fb
[WIP] More progress on “direct functions”: remove unused import
kompfner May 31, 2025
8da25bf
[WIP] More progress on “direct functions”: Add `set_node_from_config(…
kompfner May 31, 2025
1d913b6
[WIP] More progress on “direct functions”: deprecate `set_node()` (ta…
kompfner May 31, 2025
c7c998c
Fix: Handle run_in_parallel=False, simplify pending function call tra…
markbackman Jun 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,113 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Addded a new optional `name` field to `NodeConfig`. When using dynamic flows alongside
"consolidated" functions that return a tuple (result, next node), giving the next node a `name` is
helpful for debug logging. If you don't specify a `name`, an automatically-generated UUID is used.

- Added support for providing "consolidated" functions, which are responsible
for both doing some work as well as specifying the next node to transition
to. When using consolidated functions, you don't specify `transition_to` or
`transition_callback`.

Usage:

```python
# "Consolidated" function
async def do_something(args: FlowArgs) -> tuple[FlowResult, NodeConfig]:
foo = args["foo"]
bar = args.get("bar", "")

# Do some work (optional; this function may be a transition-only function)
result = await process(foo, bar)

# Specify next node (optional; this function may be a work-only function)
# This is either a NodeConfig (for dynamic flows) or a node name (for static flows)
next_node = create_another_node()

return result, next_node

def create_a_node() -> NodeConfig:
return NodeConfig(
task_messages=[
# ...
],
functions=[FlowsFunctionSchema(
name="do_something",
description="Do something interesting.",
handler=do_something,
properties={
"foo": {
"type": "integer",
"description": "The foo to do something interesting with."
},
"bar": {
"type": "string",
"description": "The bar to do something interesting with."
}
},
required=["foo"],
)],
)
```

- Added support for providing "direct" functions, which don't need an
accompanying `FlowsFunctionSchema` or function definition dict. Instead,
metadata (i.e. `name`, `description`, `properties`, and `required`) are
automatically extracted from a combination of the function signature and
docstring.

Usage:

```python
# "Direct" function
# `flow_manager` must be the first parameter
async def do_something(flow_manager: FlowManager, foo: int, bar: str = "") -> tuple[FlowResult, NodeConfig]:
"""
Do something interesting.

Args:
foo (int): The foo to do something interesting with.
bar (string): The bar to do something interesting with.
"""

# Do some work (optional; this function may be a transition-only function)
result = await process(foo, bar)

# Specify next node (optional; this function may be a work-only function)
# This is either a NodeConfig (for dynamic flows) or a node name (for static flows)
next_node = create_another_node()

return result, next_node

def create_a_node() -> NodeConfig:
return NodeConfig(
task_messages=[
# ...
],
functions=[do_something]
)
```

### Deprecated

- Deprecated `transition_to` and `transition_callback` in favor of "consolidated" `handler`s that
return a tuple (result, next node). Alternatively, you could use "direct" functions and avoid
using `FlowsFunctionSchema`s or function definition dicts entirely. See the "Added" section above
for more details.

- Deprecated `set_node()` in favor of doing the following for dynamic flows:

- Prefer "consolidated" or "direct" functions that return a tuple (result, next node) over
deprecated `transition_callback`s
- Pass your initial node to `FlowManager.initialize()`
- If you really need to set a node explicitly, use `set_node_from_config()`

In all of these cases, you can provide a `name` in your new node's config for debug logging
purposes.

### Changed

- `functions` are now optional in the `NodeConfig`. Additionally, for AWS
Expand All @@ -15,6 +122,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
you to either omit `functions` for nodes, which is common for the end node,
or specify an empty function call list, if desired.

### Fixed

- Fixed an issue where if `run_in_parallel=False` was set for the LLM, the bot
would trigger N completions for each sequential function call. Now, Flows
uses Pipecat's internal function tracking to determine when there are more
edge functions to call.

## [0.0.17] - 2025-05-16

### Added
Expand Down
94 changes: 43 additions & 51 deletions examples/dynamic/insurance_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import os
import sys
from pathlib import Path
from typing import Dict, TypedDict, Union
from typing import TypedDict, Union

import aiohttp
from dotenv import load_dotenv
Expand Down Expand Up @@ -88,22 +88,37 @@ class CoverageUpdateResult(FlowResult, InsuranceQuote):


# Function handlers
async def collect_age(args: FlowArgs) -> AgeCollectionResult:
"""Process age collection."""
async def collect_age(
args: FlowArgs, flow_manager: FlowManager
) -> tuple[AgeCollectionResult, NodeConfig]:
"""Process age collection and return next node name."""
age = args["age"]
logger.debug(f"collect_age handler executing with age: {age}")
return AgeCollectionResult(age=age)

flow_manager.state["age"] = age
result = AgeCollectionResult(age=age)

async def collect_marital_status(args: FlowArgs) -> MaritalStatusResult:
"""Process marital status collection."""
next_node = create_marital_status_node()

return result, next_node


async def collect_marital_status(
args: FlowArgs, flow_manager: FlowManager
) -> tuple[MaritalStatusResult, NodeConfig]:
"""Process marital status collection and return next node name."""
status = args["marital_status"]
logger.debug(f"collect_marital_status handler executing with status: {status}")
return MaritalStatusResult(marital_status=status)

result = MaritalStatusResult(marital_status=status)

next_node = create_quote_calculation_node(flow_manager.state["age"], status)

return result, next_node

async def calculate_quote(args: FlowArgs) -> QuoteCalculationResult:
"""Calculate insurance quote based on age and marital status."""

async def calculate_quote(args: FlowArgs) -> tuple[QuoteCalculationResult, NodeConfig]:
"""Calculate insurance quote based on age and marital status, return next node name."""
age = args["age"]
marital_status = args["marital_status"]
logger.debug(f"calculate_quote handler executing with age: {age}, status: {marital_status}")
Expand All @@ -116,15 +131,17 @@ async def calculate_quote(args: FlowArgs) -> QuoteCalculationResult:
# Calculate quote
monthly_premium = rates["base_rate"] * rates["risk_multiplier"]

return {
result = {
"monthly_premium": monthly_premium,
"coverage_amount": 250000,
"deductible": 1000,
}
next_node = create_quote_results_node(result)
return result, next_node


async def update_coverage(args: FlowArgs) -> CoverageUpdateResult:
"""Update coverage options and recalculate premium."""
async def update_coverage(args: FlowArgs) -> tuple[CoverageUpdateResult, NodeConfig]:
"""Update coverage options and recalculate premium, return next node name."""
coverage_amount = args["coverage_amount"]
deductible = args["deductible"]
logger.debug(
Expand All @@ -136,51 +153,28 @@ async def update_coverage(args: FlowArgs) -> CoverageUpdateResult:
if deductible > 1000:
monthly_premium *= 0.9 # 10% discount for higher deductible

return {
result = {
"monthly_premium": monthly_premium,
"coverage_amount": coverage_amount,
"deductible": deductible,
}
next_node = create_quote_results_node(result)
return result, next_node


async def end_quote() -> FlowResult:
"""Handle quote completion."""
async def end_quote(args: FlowArgs) -> tuple[FlowResult, str]:
"""Handle quote completion and return next node name."""
logger.debug("end_quote handler executing")
return {"status": "completed"}


# Transition callbacks and handlers
async def handle_age_collection(args: Dict, result: AgeCollectionResult, flow_manager: FlowManager):
flow_manager.state["age"] = result["age"]
await flow_manager.set_node("marital_status", create_marital_status_node())


async def handle_marital_status_collection(
args: Dict, result: MaritalStatusResult, flow_manager: FlowManager
):
flow_manager.state["marital_status"] = result["marital_status"]
await flow_manager.set_node(
"quote_calculation",
create_quote_calculation_node(
flow_manager.state["age"], flow_manager.state["marital_status"]
),
)


async def handle_quote_calculation(
args: Dict, result: QuoteCalculationResult, flow_manager: FlowManager
):
await flow_manager.set_node("quote_results", create_quote_results_node(result))


async def handle_end_quote(_: Dict, result: FlowResult, flow_manager: FlowManager):
await flow_manager.set_node("end", create_end_node())
result = {"status": "completed"}
next_node = create_end_node()
return result, next_node


# Node configurations
def create_initial_node() -> NodeConfig:
"""Create the initial node asking for age."""
return {
"name": "initial",
"role_messages": [
{
"role": "system",
Expand Down Expand Up @@ -221,7 +215,6 @@ def create_initial_node() -> NodeConfig:
"properties": {"age": {"type": "integer"}},
"required": ["age"],
},
"transition_callback": handle_age_collection,
}
],
}
Expand All @@ -230,6 +223,7 @@ def create_initial_node() -> NodeConfig:
def create_marital_status_node() -> NodeConfig:
"""Create node for collecting marital status."""
return {
"name": "marital_status",
"task_messages": [
{
"role": "user",
Expand All @@ -253,7 +247,6 @@ def create_marital_status_node() -> NodeConfig:
},
"required": ["marital_status"],
},
"transition_callback": handle_marital_status_collection,
}
],
}
Expand All @@ -262,6 +255,7 @@ def create_marital_status_node() -> NodeConfig:
def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig:
"""Create node for calculating initial quote."""
return {
"name": "quote_calculation",
"task_messages": [
{
"role": "user",
Expand Down Expand Up @@ -290,7 +284,6 @@ def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig:
},
"required": ["age", "marital_status"],
},
"transition_callback": handle_quote_calculation,
}
],
}
Expand All @@ -301,6 +294,7 @@ def create_quote_results_node(
) -> NodeConfig:
"""Create node for showing quote and adjustment options."""
return {
"name": "quote_results",
"task_messages": [
{
"role": "user",
Expand Down Expand Up @@ -341,7 +335,6 @@ def create_quote_results_node(
"handler": end_quote,
"description": "Complete the quote process",
"input_schema": {"type": "object", "properties": {}},
"transition_callback": handle_end_quote,
},
],
}
Expand All @@ -350,6 +343,7 @@ def create_quote_results_node(
def create_end_node() -> NodeConfig:
"""Create the final node."""
return {
"name": "end",
"task_messages": [
{
"role": "user",
Expand Down Expand Up @@ -421,9 +415,7 @@ async def main():
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Initialize flow
await flow_manager.initialize()
# Set initial node
await flow_manager.set_node("initial", create_initial_node())
await flow_manager.initialize(create_initial_node())

# Run the pipeline
runner = PipelineRunner()
Expand Down
Loading