A Python package for efficiently transferring documents from Elasticsearch to Pinecone with vector embeddings and threading support.
ES to Pinecone bridges the gap between traditional search engines (Elasticsearch) and vector databases (Pinecone), enabling semantic search capabilities. The pipeline extracts documents from Elasticsearch, generates vector embeddings using providers like OpenAI or HuggingFace, and uploads them to Pinecone.
- Simple Configuration: Minimal setup required through environment variables or direct configuration
- Multiple Embedding Providers: Support for OpenAI, HuggingFace, and custom embedding generators
- Multi-threaded Processing: Parallel processing for significantly faster operations
- Flexible Field Selection: Choose which document fields to embed and include as metadata
- Progress Tracking: Built-in progress visualization with tqdm and custom callbacks
- Robust Error Handling: Comprehensive exception handling and detailed logging
- Dry Run Mode: Test configuration without writing to Pinecone
pip install es-to-pinecone
- Create a `.env` file with your configuration (a note on loading it follows this list):
# Elasticsearch Configuration
ES_HOST=http://localhost:9200
ES_USERNAME=elastic
ES_PASSWORD=changeme
ES_INDEX=your_index
# Embedding Configuration
EMBEDDING_TYPE=openai
OPENAI_API_KEY=your_openai_key
OPENAI_MODEL=text-embedding-ada-002
# Pinecone Configuration
PINECONE_API_KEY=your_pinecone_key
PINECONE_ENVIRONMENT=us-west1-gcp
PINECONE_INDEX_NAME=your_pinecone_index
# Pipeline Configuration
BATCH_SIZE=100
MAX_THREADS=5
FIELDS_TO_EMBED=title,content
METADATA_FIELDS=author,date,url
DEFAULT_NAMESPACE=default
- Use the pipeline in your code:
from es_to_pinecone_transfer.pipeline import ElasticsearchToPineconePipeline
# Initialize the pipeline
pipeline = ElasticsearchToPineconePipeline()
# Run the pipeline
stats = pipeline.run()
print(f"Processed: {stats['processed']} documents")
print(f"Upserted: {stats['upserted']} documents")
print(f"Failed: {stats['failed']} documents")
- Or use the command-line interface: `es-to-pinecone`
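If the variables from your `.env` file are not already exported in the environment where the pipeline runs, one option is to load the file explicitly with python-dotenv before constructing the pipeline. This is a minimal sketch under the assumption that the package does not load the file for you; if it does, the extra call is harmless:

```python
from dotenv import load_dotenv

from es_to_pinecone_transfer.pipeline import ElasticsearchToPineconePipeline

load_dotenv()  # copies the key=value pairs from .env into os.environ

pipeline = ElasticsearchToPineconePipeline()
stats = pipeline.run()
```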
Comprehensive documentation is available in the docs directory:
- Configuration Options: All available configuration settings
- Usage Examples: Detailed usage examples
- API Reference: Complete API documentation
- Troubleshooting: Solutions for common issues
Ready-to-use example scripts are available in the examples directory:
- basic_transfer.py: Simple document transfer
- advanced_transfer.py: Advanced usage with progress tracking and filtering
- semantic_search.py: Perform semantic search with the transferred vectors (a short sketch of the idea follows)
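To give a flavor of that last example, here is a minimal sketch of a semantic search against the transferred vectors. It is not the contents of semantic_search.py; it assumes the v1 openai client, the v3+ pinecone client, and the index, namespace, and embedding model from the configuration above:

```python
from openai import OpenAI
from pinecone import Pinecone

# Embed the query text with the same model used during the transfer.
openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment
query_text = "articles about vector databases"
embedding = openai_client.embeddings.create(
    model="text-embedding-ada-002",
    input=query_text,
).data[0].embedding

# Ask Pinecone for the nearest neighbours and show their metadata.
pc = Pinecone(api_key="your_pinecone_key")
index = pc.Index("your_pinecone_index")
results = index.query(vector=embedding, top_k=5, include_metadata=True, namespace="default")
for match in results.matches:
    print(match.id, match.score, match.metadata)
```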
You can restrict which documents are transferred by passing an Elasticsearch query to `run()`:

# Define a query to filter documents
query = {
    "bool": {
        "must": [
            {"match": {"status": "published"}},
            {"range": {"published_date": {"gte": "2023-01-01"}}}
        ]
    }
}

# Run the pipeline with the query
stats = pipeline.run(query=query)
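If you want to sanity-check how many documents a filter matches before starting a transfer, you can count them directly with the official elasticsearch client. A small sketch, assuming the 8.x client (older clients take the query inside a body= argument):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "changeme"))
matched = es.count(index="your_index", query=query)["count"]
print(f"{matched} documents match the filter")
```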
# Set field mapping
pipeline.set_field_mapping({
    'title': 'document_title',
    'content': 'document_content',
    'author': 'author_name'
})
# Define a progress callback
def progress_callback(current_batch, total_batches):
    percent = (current_batch / total_batches) * 100
    print(f"Progress: {current_batch}/{total_batches} ({percent:.2f}%)")

# Set the progress callback
pipeline.set_progress_callback(progress_callback)
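Because the callback receives the current and total batch counts, it can also drive a tqdm progress bar instead of plain prints. A small sketch, assuming the callback is invoked once per completed batch as the percentage example above suggests:

```python
from tqdm import tqdm

class TqdmProgress:
    """Progress callback that renders a tqdm bar across all batches."""

    def __init__(self):
        self.bar = None

    def __call__(self, current_batch, total_batches):
        if self.bar is None:
            self.bar = tqdm(total=total_batches, desc="Batches")
        self.bar.update(1)
        if current_batch >= total_batches:
            self.bar.close()

pipeline.set_progress_callback(TqdmProgress())
```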
The pipeline raises specific exceptions that you can catch to handle each failure mode separately:

from es_to_pinecone_transfer.exceptions import (
    ElasticsearchConnectionError,
    PineconeConnectionError,
    EmbeddingError,
    ConfigurationError
)

try:
    pipeline = ElasticsearchToPineconePipeline()
    stats = pipeline.run()
except ConfigurationError as e:
    print(f"Configuration error: {e}")
except ElasticsearchConnectionError as e:
    print(f"Elasticsearch connection failed: {e}")
except PineconeConnectionError as e:
    print(f"Pinecone connection failed: {e}")
except EmbeddingError as e:
    print(f"Embedding generation failed: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
The pipeline consists of these main components (a conceptual sketch of how they fit together follows the list):
- ElasticsearchClient: Extracts documents from Elasticsearch
- Embedding Generators: Convert text to vector embeddings
- PineconeClient: Uploads vectors to Pinecone
- Pipeline Coordinator: Manages the transfer process with threading
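As a purely conceptual illustration of that division of labor (not the package's actual internals), a coordinator can fan document batches out to worker threads that embed and upsert in parallel:

```python
from concurrent.futures import ThreadPoolExecutor

def embed(text):
    # Stand-in for an embedding generator (OpenAI, HuggingFace, ...).
    return [0.0] * 1536

def upsert(vectors):
    # Stand-in for the Pinecone upsert call.
    print(f"upserted {len(vectors)} vectors")

def process_batch(batch):
    # One worker: embed every document in the batch, then upsert the vectors.
    vectors = [(doc["id"], embed(doc["text"])) for doc in batch]
    upsert(vectors)
    return len(vectors)

def run_transfer(batches, max_threads=5):
    # The coordinator fans batches out to a thread pool (cf. MAX_THREADS).
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        return sum(pool.map(process_batch, batches))

batches = [[{"id": f"doc-{i}", "text": "example text"} for i in range(3)]]
print(f"processed {run_transfer(batches)} documents")
```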
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.