A streaming speech-to-speech AI agent server for IoT devices using the MQTT protocol. The server lets IoT devices send raw PCM16 audio streams and receive AI-generated audio responses in real time.
Optimized for embedded devices with minimal processing overhead.
- Streaming Speech-to-Speech: Direct audio-to-audio communication using OpenAI's Realtime API
- MQTT Protocol: Lightweight messaging for IoT devices with raw audio chunks
- Simplified Message Format: Minimal JSON schema optimized for embedded devices
- Raw PCM16 Audio: No base64 encoding required on embedded devices
- Modular AI Services: Easy-to-swap AI service providers (currently supports OpenAI Realtime API)
- Session Management: Handles multiple concurrent device sessions
- Health Monitoring: Built-in health checks and system monitoring
- Configurable: Environment-based configuration for easy deployment
- Production Ready: Async architecture with proper error handling and logging
```
IoT Device → MQTT Broker → AI Agent Server → OpenAI Realtime API
     ↑                                              │
     └────────────── Audio Response ────────────────┘
```
Simplified Audio Flow:
```
Device PCM16 → Server → OpenAI Realtime API → Server → Device PCM16
```
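To make the flow concrete, here is a minimal sketch of the server side of this loop, assuming the paho-mqtt client library (1.x API); `process_audio` is a hypothetical stand-in for the OpenAI Realtime API bridge:

```python
import paho.mqtt.client as mqtt

def process_audio(pcm16_chunk: bytes):
    """Hypothetical stand-in for the Realtime API bridge: yields PCM16 response chunks."""
    yield pcm16_chunk  # echo the audio back, for illustration only

def on_message(client, userdata, msg):
    # Topic shape: iot/{device_id}/audio_request -> recover the device id
    device_id = msg.topic.split("/")[1]
    for chunk in process_audio(msg.payload):
        client.publish(f"iot/{device_id}/audio_response", chunk)

client = mqtt.Client(client_id="mqtt-ai-server")
client.on_message = on_message
client.connect("broker.emqx.io", 1883)
client.subscribe("iot/+/audio_request")
client.loop_forever()
```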
The server uses an abstract AI service interface, making it easy to swap between different providers:
- Primary: OpenAI Realtime API (direct speech-to-speech, ~300-500ms latency)
- Future: Modular pipeline (Soniox STT + DeepSeek LLM + OpenAI TTS)
- Python 3.11+
- OpenAI API key with Realtime API access
- MQTT broker (e.g., EMQX, Mosquitto)
- Audio format: Raw PCM16, 24kHz, mono, ~8KB chunks
- Embedded devices: No audio conversion libraries required
```bash
git clone <repository-url>
cd hw-mqtt-srv

# Using UV (recommended)
uv sync
```
```bash
# Run once to create example configuration
python main.py

# Copy and edit the configuration
cp .env.example .env
# Edit .env with your actual values
```
Edit the `.env` file with your settings:
```env
# MQTT Broker Settings
MQTT_HOST=broker.emqx.io
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
MQTT_CLIENT_ID=mqtt-ai-server
MQTT_USE_TLS=false

# OpenAI Configuration
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_MODEL=gpt-4o-realtime-preview
OPENAI_VOICE=alloy
OPENAI_INSTRUCTIONS=You are a helpful AI assistant responding to voice commands from IoT devices.

# Server Settings
MAX_CONCURRENT_SESSIONS=50
SESSION_TIMEOUT_SECONDS=300
LOG_LEVEL=INFO
```
Start the server:
```bash
python main.py
```
Then send test audio with the example client:
```bash
python tests/simple_client.py
```
Simplified for embedded devices - 60% smaller payloads!
Request (`audio_request`):
```json
{
  "message_id": "uuid-1234",
  "device_id": "iot-device-001",
  "timestamp": 1704067200,
  "message_type": "audio_request",
  "session_id": "session-567",
  "audio_data": "raw_pcm16_bytes"
}
```
Response (`audio_response`):
```json
{
  "message_id": "uuid-5678",
  "device_id": "iot-device-001",
  "timestamp": 1704067202,
  "message_type": "audio_response",
  "session_id": "session-567",
  "audio_data": "raw_pcm16_bytes"
}
```
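In code, the envelope can be assembled like this (a sketch; the helper name is hypothetical, and since `audio_data` is shown as a placeholder string above, how the raw bytes are attached is transport-specific):

```python
import time
import uuid

def build_audio_request(device_id: str, session_id: str, audio_data: bytes) -> dict:
    # Hypothetical helper mirroring the schema above.
    return {
        "message_id": str(uuid.uuid4()),
        "device_id": device_id,
        "timestamp": int(time.time()),  # Unix seconds, as in the examples
        "message_type": "audio_request",
        "session_id": session_id,
        "audio_data": audio_data,  # raw PCM16 bytes (placeholder string in the JSON above)
    }
```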
- Request: `iot/{device_id}/audio_request`
- Response: `iot/{device_id}/audio_response`
- Health: `iot/server/health`
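The single-level wildcard `+` lets the server cover all devices with one subscription, while each device listens only on its own response topic. A small illustration (topic strings only):

```python
DEVICE_ID = "iot-device-001"

# Device side: publish requests here, subscribe to responses for this device only.
request_topic = f"iot/{DEVICE_ID}/audio_request"
response_topic = f"iot/{DEVICE_ID}/audio_response"

# Server side: one wildcard subscription matches every device's requests;
# the device id is recovered from the matched topic.
SERVER_SUBSCRIPTION = "iot/+/audio_request"
device_id = request_topic.split("/")[1]  # -> "iot-device-001"
```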
- Format: Raw PCM16 audio data
- Sample Rate: 24kHz
- Channels: Mono (1 channel)
- Bit Depth: 16-bit
- Chunk Size: ~8KB for optimal performance
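For reference, an 8 KB chunk in this format holds 8192 / 2 = 4096 samples, i.e. about 171 ms of audio. The sketch below builds one such chunk as a 440 Hz test tone using only the standard library:

```python
import math
import struct

SAMPLE_RATE = 24_000   # Hz, mono
CHUNK_BYTES = 8_192    # ~8KB, as recommended above
samples = CHUNK_BYTES // 2                  # 2 bytes per PCM16 sample -> 4096
duration_ms = 1000 * samples / SAMPLE_RATE  # ~170.7 ms of audio per chunk

# 440 Hz sine test tone packed as little-endian signed 16-bit
chunk = struct.pack(
    f"<{samples}h",
    *(int(20_000 * math.sin(2 * math.pi * 440 * n / SAMPLE_RATE)) for n in range(samples)),
)
assert len(chunk) == CHUNK_BYTES
```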
```c
// Capture audio from microphone as PCM16
uint8_t audio_buffer[8192];
capture_audio_pcm16(audio_buffer, sizeof(audio_buffer));

// Send directly via MQTT - no base64 encoding needed
mqtt_publish("iot/device001/audio_request", audio_buffer, sizeof(audio_buffer));
```
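For desktop testing without hardware, a Python client can emulate the same firmware behavior (a sketch assuming paho-mqtt 1.x; the input file name is illustrative and must contain raw PCM16 at 24 kHz mono):

```python
import paho.mqtt.client as mqtt

DEVICE_ID = "device001"
CHUNK_BYTES = 8_192

client = mqtt.Client(client_id=DEVICE_ID)
client.connect("broker.emqx.io", 1883)

# Stream a raw PCM16 capture in ~8KB chunks, like the firmware above.
with open("sample_24k_mono.pcm", "rb") as f:
    while chunk := f.read(CHUNK_BYTES):
        client.publish(f"iot/{DEVICE_ID}/audio_request", chunk)
```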
- No Base64 encoding/decoding required
- No audio format conversion needed
- Minimal memory footprint
- 60% smaller message payloads
- Faster processing due to direct PCM16 handling
- `MQTT_HOST`: MQTT broker hostname
- `MQTT_PORT`: MQTT broker port (default: 1883)
- `MQTT_USERNAME`/`MQTT_PASSWORD`: Authentication credentials
- `MQTT_USE_TLS`: Enable TLS encryption
- `OPENAI_API_KEY`: Your OpenAI API key (required)
- `OPENAI_MODEL`: Model to use (default: gpt-4o-realtime-preview)
- `OPENAI_VOICE`: Voice to use (alloy, echo, fable, onyx, nova, shimmer)
- `OPENAI_INSTRUCTIONS`: Default system instructions
- `MAX_CONCURRENT_SESSIONS`: Maximum concurrent device sessions
- `SESSION_TIMEOUT_SECONDS`: Session timeout duration
- `LOG_LEVEL`: Logging level (DEBUG, INFO, WARNING, ERROR)
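As a sketch of how these variables might be loaded into a typed config (assuming python-dotenv; the actual loader in the server source may differ):

```python
import os
from dataclasses import dataclass

from dotenv import load_dotenv

load_dotenv()  # read .env into the process environment

@dataclass
class Config:
    mqtt_host: str = os.getenv("MQTT_HOST", "broker.emqx.io")
    mqtt_port: int = int(os.getenv("MQTT_PORT", "1883"))
    openai_api_key: str = os.environ["OPENAI_API_KEY"]  # required: raises KeyError if unset
    max_sessions: int = int(os.getenv("MAX_CONCURRENT_SESSIONS", "50"))
    log_level: str = os.getenv("LOG_LEVEL", "INFO")
```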
The server uses an abstract interface for AI services. To add a new provider:
- Implement the `AIServiceInterface`:
```python
from typing import AsyncIterator

from src.ai_services import AIServiceInterface, AudioRequest, AudioResponse

class MyAIService(AIServiceInterface):
    async def process_audio_stream(self, request: AudioRequest) -> AsyncIterator[AudioResponse]:
        # Your implementation here: yield AudioResponse chunks as they are produced
        pass

    async def health_check(self) -> bool:
        # Health check implementation
        pass
```
- Update `main.py` to use your service:
```python
# Replace OpenAIRealtimeService with your implementation
ai_service = MyAIService(config)
```
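As a concrete example, a trivial pass-through provider (the field names on `AudioRequest`/`AudioResponse` are assumptions for illustration) makes an end-to-end plumbing test possible without calling OpenAI:

```python
from typing import AsyncIterator

from src.ai_services import AIServiceInterface, AudioRequest, AudioResponse

class EchoAIService(AIServiceInterface):
    """Echoes the request audio back unchanged; useful for wiring tests."""

    async def process_audio_stream(self, request: AudioRequest) -> AsyncIterator[AudioResponse]:
        # Assumed fields: request.audio_data in, AudioResponse(audio_data=...) out
        yield AudioResponse(audio_data=request.audio_data)

    async def health_check(self) -> bool:
        return True
```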
```bash
# Install development dependencies
uv sync --group dev

# Run tests
pytest

# Run with coverage
pytest --cov=src

# Format code
black src/ tests/
isort src/ tests/

# Type checking
mypy src/
```
| Mode | End-to-End Latency | Memory Usage | Use Case |
|---|---|---|---|
| Simplified PCM16 | 300-500ms | ~50% less | Embedded devices |
| Legacy MP3 | 600-1000ms | Higher | Legacy systems |
- Connection Refused
  - Check that the MQTT broker is running and accessible
  - Verify the MQTT_HOST and MQTT_PORT settings
- OpenAI API Errors
  - Ensure OPENAI_API_KEY is set correctly
  - Check that you have access to the Realtime API (currently in beta)
- High Latency
  - Verify the network connection to OpenAI services
  - Check server resources and concurrent session limits
- Audio Issues
  - Ensure audio is in PCM16 format, 24kHz, mono (a quick sanity check is sketched below)
  - Keep chunks around 8KB for optimal performance
  - Verify audio data is not corrupted during transmission
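When chasing audio problems, a quick sanity check on a received chunk can rule out truncation and dead captures (a sketch; mono little-endian PCM16 assumed):

```python
import struct

def looks_like_pcm16(chunk: bytes) -> bool:
    """Cheap sanity check for a raw PCM16 chunk."""
    if not chunk or len(chunk) % 2 != 0:
        return False  # PCM16 samples are 2 bytes each
    samples = struct.unpack(f"<{len(chunk) // 2}h", chunk)
    peak = max(abs(s) for s in samples)
    return peak > 0  # an all-zero chunk usually means the capture failed
```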
Logs are written to both the console and `logs/mqtt-ai-server.log`, with rotation.
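A sketch of an equivalent setup with the standard library's rotating handler (file size and backup count are assumptions):

```python
import logging
from logging.handlers import RotatingFileHandler

file_handler = RotatingFileHandler(
    "logs/mqtt-ai-server.log", maxBytes=10_000_000, backupCount=5
)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    handlers=[logging.StreamHandler(), file_handler],  # console + rotating file
)
```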
This project is licensed under the MIT License - see the LICENSE file for details.
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
For issues and questions:
- Check the troubleshooting section
- Review existing GitHub issues
- Create a new issue with detailed information
- Alternative AI service providers (DeepSeek, Anthropic)
- WebRTC support for lower latency
- Embedded device SDKs (ESP32, Arduino)
- Kubernetes deployment manifests
- Grafana dashboards for monitoring
- Load testing tools
- Multi-language documentation