Skip to content

Add unit tests and CI/CD tests #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: test

# Run the unit-test suite on demand, on pushes to main, and on every PR.
on:
  workflow_dispatch:
  push:
    branches:
      - main
  pull_request:
    branches:
      - '**'

# Cancel any in-flight run for the same PR/ref so only the latest commit is tested.
concurrency:
  group: build-test-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

jobs:
  test:
    name: 'Unit Tests'
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repo
        uses: actions/checkout@v4

      - name: Set up Python
        id: setup_python
        # NOTE(review): actions/setup-python@v5 is current; v4 still works — consider upgrading.
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Cache virtual environment
        # NOTE(review): actions/cache@v3 is deprecated upstream; consider v4.
        uses: actions/cache@v3
        with:
          # Key the cached venv on OS, interpreter version, and both requirements
          # files so any dependency change invalidates the cache.
          # (fixed: missing space before the closing `}}` of python-version)
          key: venv-${{ runner.os }}-${{ steps.setup_python.outputs.python-version }}-${{ hashFiles('dev-requirements.txt') }}-${{ hashFiles('test-requirements.txt') }}
          path: .venv

      - name: Setup virtual environment
        # Safe on a cache hit: `python -m venv` on an existing venv dir does not
        # remove already-installed packages.
        run: |
          python -m venv .venv

      - name: Install dependencies
        run: |
          source .venv/bin/activate
          python -m pip install --upgrade pip
          pip install -r dev-requirements.txt -r test-requirements.txt
          pip install -e .

      - name: Test with pytest
        run: |
          source .venv/bin/activate
          pytest tests/ --cov=pipecat_flows --cov-report=xml

      - name: Upload coverage to Codecov
        # NOTE(review): in codecov-action v4+ the input is `files:`; `file:` is
        # accepted (deprecated) on v3 — update if the action is bumped.
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
          fail_ci_if_error: true
7 changes: 7 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
build~=1.2.1
pip-tools~=7.4.1
pytest~=8.3.2
pytest-asyncio~=0.23.5
pytest-cov~=4.1.0
ruff~=0.6.7
setuptools~=72.2.0
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[build-system]
requires = ["setuptools>=64"] # Removed setuptools_scm
requires = ["setuptools>=64"]
build-backend = "setuptools.build_meta"

[project]
Expand All @@ -9,7 +9,7 @@ description = "Conversation Flow management for Pipecat AI applications"
license = { text = "BSD 2-Clause License" }
readme = "README.md"
requires-python = ">=3.10"
keywords = ["pipecat", " conversation", "flows", "state machine", "ai", "llm"]
keywords = ["pipecat", "conversation", "flows", "state machine", "ai", "llm"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
Expand All @@ -27,5 +27,10 @@ dependencies = [
Source = "https://github.com/pipecat-ai/pipecat-flows"
Website = "https://www.pipecat.ai"

[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]
asyncio_mode = "auto"

[tool.ruff]
line-length = 100
24 changes: 18 additions & 6 deletions src/pipecat_flows/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ def __init__(self, flow_config: FlowConfig, llm):
Raises:
ValueError: If required configuration keys are missing
"""
if "initial_node" not in flow_config:
raise ValueError("Flow config must specify 'initial_node'")
if "nodes" not in flow_config:
raise ValueError("Flow config must specify 'nodes'")

self.nodes: Dict[str, NodeConfig] = {}
self.current_node: str = flow_config["initial_node"]
self.adapter = create_adapter(llm)
Expand All @@ -48,13 +53,20 @@ def _load_config(self, config: FlowConfig):
config: Dictionary containing the flow configuration

Raises:
ValueError: If required configuration keys are missing
ValueError: If required configuration keys are missing or invalid
"""
if "initial_node" not in config:
raise ValueError("Flow config must specify 'initial_node'")
if "nodes" not in config:
raise ValueError("Flow config must specify 'nodes'")

initial_node = config["initial_node"]
if initial_node not in config["nodes"]:
raise ValueError(f"Initial node '{initial_node}' not found in nodes")

# Validate node structure
for node_id, node in config["nodes"].items():
if "messages" not in node:
raise ValueError(f"Node '{node_id}' missing required 'messages' field")
if "functions" not in node:
raise ValueError(f"Node '{node_id}' missing required 'functions' field")

# Load the nodes
self.nodes = config["nodes"]

def get_current_messages(self) -> List[dict]:
Expand Down
5 changes: 5 additions & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pipecat-ai>=0.0.49
loguru~=0.7.2
anthropic~=0.30.0
google-generativeai~=0.7.2
openai~=1.37.2
Empty file added tests/__init__.py
Empty file.
179 changes: 179 additions & 0 deletions tests/test_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import unittest
from unittest.mock import AsyncMock, patch

from pipecat.frames.frames import EndFrame, TTSSpeakFrame

from pipecat_flows.actions import ActionManager
from pipecat_flows.exceptions import ActionError


class TestActionManager(unittest.IsolatedAsyncioTestCase):
    """Unit tests for ActionManager.

    All collaborators (the pipeline task and the TTS service) are replaced
    with AsyncMock, so these tests exercise ActionManager's dispatch and
    error-handling logic without any real pipeline or audio I/O.
    """

    def setUp(self):
        """
        Set up test fixtures before each test.

        Creates:
        - Mock PipelineTask for frame queueing
        - Mock TTS service for speech synthesis
        - ActionManager instance with mocked dependencies
        """
        self.mock_task = AsyncMock()
        self.mock_task.queue_frame = AsyncMock()

        self.mock_tts = AsyncMock()
        self.mock_tts.say = AsyncMock()

        self.action_manager = ActionManager(self.mock_task, self.mock_tts)

    async def test_initialization(self):
        """Test ActionManager initialization and default handlers"""
        # Verify built-in action handlers are registered
        self.assertIn("tts_say", self.action_manager.action_handlers)
        self.assertIn("end_conversation", self.action_manager.action_handlers)

        # Test initialization without TTS service
        action_manager_no_tts = ActionManager(self.mock_task, None)
        self.assertIsNone(action_manager_no_tts.tts)

    async def test_tts_action(self):
        """Test basic TTS action execution"""
        action = {"type": "tts_say", "text": "Hello"}
        await self.action_manager.execute_actions([action])

        # Verify TTS service was called with correct text
        self.mock_tts.say.assert_called_once_with("Hello")

    # Patches the loguru logger directly; assumes ActionManager logs via
    # `from loguru import logger` — TODO confirm against actions.py.
    @patch("loguru.logger.error")
    async def test_tts_action_no_text(self, mock_logger):
        """Test TTS action with missing text field"""
        action = {"type": "tts_say"}  # Missing text field

        # The implementation logs error but doesn't raise
        await self.action_manager.execute_actions([action])

        # Verify error was logged
        mock_logger.assert_called_with("TTS action missing 'text' field")

        # Verify TTS service was not called
        self.mock_tts.say.assert_not_called()

    @patch("loguru.logger.warning")
    async def test_tts_action_no_service(self, mock_logger):
        """Test TTS action when no TTS service is provided"""
        action_manager = ActionManager(self.mock_task, None)
        action = {"type": "tts_say", "text": "Hello"}

        # Should log warning but not raise error
        await action_manager.execute_actions([action])

        # Verify warning was logged
        mock_logger.assert_called_with("TTS action called but no TTS service provided")

        # Verify no frames were queued
        self.mock_task.queue_frame.assert_not_called()

    async def test_end_conversation_action(self):
        """Test basic end conversation action"""
        action = {"type": "end_conversation"}
        await self.action_manager.execute_actions([action])

        # Verify EndFrame was queued
        self.mock_task.queue_frame.assert_called_once()
        # call_args[0][0] is the first positional argument of the call
        frame = self.mock_task.queue_frame.call_args[0][0]
        self.assertIsInstance(frame, EndFrame)

    async def test_end_conversation_with_goodbye(self):
        """Test end conversation action with goodbye message"""
        action = {"type": "end_conversation", "text": "Goodbye!"}
        await self.action_manager.execute_actions([action])

        # Verify both frames were queued in correct order
        self.assertEqual(self.mock_task.queue_frame.call_count, 2)

        # Verify TTSSpeakFrame
        first_frame = self.mock_task.queue_frame.call_args_list[0][0][0]
        self.assertIsInstance(first_frame, TTSSpeakFrame)
        self.assertEqual(first_frame.text, "Goodbye!")

        # Verify EndFrame
        second_frame = self.mock_task.queue_frame.call_args_list[1][0][0]
        self.assertIsInstance(second_frame, EndFrame)

    async def test_custom_action(self):
        """Test registering and executing custom actions"""
        mock_handler = AsyncMock()
        # _register_action is private API; tests reach into it deliberately
        self.action_manager._register_action("custom", mock_handler)

        # Verify handler was registered
        self.assertIn("custom", self.action_manager.action_handlers)

        # Execute custom action
        action = {"type": "custom", "data": "test"}
        await self.action_manager.execute_actions([action])

        # Verify handler was called with correct data
        mock_handler.assert_called_once_with(action)

    async def test_invalid_action(self):
        """Test handling invalid actions"""
        # Test missing type
        with self.assertRaises(ActionError) as context:
            await self.action_manager.execute_actions([{}])
        self.assertIn("missing required 'type' field", str(context.exception))

        # Test unknown action type
        with self.assertRaises(ActionError) as context:
            await self.action_manager.execute_actions([{"type": "invalid"}])
        self.assertIn("No handler registered", str(context.exception))

    async def test_multiple_actions(self):
        """Test executing multiple actions in sequence"""
        actions = [
            {"type": "tts_say", "text": "First"},
            {"type": "tts_say", "text": "Second"},
        ]
        await self.action_manager.execute_actions(actions)

        # Verify TTS was called twice in correct order
        self.assertEqual(self.mock_tts.say.call_count, 2)
        expected_calls = [unittest.mock.call("First"), unittest.mock.call("Second")]
        self.assertEqual(self.mock_tts.say.call_args_list, expected_calls)

    def test_register_invalid_handler(self):
        """Test registering invalid action handlers"""
        # Test non-callable handler
        with self.assertRaises(ValueError) as context:
            self.action_manager._register_action("invalid", "not_callable")
        self.assertIn("must be callable", str(context.exception))

        # Test None handler
        with self.assertRaises(ValueError) as context:
            self.action_manager._register_action("invalid", None)
        self.assertIn("must be callable", str(context.exception))

    async def test_none_or_empty_actions(self):
        """Test handling None or empty action lists"""
        # Test None actions
        await self.action_manager.execute_actions(None)
        self.mock_task.queue_frame.assert_not_called()
        self.mock_tts.say.assert_not_called()

        # Test empty list
        await self.action_manager.execute_actions([])
        self.mock_task.queue_frame.assert_not_called()
        self.mock_tts.say.assert_not_called()

    @patch("loguru.logger.error")
    async def test_action_error_handling(self, mock_logger):
        """Test error handling during action execution"""
        # Configure TTS mock to raise an error
        self.mock_tts.say.side_effect = Exception("TTS error")

        action = {"type": "tts_say", "text": "Hello"}
        await self.action_manager.execute_actions([action])

        # Verify error was logged
        mock_logger.assert_called_with("TTS error: TTS error")

        # Verify action was still marked as executed (doesn't raise)
        self.mock_tts.say.assert_called_once()
Loading
Loading