A high-performance, generalized process pooler and session manager for external language integrations in Elixir
Snakepit is a battle-tested Elixir library that provides a robust pooling system for managing external processes (Python, Node.js, Ruby, R, etc.). Born from the need for reliable ML/AI integrations, it offers:
- Lightning-fast concurrent initialization - 1000x faster than sequential approaches
- Session-based execution with automatic worker affinity
- gRPC-based communication - Modern HTTP/2 protocol with streaming support
- Native streaming support - Real-time progress updates and progressive results (gRPC)
- Adapter pattern for any external language/runtime
- Built on OTP primitives - DynamicSupervisor, Registry, GenServer
- Production-ready with telemetry, health checks, and graceful shutdowns
- What's New in v0.4
- Quick Start
- Installation
- Core Concepts
- Configuration
- Usage Examples
- gRPC Communication
- Python Bridges
- Built-in Adapters
- Creating Custom Adapters
- Session Management
- Monitoring & Telemetry
- Architecture Deep Dive
- Performance
- Troubleshooting
- Contributing
- New `process_text` tool - Text processing with upper, lower, reverse, and length operations
- New `get_stats` tool - Real-time adapter and system monitoring with memory/CPU usage
- Fixed gRPC tool registration - Resolved async/sync issues with UnaryUnaryCall objects
- Automatic session initialization - Sessions created automatically when Python tools register
- Remote tool dispatch - Complete bidirectional communication between Elixir and Python
- Missing tool recovery - Added adapter_info, echo, process_text, get_stats to ShowcaseAdapter
- Async/sync compatibility - Fixed gRPC stub handling with proper response processing
- Enhanced error handling - Better diagnostics for tool registration failures
- Persistent process tracking with DETS storage survives BEAM crashes
- Automatic orphan cleanup - no more zombie Python processes
- Pre-registration pattern - Prevents orphans even during startup crashes
- Immediate DETS persistence - No data loss on abrupt termination
- Zero-configuration reliability - works out of the box
- Production-ready - handles VM crashes, OOM kills, and power failures
- See Process Management Documentation for details
- Real-time progress updates for long-running operations
- HTTP/2 multiplexing for concurrent requests
- Cancellable operations with graceful stream termination
- Built-in health checks and rich error handling
- Automatic binary encoding for tensors and embeddings > 10KB
- 5-10x faster than JSON for large numerical arrays
- Zero configuration - works automatically
- Backward compatible - smaller data still uses JSON
- Modern architecture with protocol buffers
- Efficient binary transfers with protocol buffers
- HTTP/2 multiplexing for concurrent operations
- Native binary data handling perfect for ML models and images
- 18-36% smaller message sizes for improved performance
- Complete example app at `examples/snakepit_showcase`
- Demonstrates all features including binary serialization
- Performance benchmarks showing 5-10x speedup
- Ready-to-run demos for all Snakepit capabilities
- Production-ready packaging with pip install support
- Enhanced error handling and robust shutdown management
- Console script integration for deployment flexibility
- Type checking support with proper py.typed markers
- Deprecated V1 Python bridge in favor of V2 architecture
- Updated demo implementations using latest best practices
- Comprehensive documentation for all bridge implementations
- Backward compatibility maintained for existing integrations
- Cross-language function execution - Call Python from Elixir and vice versa
- Transparent tool proxying - Remote functions appear as local functions
- Session-scoped isolation - Tools are isolated by session for multi-tenancy
- Dynamic discovery - Automatic tool discovery and registration
- See Bidirectional Tool Bridge Documentation for details
# In your mix.exs
def deps do
[
{:snakepit, "~> 0.4.1"}
]
end
# Configure with gRPC adapter
Application.put_env(:snakepit, :pooling_enabled, true)
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{
base_port: 50051,
port_range: 100
})
Application.put_env(:snakepit, :pool_config, %{pool_size: 4})
{:ok, _} = Application.ensure_all_started(:snakepit)
# Execute commands with gRPC
{:ok, result} = Snakepit.execute("ping", %{test: true})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})
# Session-based execution (maintains state)
{:ok, result} = Snakepit.execute_in_session("user_123", "echo", %{message: "hello"})
# Streaming operations for real-time updates
Snakepit.execute_stream("batch_process", %{items: [1, 2, 3]}, fn chunk ->
IO.puts("Progress: #{chunk["progress"]}%")
end)
def deps do
[
{:snakepit, "~> 0.4.1"}
]
end
def deps do
[
{:snakepit, github: "nshkrdotcom/snakepit"}
]
end
- Elixir 1.18+
- Erlang/OTP 27+
- External runtime (Python 3.8+, Node.js 16+, etc.) depending on adapter
For Python/gRPC integration (recommended):
# Install Python dependencies
pip install grpcio grpcio-tools protobuf
# Or use the provided requirements file
pip install -r priv/python/requirements.txt
# Generate Python gRPC code
make proto-python
# This creates the necessary gRPC stubs in priv/python/
Add to your `config/config.exs`:
config :snakepit,
# Enable pooling (recommended for production)
pooling_enabled: true,
# Choose your adapter
adapter_module: Snakepit.Adapters.GRPCPython,
# Pool configuration
pool_config: %{
pool_size: System.schedulers_online() * 2,
startup_timeout: 10_000,
max_queue_size: 1000
},
# gRPC configuration
grpc_config: %{
base_port: 50051,
port_range: 100,
connect_timeout: 5_000
},
# Session configuration
session_config: %{
ttl: 3600, # 1 hour default
cleanup_interval: 60_000 # 1 minute
}
In your application supervisor:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
# Other children...
{Snakepit.Application, []}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Or start manually:
{:ok, _} = Application.ensure_all_started(:snakepit)
For custom Python functionality:
# priv/python/my_adapter.py
from snakepit_bridge.adapters.base import BaseAdapter
class MyAdapter(BaseAdapter):
def __init__(self):
super().__init__()
# Initialize your libraries here
async def execute_my_command(self, args):
# Your custom logic
result = do_something(args)
return {"status": "success", "result": result}
Configure it:
# config/config.exs
config :snakepit,
adapter_module: Snakepit.Adapters.GRPCPython,
python_adapter: "my_adapter:MyAdapter"
# In IEx
iex> Snakepit.execute("ping", %{})
{:ok, %{"status" => "pong", "timestamp" => 1234567890}}
Adapters define how Snakepit communicates with external processes. They specify:
- The runtime executable (python3, node, ruby, etc.)
- The bridge script to execute
- Supported commands and validation
- Request/response transformations
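For example, a minimal adapter might look like the sketch below. It uses the callbacks shown in the Creating Custom Adapters section later in this README; the script path and command set are illustrative, not a shipped adapter:

```elixir
defmodule MyApp.EchoAdapter do
  @behaviour Snakepit.Adapter

  @impl true
  def executable_path, do: System.find_executable("python3")

  @impl true
  def script_path, do: Path.join(:code.priv_dir(:my_app), "python/echo_bridge.py")

  @impl true
  def script_args, do: ["--mode", "pool-worker"]

  @impl true
  def supported_commands, do: ["ping", "echo"]

  @impl true
  def validate_command(cmd, _args) when cmd in ["ping", "echo"], do: :ok
  def validate_command(cmd, _args), do: {:error, "Unsupported command: #{cmd}"}
end
```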
Each worker is a GenServer that:
- Owns one external process via Erlang Port
- Handles request/response communication
- Manages health checks and metrics
- Auto-restarts on crashes
The pool manager:
- Starts workers concurrently on initialization
- Routes requests to available workers
- Handles queueing when all workers are busy
- Supports session affinity for stateful operations
Sessions provide:
- State persistence across requests
- Worker affinity (same session prefers same worker)
- TTL-based expiration
- Centralized storage in ETS
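Worker affinity in practice: calls sharing a `session_id` prefer the same worker, so state held on the external side (loaded models, variables) is reused. The `load_model` and `predict` commands below are hypothetical adapter commands used for illustration:

```elixir
session_id = "sess_#{System.unique_integer([:positive])}"

# Both calls route to the same worker when possible, so the model loaded
# in the first call is still warm for the second.
{:ok, _} = Snakepit.execute_in_session(session_id, "load_model", %{path: "/models/m.bin"})
{:ok, result} = Snakepit.execute_in_session(session_id, "predict", %{input: [1, 2, 3]})
```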
# config/config.exs
config :snakepit,
pooling_enabled: true,
adapter_module: Snakepit.Adapters.GRPCPython, # gRPC-based communication
grpc_config: %{
base_port: 50051, # Starting port for gRPC servers
port_range: 100 # Port range for worker allocation
},
pool_config: %{
pool_size: 8 # Default: System.schedulers_online() * 2
}
# gRPC-specific configuration
config :snakepit,
grpc_config: %{
base_port: 50051, # Starting port for gRPC servers
port_range: 100, # Port range for worker allocation
connect_timeout: 5000, # Connection timeout in ms
request_timeout: 30000 # Default request timeout in ms
}
The gRPC adapter automatically assigns unique ports to each worker within the specified range, ensuring isolation and parallel operation.
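Conceptually, each worker's port is derived from the configured base and range. The helper below is purely illustrative; the real assignment logic is internal to Snakepit:

```elixir
defmodule PortSketch do
  # Derive a unique port for a worker index within the configured range.
  def port_for(worker_index, base_port \\ 50051, port_range \\ 100) do
    base_port + rem(worker_index, port_range)
  end
end

PortSketch.port_for(0) # => 50051
PortSketch.port_for(7) # => 50058
```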
config :snakepit,
# Pool settings
pooling_enabled: true,
pool_config: %{
pool_size: 16
},
# Adapter
adapter_module: MyApp.CustomAdapter,
# Timeouts (milliseconds)
pool_startup_timeout: 10_000, # Max time for worker initialization
pool_queue_timeout: 5_000, # Max time in request queue
worker_init_timeout: 20_000, # Max time for worker to respond to init
worker_health_check_interval: 30_000, # Health check frequency
worker_shutdown_grace_period: 2_000, # Grace period for shutdown
# Cleanup settings
cleanup_retry_interval: 100, # Retry interval for cleanup
cleanup_max_retries: 10, # Max cleanup retries
# Queue management
pool_max_queue_size: 1000 # Max queued requests before rejection
# Override configuration at runtime
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericJavaScript)
Application.stop(:snakepit)
Application.start(:snakepit)
# Basic ping/pong
{:ok, result} = Snakepit.execute("ping", %{})
# => %{"status" => "pong", "timestamp" => 1234567890}
# Computation
{:ok, result} = Snakepit.execute("compute", %{
operation: "multiply",
a: 7,
b: 6
})
# => %{"result" => 42}
# With error handling
case Snakepit.execute("risky_operation", %{threshold: 0.5}) do
{:ok, result} ->
IO.puts("Success: #{inspect(result)}")
{:error, :worker_timeout} ->
IO.puts("Operation timed out")
{:error, {:worker_error, msg}} ->
IO.puts("Worker error: #{msg}")
{:error, reason} ->
IO.puts("Failed: #{inspect(reason)}")
end
For short-lived scripts, Mix tasks, or demos that need to execute and exit cleanly, use `run_as_script/2`:
# In a Mix task or script
Snakepit.run_as_script(fn ->
# Your code here - all workers will be properly cleaned up on exit
{:ok, result} = Snakepit.execute("process_data", %{data: large_dataset})
IO.inspect(result)
end)
# With custom timeout for pool initialization
Snakepit.run_as_script(fn ->
results = Enum.map(1..100, fn i ->
{:ok, result} = Snakepit.execute("compute", %{value: i})
result
end)
IO.puts("Processed #{length(results)} items")
end, timeout: 30_000)
This ensures:
- The pool waits for all workers to be ready before executing
- All Python/external processes are properly terminated on exit
- No orphaned processes remain after your script completes
# Create a session with variables
session_id = "analysis_#{UUID.generate()}"
# Initialize session with variables
{:ok, _} = Snakepit.Bridge.SessionStore.create_session(session_id)
{:ok, _} = Snakepit.Bridge.SessionStore.register_variable(
session_id,
"temperature",
:float,
0.7,
constraints: %{min: 0.0, max: 1.0}
)
# Execute commands that use session variables
{:ok, result} = Snakepit.execute_in_session(session_id, "generate_text", %{
prompt: "Tell me about Elixir"
})
# Update variables
:ok = Snakepit.Bridge.SessionStore.update_variable(session_id, "temperature", 0.9)
# List all variables
{:ok, vars} = Snakepit.Bridge.SessionStore.list_variables(session_id)
# Cleanup when done
:ok = Snakepit.Bridge.SessionStore.delete_session(session_id)
# Using SessionHelpers for ML program management
alias Snakepit.SessionHelpers
# Create an ML program/model
{:ok, response} = SessionHelpers.execute_program_command(
"ml_session_123",
"create_program",
%{
signature: "question -> answer",
model: "gpt-3.5-turbo",
temperature: 0.7
}
)
program_id = response["program_id"]
# Execute the program multiple times
{:ok, result} = SessionHelpers.execute_program_command(
"ml_session_123",
"execute_program",
%{
program_id: program_id,
input: %{question: "What is the capital of France?"}
}
)
# Configure gRPC adapter for streaming workloads
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{
base_port: 50051,
port_range: 100
})
# Process large datasets with streaming
Snakepit.execute_stream("process_dataset", %{
file_path: "/data/large_dataset.csv",
chunk_size: 1000
}, fn chunk ->
if chunk["is_final"] do
IO.puts("Processing complete: #{chunk["total_processed"]} records")
else
IO.puts("Progress: #{chunk["progress"]}% - #{chunk["records_processed"]}/#{chunk["total_records"]}")
end
end)
# ML inference with real-time results
Snakepit.execute_stream("batch_inference", %{
model_path: "/models/resnet50.pkl",
images: ["img1.jpg", "img2.jpg", "img3.jpg"]
}, fn chunk ->
IO.puts("Processed #{chunk["image"]}: #{chunk["prediction"]} (#{chunk["confidence"]}%)")
end)
# Process multiple items in parallel across the pool
items = ["item1", "item2", "item3", "item4", "item5"]
tasks = Enum.map(items, fn item ->
Task.async(fn ->
Snakepit.execute("process_item", %{item: item})
end)
end)
results = Task.await_many(tasks, 30_000)
Snakepit supports modern gRPC-based communication for advanced streaming capabilities, real-time progress updates, and superior performance.
# Step 1: Install gRPC dependencies
make install-grpc
# Step 2: Generate protocol buffer code
make proto-python
# Step 3: Test the upgrade
elixir examples/grpc_non_streaming_demo.exs
# Replace your adapter configuration with this:
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{
base_port: 50051,
port_range: 100
})
# ✅ All your existing API calls work exactly the same
{:ok, result} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})
# Plus you get new streaming capabilities
Snakepit.execute_stream("batch_inference", %{
batch_items: ["image1.jpg", "image2.jpg", "image3.jpg"]
}, fn chunk ->
IO.puts("Processed: #{chunk["item"]} - #{chunk["confidence"]}")
end)
| Feature | gRPC Non-Streaming | gRPC Streaming |
|---|---|---|
| Standard API | ✅ Full support | ✅ Full support |
| Streaming | ❌ | ✅ Real-time |
| HTTP/2 Multiplexing | ✅ | ✅ |
| Progress Updates | ❌ | ✅ Live updates |
| Health Checks | ✅ Built-in | ✅ Built-in |
| Error Handling | ✅ Rich status | ✅ Rich status |
Use this for: Standard request-response operations
# Standard API for quick operations
{:ok, result} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "multiply", a: 10, b: 5})
{:ok, result} = Snakepit.execute("info", %{})
# Session support works exactly the same
{:ok, result} = Snakepit.execute_in_session("user_123", "echo", %{message: "hello"})
When to use:
- ✅ You want better performance without changing your code
- ✅ Your operations complete quickly (< 30 seconds)
- ✅ You don't need progress updates
- ✅ Standard request-response pattern
Use this for: Long-running operations with real-time progress updates
# NEW streaming API - get results as they complete
Snakepit.execute_stream("batch_inference", %{
batch_items: ["img1.jpg", "img2.jpg", "img3.jpg"]
}, fn chunk ->
if chunk["is_final"] do
IO.puts("β
All done!")
else
IO.puts("π§ Processed: #{chunk["item"]} - #{chunk["confidence"]}")
end
end)
# Session-based streaming also available
Snakepit.execute_in_session_stream("session_123", "process_large_dataset", %{
file_path: "/data/huge_file.csv"
}, fn chunk ->
IO.puts("π Progress: #{chunk["progress_percent"]}%")
end)
When to use:
- ✅ Long-running operations (ML training, data processing)
- ✅ You want real-time progress updates
- ✅ Processing large datasets or batches
- ✅ Better user experience with live feedback
# Install gRPC dependencies
make install-grpc
# Generate protocol buffer code
make proto-python
# Verify with non-streaming demo (same as your existing API)
elixir examples/grpc_non_streaming_demo.exs
# Try new streaming capabilities
elixir examples/grpc_streaming_demo.exs
# Configure gRPC
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{base_port: 50051, port_range: 100})
# All your existing code works unchanged
{:ok, result} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})
{:ok, result} = Snakepit.execute("info", %{})
# Sessions work exactly the same
{:ok, result} = Snakepit.execute_in_session("session_123", "echo", %{message: "hello"})
# Try it: elixir examples/grpc_non_streaming_demo.exs
ML Batch Inference with Real-time Progress:
# Process multiple items, get results as each completes
Snakepit.execute_stream("batch_inference", %{
model_path: "/models/resnet50.pkl",
batch_items: ["img1.jpg", "img2.jpg", "img3.jpg"]
}, fn chunk ->
if chunk["is_final"] do
IO.puts("β
All #{chunk["total_processed"]} items complete!")
else
IO.puts("π§ #{chunk["item"]}: #{chunk["prediction"]} (#{chunk["confidence"]})")
end
end)
Large Dataset Processing with Progress:
# Process huge datasets, see progress in real-time
Snakepit.execute_stream("process_large_dataset", %{
file_path: "/data/huge_dataset.csv",
chunk_size: 5000
}, fn chunk ->
if chunk["is_final"] do
IO.puts("π Processing complete: #{chunk["final_stats"]}")
else
progress = chunk["progress_percent"]
IO.puts("π Progress: #{progress}% (#{chunk["processed_rows"]}/#{chunk["total_rows"]})")
end
end)
Session-based Streaming:
# Streaming with session state
session_id = "ml_training_#{user_id}"
Snakepit.execute_in_session_stream(session_id, "distributed_training", %{
model_config: training_config,
dataset_path: "/data/training_set"
}, fn chunk ->
if chunk["is_final"] do
model_path = chunk["final_model_path"]
IO.puts("π― Training complete! Model saved: #{model_path}")
else
epoch = chunk["epoch"]
loss = chunk["train_loss"]
acc = chunk["val_acc"]
IO.puts("π Epoch #{epoch}: loss=#{loss}, acc=#{acc}")
end
end)
# Try it: elixir examples/grpc_streaming_demo.exs
gRPC Non-Streaming:
- ✅ Better performance: HTTP/2 multiplexing, protocol buffers
- ✅ Built-in health checks: Automatic worker monitoring
- ✅ Rich error handling: Detailed gRPC status codes
- ✅ Zero code changes: Drop-in replacement
gRPC Streaming vs Traditional (All Protocols):
- ✅ Progressive results: Get updates as work completes
- ✅ Constant memory: Process unlimited data without memory growth
- ✅ Real-time feedback: Users see progress immediately
- ✅ Cancellable operations: Stop long-running tasks mid-stream
- ✅ Better UX: No more "is it still working?" uncertainty
Traditional (blocking): Submit → wait 10 minutes → get all results
gRPC non-streaming: Submit → get the result faster (better protocol)
gRPC streaming: Submit → get result 1 → get result 2 → ...
Memory usage: fixed vs. grows with result size vs. constant
User experience: "Wait..." vs. "Wait..." vs. real-time updates
Cancellation: kill process vs. kill process vs. graceful stream close
Choose your mode based on your needs:
| Your Situation | Recommended Mode | Why |
|---|---|---|
| Quick operations (< 30s) | gRPC Non-Streaming | Low latency, simple API, no streaming overhead |
| Want better performance, same API | gRPC Non-Streaming | Drop-in upgrade |
| Need progress updates | gRPC Streaming | Real-time feedback |
| Long-running ML tasks | gRPC Streaming | See progress, cancel if needed |
| Large dataset processing | gRPC Streaming | Memory efficient |
Migration path:
Elixir:
# mix.exs
def deps do
[
{:grpc, "~> 0.8"},
{:protobuf, "~> 0.12"},
# ... other deps
]
end
Python:
# Install with gRPC support
pip install 'snakepit-bridge[grpc]'
# Or manually
pip install grpcio protobuf grpcio-tools
| Command | Description | Use Case |
|---|---|---|
| `ping_stream` | Heartbeat stream | Testing, monitoring |
| `batch_inference` | ML model inference | Computer vision, NLP |
| `process_large_dataset` | Data processing | ETL, analytics |
| `tail_and_analyze` | Log analysis | Real-time monitoring |
| `distributed_training` | ML training | Neural networks |
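A quick way to exercise the streaming path is `ping_stream` from the table above. The `count` parameter here is an assumption for illustration, not a documented argument:

```elixir
Snakepit.execute_stream("ping_stream", %{count: 5}, fn chunk ->
  IO.puts("heartbeat: #{inspect(chunk)}")
end)
```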
For comprehensive gRPC documentation, see README_GRPC.md.
Snakepit automatically optimizes large data transfers using binary serialization:
# Small tensor (<10KB) - uses JSON automatically
{:ok, result} = Snakepit.execute("create_tensor", %{
shape: [10, 10], # 100 elements = 800 bytes
name: "small_tensor"
})
# Large tensor (>10KB) - uses binary automatically
{:ok, result} = Snakepit.execute("create_tensor", %{
shape: [100, 100], # 10,000 elements = 80KB
name: "large_tensor"
})
# Performance: 5-10x faster for large data!
# Embeddings - automatic binary for large batches
{:ok, embeddings} = Snakepit.execute("generate_embeddings", %{
texts: ["sentence 1", "sentence 2", ...], # 100+ sentences
model: "sentence-transformers/all-MiniLM-L6-v2",
dimensions: 384
})
# Image processing - binary for pixel data
{:ok, result} = Snakepit.execute("process_images", %{
images: ["image1.jpg", "image2.jpg"],
return_tensors: true # Returns large tensors via binary
})
| Data Size | JSON Time | Binary Time | Speedup |
|---|---|---|---|
| 800B | 12ms | 15ms | 0.8x |
| 20KB | 45ms | 18ms | 2.5x |
| 80KB | 156ms | 22ms | 7.1x |
| 320KB | 642ms | 38ms | 16.9x |
- Automatic Detection: Data size calculated on serialization
- Threshold: 10KB (10,240 bytes)
- Formats:
  - Small data: JSON (human-readable, debuggable)
  - Large data: Binary (pickle on Python, ETF on Elixir)
- Zero Configuration: Works out of the box
For detailed binary serialization documentation, see priv/python/BINARY_SERIALIZATION.md.
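Conceptually, the encoding decision is a simple size check. The sketch below is illustrative only (hypothetical module, assumes the Jason library is available); Snakepit applies the equivalent rule internally:

```elixir
defmodule EncodingSketch do
  @binary_threshold 10_240 # 10KB, matching the documented threshold

  # Returns {:json, iodata} for small values and {:binary, binary} for large ones.
  def encode(value) do
    json = Jason.encode!(value)

    if byte_size(json) > @binary_threshold do
      {:binary, :erlang.term_to_binary(value)}
    else
      {:json, json}
    end
  end
end
```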
Explore all Snakepit features with our comprehensive showcase application:
# Navigate to showcase
cd examples/snakepit_showcase
# Install and run
mix setup
mix demo.all
# Or interactive mode
mix demo.interactive
- Basic Operations - Health checks, error handling
- Session Management - Stateful operations, worker affinity
- Streaming Operations - Real-time progress, chunked data
- Concurrent Processing - Parallel execution, pool management
- Variable Management - Type system, constraints, validation
- Binary Serialization - Performance benchmarks, large data handling
- ML Workflows - Complete pipelines, DSPy integration
mix run -e "SnakepitShowcase.Demos.BinaryDemo.run()"
Shows:
- Automatic JSON vs binary detection
- Side-by-side performance comparison
- Real-world ML embedding examples
- Memory efficiency metrics
See examples/snakepit_showcase/README.md for full documentation.
For detailed documentation on all Python bridge implementations (V1, V2, Enhanced, gRPC), see the Python Bridges section below.
Snakepit supports transparent cross-language function execution between Elixir and Python:
# Call Python functions from Elixir
{:ok, result} = ToolRegistry.execute_tool(session_id, "python_ml_function", %{data: input})
# Python can call Elixir functions transparently
# result = ctx.call_elixir_tool("parse_json", json_string='{"test": true}')
For comprehensive documentation on the bidirectional tool bridge, see README_BIDIRECTIONAL_TOOL_BRIDGE.md.
# Configure with gRPC for dedicated streaming and advanced features
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{base_port: 50051, port_range: 100})
# Dedicated streaming capabilities
{:ok, _} = Snakepit.execute_stream("batch_inference", %{
batch_items: ["img1.jpg", "img2.jpg", "img3.jpg"]
}, fn chunk ->
IO.puts("Processed: #{chunk["item"]} - #{chunk["confidence"]}")
end)
- ✅ Native streaming - Progressive results and real-time updates
- ✅ HTTP/2 multiplexing - Multiple concurrent requests per connection
- ✅ Built-in health checks - Automatic worker health monitoring
- ✅ Rich error handling - gRPC status codes with detailed context
- ✅ Protocol buffers - Efficient binary serialization
- ✅ Cancellable operations - Stop long-running tasks gracefully
- ✅ Custom adapter support - Use third-party Python adapters via pool configuration
The gRPC adapter now supports custom Python adapters through pool configuration:
# Configure with a custom Python adapter (e.g., DSPy integration)
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :pool_config, %{
pool_size: 4,
adapter_args: ["--adapter", "snakepit_bridge.adapters.dspy_grpc.DSPyGRPCHandler"]
})
# The adapter can provide custom commands beyond the standard set
{:ok, result} = Snakepit.Python.call("dspy.Predict", %{signature: "question -> answer"})
{:ok, result} = Snakepit.Python.call("stored.predictor.__call__", %{question: "What is DSPy?"})
`snakepit_bridge.adapters.dspy_grpc.DSPyGRPCHandler`:
- DSPy integration for declarative language model programming
- Supports DSPy modules (Predict, ChainOfThought, ReAct, etc.)
- Python API with `call`, `store`, and `retrieve` commands
- Automatic signature parsing and field mapping
- Session management for stateful operations
# Install gRPC dependencies
make install-grpc
# Generate protocol buffer code
make proto-python
# Test with streaming demo
elixir examples/grpc_streaming_demo.exs
# Test with non-streaming demo
elixir examples/grpc_non_streaming_demo.exs
# Configure
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericJavaScript)
# Additional commands
{:ok, _} = Snakepit.execute("random", %{type: "uniform", min: 0, max: 100})
{:ok, _} = Snakepit.execute("compute", %{operation: "sqrt", a: 16})
Here's a real-world example of a data science adapter with session support:
# priv/python/data_science_adapter.py
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from snakepit_bridge.adapters.base import BaseAdapter
from snakepit_bridge.session_context import SessionContext
class DataScienceAdapter(BaseAdapter):
def __init__(self):
super().__init__()
self.models = {} # Store trained models per session
def set_session_context(self, context: SessionContext):
"""Called when a session context is available."""
self.session_context = context
async def execute_load_data(self, args):
"""Load data from CSV and store in session."""
file_path = args.get("file_path")
if not file_path:
raise ValueError("file_path is required")
# Load data
df = pd.read_csv(file_path)
# Store basic info in session variables
if self.session_context:
await self.session_context.register_variable(
"data_shape", "list", list(df.shape)
)
await self.session_context.register_variable(
"columns", "list", df.columns.tolist()
)
return {
"rows": len(df),
"columns": len(df.columns),
"column_names": df.columns.tolist(),
"dtypes": df.dtypes.to_dict()
}
async def execute_preprocess(self, args):
"""Preprocess data with scaling."""
data = args.get("data")
target_column = args.get("target")
# Convert to DataFrame
df = pd.DataFrame(data)
# Separate features and target
X = df.drop(columns=[target_column])
y = df[target_column]
# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Store scaler parameters in session
if self.session_context:
session_id = self.session_context.session_id
self.models[f"{session_id}_scaler"] = scaler
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X_scaled, y, test_size=0.2, random_state=42
)
return {
"train_size": len(X_train),
"test_size": len(X_test),
"feature_means": scaler.mean_.tolist(),
"feature_stds": scaler.scale_.tolist()
}
async def execute_train_model(self, args):
"""Train a model and store it."""
model_type = args.get("model_type", "linear_regression")
hyperparams = args.get("hyperparams", {})
# Import the appropriate model
if model_type == "linear_regression":
from sklearn.linear_model import LinearRegression
model = LinearRegression(**hyperparams)
elif model_type == "random_forest":
from sklearn.ensemble import RandomForestRegressor
model = RandomForestRegressor(**hyperparams)
else:
raise ValueError(f"Unknown model type: {model_type}")
# Train model (assume data is passed or stored)
# ... training logic ...
# Store model in session
if self.session_context:
session_id = self.session_context.session_id
model_id = f"{session_id}_{model_type}"
self.models[model_id] = model
# Store model metadata as variables
await self.session_context.register_variable(
"current_model", "string", model_id
)
return {
"model_id": model_id,
"model_type": model_type,
"training_complete": True
}
# Usage in grpc_server.py or your bridge
adapter = DataScienceAdapter()
For simpler use cases without session management:
# my_simple_adapter.py
from snakepit_bridge import BaseCommandHandler, ProtocolHandler
from snakepit_bridge.core import setup_graceful_shutdown, setup_broken_pipe_suppression
class MySimpleHandler(BaseCommandHandler):
def _register_commands(self):
self.register_command("uppercase", self.handle_uppercase)
self.register_command("word_count", self.handle_word_count)
def handle_uppercase(self, args):
text = args.get("text", "")
return {"result": text.upper()}
def handle_word_count(self, args):
text = args.get("text", "")
words = text.split()
return {
"word_count": len(words),
"char_count": len(text),
"unique_words": len(set(words))
}
def main():
setup_broken_pipe_suppression()
command_handler = MySimpleHandler()
protocol_handler = ProtocolHandler(command_handler)
setup_graceful_shutdown(protocol_handler)
protocol_handler.run()
if __name__ == "__main__":
main()
- ✅ No sys.path manipulation - proper package imports
- ✅ Location independent - works from any directory
- ✅ Production ready - can be packaged and installed
- ✅ Enhanced error handling - robust shutdown and signal management
- ✅ Type checking - full IDE support with proper imports
defmodule MyApp.RubyAdapter do
@behaviour Snakepit.Adapter
@impl true
def executable_path do
System.find_executable("ruby")
end
@impl true
def script_path do
Path.join(:code.priv_dir(:my_app), "ruby/bridge.rb")
end
@impl true
def script_args do
["--mode", "pool-worker"]
end
@impl true
def supported_commands do
["ping", "process_data", "generate_report"]
end
@impl true
def validate_command("process_data", args) do
if Map.has_key?(args, :data) do
:ok
else
{:error, "Missing required field: data"}
end
end
def validate_command("ping", _args), do: :ok
def validate_command(cmd, _args), do: {:error, "Unsupported command: #{cmd}"}
# Optional callbacks
@impl true
def prepare_args("process_data", args) do
# Transform args before sending
Map.update(args, :data, "", &String.trim/1)
end
@impl true
def process_response("generate_report", %{"report" => report} = response) do
# Post-process the response
{:ok, Map.put(response, "processed_at", DateTime.utc_now())}
end
@impl true
def command_timeout("generate_report", _args), do: 120_000 # 2 minutes
def command_timeout(_command, _args), do: 30_000 # Default 30 seconds
end
#!/usr/bin/env ruby
# priv/ruby/bridge.rb
require 'grpc'
require_relative 'snakepit_services_pb'
class BridgeHandler
def initialize
@commands = {
'ping' => method(:handle_ping),
'process_data' => method(:handle_process_data),
'generate_report' => method(:handle_generate_report)
}
end
def run
STDERR.puts "Ruby bridge started"
# In a full bridge, a GRPC::RpcServer would be started here and incoming
# requests dispatched through process_command below.
loop do
sleep 1
end
end
private
def process_command(request)
command = request['command']
args = request['args'] || {}
handler = @commands[command]
if handler
result = handler.call(args)
{
'id' => request['id'],
'success' => true,
'result' => result,
'timestamp' => Time.now.iso8601
}
else
{
'id' => request['id'],
'success' => false,
'error' => "Unknown command: #{command}",
'timestamp' => Time.now.iso8601
}
end
rescue => e
{
'id' => request['id'],
'success' => false,
'error' => e.message,
'timestamp' => Time.now.iso8601
}
end
def handle_ping(args)
{ 'status' => 'ok', 'message' => 'pong' }
end
def handle_process_data(args)
data = args['data'] || ''
{ 'processed' => data.upcase, 'length' => data.length }
end
def handle_generate_report(args)
# Simulate report generation
sleep(1)
{
'report' => {
'title' => args['title'] || 'Report',
'generated_at' => Time.now.iso8601,
'data' => args['data'] || {}
}
}
end
end
# Handle signals gracefully
Signal.trap('TERM') { exit(0) }
Signal.trap('INT') { exit(0) }
# Run the bridge
BridgeHandler.new.run
alias Snakepit.Bridge.SessionStore
# Create a session
{:ok, session} = SessionStore.create_session("session_123", ttl: 7200)
# Store data in session
:ok = SessionStore.store_program("session_123", "prog_1", %{
model: "gpt-4",
temperature: 0.8
})
# Retrieve session data
{:ok, session} = SessionStore.get_session("session_123")
{:ok, program} = SessionStore.get_program("session_123", "prog_1")
# Update session
{:ok, updated} = SessionStore.update_session("session_123", fn session ->
Map.put(session, :last_activity, DateTime.utc_now())
end)
# Check if session exists
true = SessionStore.session_exists?("session_123")
# List all sessions
session_ids = SessionStore.list_sessions()
# Manual cleanup
SessionStore.delete_session("session_123")
# Get session statistics
stats = SessionStore.get_stats()
# Store programs accessible by any worker
:ok = SessionStore.store_global_program("template_1", %{
type: "qa_template",
prompt: "Answer the following question: {question}"
})
# Retrieve from any worker
{:ok, template} = SessionStore.get_global_program("template_1")
# Worker request completed
[:snakepit, :worker, :request]
# Measurements: %{duration: milliseconds}
# Metadata: %{result: :ok | :error}
# Worker initialized
[:snakepit, :worker, :initialized]
# Measurements: %{initialization_time: seconds}
# Metadata: %{worker_id: string}
# In your application startup
:telemetry.attach_many(
"snakepit-metrics",
[
[:snakepit, :worker, :request],
[:snakepit, :worker, :initialized]
],
&MyApp.Metrics.handle_event/4,
%{}
)
defmodule MyApp.Metrics do
require Logger
def handle_event([:snakepit, :worker, :request], measurements, metadata, _config) do
# Log slow requests
if measurements.duration > 5000 do
Logger.warning("Slow request: #{measurements.duration}ms")
end
# Send to StatsD/Prometheus/DataDog
MyApp.Metrics.Client.histogram(
"snakepit.request.duration",
measurements.duration,
tags: ["result:#{metadata.result}"]
)
end
def handle_event([:snakepit, :worker, :initialized], measurements, metadata, _config) do
Logger.info("Worker #{metadata.worker_id} started in #{measurements.initialization_time}s")
end
end
stats = Snakepit.get_stats()
# Returns:
# %{
# workers: 8, # Total workers
# available: 6, # Available workers
# busy: 2, # Busy workers
# requests: 1534, # Total requests
# queued: 0, # Currently queued
# errors: 12, # Total errors
# queue_timeouts: 3, # Queue timeout count
# pool_saturated: 0 # Saturation rejections
# }
┌─────────────────────────────────────────────────────────┐
│                  Snakepit Application                   │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────┐  ┌──────────────┐  ┌────────────────┐ │
│  │    Pool     │  │ SessionStore │  │ProcessRegistry │ │
│  │   Manager   │  │    (ETS)     │  │  (ETS + DETS)  │ │
│  └──────┬──────┘  └──────────────┘  └────────────────┘ │
│         │                                               │
│  ┌──────▼────────────────────────────────────────────┐ │
│  │           WorkerSupervisor (Dynamic)              │ │
│  └──────┬────────────────────────────────────────────┘ │
│         │                                               │
│  ┌──────▼──────┐  ┌─────────────┐  ┌─────────────┐     │
│  │   Worker    │  │   Worker    │  │   Worker    │     │
│  │   Starter   │  │   Starter   │  │   Starter   │     │
│  │(Supervisor) │  │(Supervisor) │  │(Supervisor) │     │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘     │
│         │                │                │             │
│  ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐     │
│  │   Worker    │  │   Worker    │  │   Worker    │     │
│  │ (GenServer) │  │ (GenServer) │  │ (GenServer) │     │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘     │
│         │                │                │             │
└─────────┼────────────────┼────────────────┼─────────────┘
          │                │                │
   ┌──────▼─────┐   ┌──────▼─────┐   ┌──────▼─────┐
   │  External  │   │  External  │   │  External  │
   │  Process   │   │  Process   │   │  Process   │
   │  (Python)  │   │ (Node.js)  │   │   (Ruby)   │
   └────────────┘   └────────────┘   └────────────┘
- Concurrent Initialization: Workers start in parallel using `Task.async_stream` (see the sketch after this list)
- Permanent Wrapper Pattern: Worker.Starter supervises Workers for auto-restart
- Centralized State: All session data in ETS, workers are stateless
- Registry-Based: O(1) worker lookups and reverse PID lookups
- gRPC Communication: HTTP/2 protocol with streaming support
- Persistent Process Tracking: ProcessRegistry uses DETS for crash-resistant tracking
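As referenced in the first principle, the sketch below shows the concurrent-startup pattern with `Task.async_stream`. The worker-start function is a stand-in for illustration, not Snakepit's internal API:

```elixir
pool_size = 16

start_worker = fn id ->
  # Stand-in for real worker startup (spawning and connecting the external process)
  Process.sleep(100)
  id
end

worker_ids =
  1..pool_size
  |> Task.async_stream(fn i -> start_worker.("worker_#{i}") end,
    max_concurrency: pool_size,
    timeout: 10_000
  )
  |> Enum.map(fn {:ok, id} -> id end)

# All 16 workers start in roughly the time of one, instead of 16x sequentially.
```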
1. Startup:
   - Pool manager starts
   - Concurrently spawns N workers via WorkerSupervisor
   - Each worker starts its external process
   - Workers send an init ping and register when ready
2. Request Flow:
   - Client calls `Snakepit.execute/3`
   - Pool finds an available worker (with session affinity if applicable)
   - Worker sends the request to its external process
   - External process responds
   - Worker returns the result to the client
3. Crash Recovery:
   - Worker crashes → Worker.Starter restarts it automatically
   - External process dies → Worker detects it and crashes → restart
   - Pool crashes → Supervisor restarts the entire pool
   - BEAM crashes → ProcessRegistry cleans up orphans on next startup
4. Shutdown:
   - Pool manager sends shutdown to all workers
   - Workers close ports gracefully (SIGTERM)
   - ApplicationCleanup ensures no orphaned processes remain (SIGKILL)
Configuration: 16 workers, gRPC Python adapter
Hardware: 8-core CPU, 32GB RAM
gRPC Performance:
Startup Time:
- Sequential: 16 seconds (1s per worker)
- Concurrent: 1.2 seconds (13x faster)
Throughput (gRPC Non-Streaming):
- Simple computation: 75,000 req/s
- ML inference: 12,000 req/s
- Session operations: 68,000 req/s
Latency (p99, gRPC):
- Simple computation: < 1.2ms
- ML inference: < 8ms
- Session operations: < 0.6ms
Streaming Performance:
- Throughput: 250,000 chunks/s
- Memory usage: Constant (streaming)
- First chunk latency: < 5ms
Connection overhead:
- Initial connection: 15ms
- Reconnection: 8ms
- Health check: < 1ms
- Pool Size: Start with `System.schedulers_online() * 2`
- Queue Size: Monitor `pool_saturated` errors and adjust
- Timeouts: Set appropriate timeouts per command type
- Session TTL: Balance memory usage vs. cache hits
- Health Checks: Increase the interval for stable workloads
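Putting the guidelines together, a tuned configuration might look like this. The values are illustrative starting points, using the keys from the Configuration section:

```elixir
config :snakepit,
  pool_config: %{pool_size: System.schedulers_online() * 2},
  pool_max_queue_size: 2000,            # raise if pool_saturated errors appear
  worker_health_check_interval: 60_000, # relax health checks for stable workloads
  session_config: %{ttl: 1800}          # shorter TTL trades cache hits for memory
```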
Snakepit v0.3+ includes automatic binary serialization for large data transfers, providing significant performance improvements for ML/AI workloads that involve tensors, embeddings, and other numerical arrays.
- Automatic Detection: When variable data exceeds 10KB, Snakepit automatically switches from JSON to binary encoding
- Type Support: Currently optimized for `tensor` and `embedding` variable types
- Zero Configuration: No code changes required - it just works
- Protocol: Uses Erlang's native binary format (ETF) on Elixir side and Python's pickle on Python side
# Example: 1000x1000 tensor (8MB of float data)
# JSON encoding: ~500ms
# Binary encoding: ~50ms (10x faster!)
# Create a large tensor
{:ok, _} = Snakepit.execute_in_session("ml_session", "create_tensor", %{
shape: [1000, 1000],
fill_value: 0.5
})
# The tensor is automatically stored using binary serialization
# Retrieval is also optimized
{:ok, tensor} = Snakepit.execute_in_session("ml_session", "get_variable", %{
name: "large_tensor"
})
The 10KB threshold (10,240 bytes) is optimized for typical workloads:
- Below 10KB: JSON encoding (better for debugging, human-readable)
- Above 10KB: Binary encoding (better for performance)
# In your Python adapter
from snakepit_bridge import SessionContext
class MLAdapter:
def process_embeddings(self, ctx: SessionContext, batch_size: int):
# Generate large embeddings (e.g., 512-dimensional)
embeddings = np.random.randn(batch_size, 512).tolist()
# This automatically uses binary serialization if > 10KB
ctx.register_variable("batch_embeddings", "embedding", embeddings)
# Retrieval also handles binary data transparently
stored = ctx["batch_embeddings"]
return {"shape": [len(stored), len(stored[0])]}
- Tensor Type:
  - Metadata (JSON): `{"shape": [dims...], "dtype": "float32", "binary_format": "pickle/erlang_binary"}`
  - Binary data: Serialized flat array of values
- Embedding Type:
  - Metadata (JSON): `{"shape": [length], "dtype": "float32", "binary_format": "pickle/erlang_binary"}`
  - Binary data: Serialized array of float values
The following fields support binary data:
- `Variable.binary_value`: Stores large variable data
- `SetVariableRequest.binary_value`: Sets a variable with binary data
- `RegisterVariableRequest.initial_binary_value`: Initial binary value
- `BatchSetVariablesRequest.binary_updates`: Batch binary updates
- `ExecuteToolRequest.binary_parameters`: Binary tool parameters
- Variable Types: Always use proper types (`tensor`, `embedding`) for large numerical data
- Batch Operations: Use batch updates for multiple large variables to minimize overhead
- Memory Management: Binary data is held in memory - monitor usage for very large datasets
- Compatibility: The binary format is internal - use standard types when sharing data externally
- Type Support: Currently only `tensor` and `embedding` types use binary serialization
- Format Lock-in: Binary data uses platform-specific formats (ETF/pickle)
- Debugging: Binary data is not human-readable in logs/inspection
# Check for orphaned processes
ps aux | grep grpc_server.py
# Verify ProcessRegistry is cleaning up
Snakepit.Pool.ProcessRegistry.get_stats()
# Check DETS file location
ls -la priv/data/process_registry.dets
# See detailed documentation
# README_PROCESS_MANAGEMENT.md
# Check adapter configuration
adapter = Application.get_env(:snakepit, :adapter_module)
adapter.executable_path() # Should return valid path
File.exists?(adapter.script_path()) # Should return true
# Check logs for errors
Logger.configure(level: :debug)
# Enable port tracing
:erlang.trace(Process.whereis(Snakepit.Pool.Worker), true, [:receive, :send])
# Check external process logs
# Python: Add logging to bridge script
# Node.js: Check stderr output
# Monitor ETS usage
:ets.info(:snakepit_sessions, :memory)
# Check for orphaned processes
Snakepit.Pool.ProcessRegistry.get_stats()
# Force cleanup
Snakepit.Bridge.SessionStore.cleanup_expired_sessions()
# Enable debug logging
Logger.configure(level: :debug)
# Trace specific worker
:sys.trace(Snakepit.Pool.Registry.via_tuple("worker_1"), true)
# Get internal state
:sys.get_state(Snakepit.Pool)
- Testing Guide - How to run and write tests
- Unified gRPC Bridge - Stage 0, 1, and 2 implementation details
- Bidirectional Tool Bridge - Cross-language function execution between Elixir and Python
- Process Management - Persistent tracking and orphan cleanup
- gRPC Communication - Streaming and non-streaming gRPC details
- Python Bridge Implementations - See sections above for V1, V2, Enhanced, and gRPC bridges
We welcome contributions! Please see our Contributing Guide for details.
# Clone the repo
git clone https://github.com/nshkrdotcom/snakepit.git
cd snakepit
# Install dependencies
mix deps.get
# Run tests
mix test
# Run example scripts
elixir examples/v2/session_based_demo.exs
elixir examples/javascript_grpc_demo.exs
# Check code quality
mix format --check-formatted
mix dialyzer
# All tests
mix test
# With coverage
mix test --cover
# Specific test
mix test test/snakepit_test.exs:42
Snakepit is released under the MIT License. See the LICENSE file for details.
- Inspired by the need for reliable ML/AI integrations in Elixir
- Built on battle-tested OTP principles
- Special thanks to the Elixir community
v0.4 (Current Release)
- ✅ gRPC streaming bridge implementation complete
- ✅ Unified gRPC architecture for all communication
- ✅ Python Bridge V2 architecture with production packaging
- ✅ Comprehensive documentation and examples
- ✅ Performance benchmarks and optimization
- ✅ End-to-end testing across all protocols
Roadmap
- Enhanced streaming operations and cancellation
- Additional language adapters (Ruby, R, Go)
- Advanced telemetry and monitoring features
- Distributed worker pools
- Hex Package
- API Documentation
- GitHub Repository
- Example Projects
- gRPC Bridge Documentation
- Python Bridge Documentation - See sections above
Made with ❤️ by NSHkr