A complete event-driven pipeline that processes tabular data through three classification stages and returns results to Redis.
The HDX SSD (Sensitive Data Detection) Pipeline processes HDX resources through a three-stage classification system:
- PII Detection: Identifies personally identifiable information in table columns
- PII Reflection: Determines sensitivity levels for detected PII entities
- Non-PII Classification: Assesses overall table sensitivity for non-PII aspects
Redis Event → Download CSV → Preprocess → PII Detection → PII Reflection → Non-PII Classification → Redis Response
- Python 3.8+
- Redis server
- Azure OpenAI API access
- Clone the repository:

  ```bash
  git clone <repository-url>
  cd hdx-ssd-pipeline
  ```

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  ```

- Set up environment variables:

  ```bash
  cp .env.example .env
  # Edit .env with your configuration
  ```

Create a .env file with the following variables:
```
# Azure OpenAI Configuration
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key-here
AZURE_OPENAI_API_VERSION=2024-02-15-preview

# Redis Configuration
REDIS_STREAM_PORT=6379
REDIS_STREAM_DB=0
```

Use the provided docker-compose.yml to set up Redis:

```bash
docker-compose up -d
```
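Before starting the pipeline, you can check that the Redis container is reachable on the configured port. This is a minimal sketch, assuming the `redis` Python package is available (it is not in the short dependency list at the end of this README) and that Redis runs on localhost:

```python
# Optional sanity check that Redis is reachable (assumes the `redis` package
# and a server on localhost; both are assumptions, not documented here).
import os

import redis
from dotenv import load_dotenv

load_dotenv()  # pick up REDIS_STREAM_PORT / REDIS_STREAM_DB from .env

r = redis.Redis(
    host="localhost",
    port=int(os.getenv("REDIS_STREAM_PORT", "6379")),
    db=int(os.getenv("REDIS_STREAM_DB", "0")),
)
print(r.ping())  # True if the server is up
```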
Start the pipeline to listen for HDX events:

```bash
python main.py
```

The pipeline will:
- Listen for events on the `sdd:tables` Redis stream
- Download resources from HDX when events are received
- Process files through the three classification stages
- Return formatted results
Use the provided event generator to test the pipeline:
```bash
python redis_streams_event_generator.py
```

- Receives HDX events containing `resource_id` (an example event is sketched after this list)
- Downloads CSV/Excel files from HDX API
- Preprocesses files to extract table structure
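If you prefer to push an event by hand rather than run the generator, a minimal sketch looks like the following. The `sdd:tables` stream name comes from the list above; any fields beyond `resource_id` (here `file_name` and `file_url`, mirroring the result format below) are assumptions rather than documented requirements:

```python
# Publish a hand-rolled test event to the sdd:tables stream.
# Only resource_id is documented; the other fields are illustrative.
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
r.xadd(
    "sdd:tables",
    {
        "resource_id": "1234567890",
        "file_name": "example.csv",
        "file_url": "https://example.com/example.csv",
    },
)
```

A fully processed resource produces a result with the following shape: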
```python
{
    'resource_id': '1234567890',
    'file_name': 'example.csv',
    'file_url': 'https://example.com/example.csv',
    'processing_timestamp': '2025-01-01 12:00:00',
    'processing_success': True,
    'n_records': 100,
    'n_columns': 10,
    'pii_sensitive': True,
    'non_pii_sensitive': True,
    'columns': [
        {
            'column_name': 'email_address',
            'sample_values': ['john@example.com', 'jane@company.com'],
            'pii': {
                'entity_type': 'EMAIL_ADDRESS',
                'sensitive': True
            }
        }
    ],
    'non_pii': {
        'sensitivity': 'LOW',
        'explanation': 'The table contains email addresses, which are considered sensitive data.'
    }
}
```

- Analyzes each column for PII entities
- Uses sample values and column names
- Returns entity types (e.g., PERSON_NAME, EMAIL_ADDRESS)
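The detection prompts live in prompts/ and the classifier code in classifiers/; the snippet below is only a rough sketch of the kind of per-column call this stage makes, assuming the Azure OpenAI chat completions API and the PII_DETECT_MODEL setting shown in the model configuration snippet later in this README. The function, prompt wording, and response handling are illustrative, not the project's actual code:

```python
# Rough, illustrative sketch of a per-column PII detection call.
import os
from typing import List

from openai import AzureOpenAI

client = AzureOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

def detect_pii(column_name: str, sample_values: List[str]) -> str:
    """Ask the model for a PII entity type (e.g. EMAIL_ADDRESS) or NONE."""
    prompt = (
        f"Column name: {column_name}\n"
        f"Sample values: {sample_values}\n"
        "Which PII entity type, if any, does this column contain?"
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # PII_DETECT_MODEL in utils/main_config.py
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
```

After detection, each column entry carries the identified entity type: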
```python
{
    'column_name': 'email_address',
    'sample_values': ['john@example.com', 'jane@company.com'],
    'pii': {
        'entity_type': 'EMAIL_ADDRESS'
    }
}
```

- For columns with detected PII, determines sensitivity level
- Considers table context and entity type
- Returns sensitivity levels: NON_SENSITIVE, MODERATE_SENSITIVE, HIGH_SENSITIVE, SEVERE_SENSITIVE
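A compact way to picture these levels is below; the enum name and the mapping onto the boolean `sensitive` flag seen in the column examples are assumptions for illustration, not project code:

```python
# The four sensitivity levels listed above, plus an assumed mapping to the
# boolean 'sensitive' flag that appears in the column results.
from enum import Enum

class PIISensitivity(str, Enum):
    NON_SENSITIVE = "NON_SENSITIVE"
    MODERATE_SENSITIVE = "MODERATE_SENSITIVE"
    HIGH_SENSITIVE = "HIGH_SENSITIVE"
    SEVERE_SENSITIVE = "SEVERE_SENSITIVE"

def is_sensitive(level: PIISensitivity) -> bool:
    # Assumption: anything above NON_SENSITIVE counts as sensitive.
    return level is not PIISensitivity.NON_SENSITIVE
```

After reflection, a column with detected PII looks like this: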
```python
{
    'column_name': 'email_address',
    'sample_values': ['john@example.com', 'jane@company.com'],
    'pii': {
        'entity_type': 'EMAIL_ADDRESS',
        'sensitive': True
    }
}
```

- Analyzes overall table for non-PII sensitivity
- Uses ISP (Information Sensitivity Protocol) rules
- Considers humanitarian data sharing guidelines
- Combines all classification results
- Formats for Redis response
- Includes processing metadata and summaries
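A minimal sketch of this aggregation step, assuming the table-level `pii_sensitive` flag simply means "any column has sensitive PII" (the actual combining logic in the pipeline code may differ):

```python
# Assumed aggregation: mark the table as PII-sensitive if any column is.
def finalize(result: dict) -> dict:
    result["pii_sensitive"] = any(
        col.get("pii", {}).get("sensitive", False) for col in result["columns"]
    )
    return result

# The formatted result is then returned on Redis; this README documents only
# the sdd:tables input stream, not the name of the response stream.
```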
Models can be configured in `utils/main_config.py`:

```python
NON_PII_DETECT_MODEL = 'gpt-4o-mini'
PII_DETECT_MODEL = 'gpt-4o-mini'
PII_REFLECT_MODEL = 'gpt-4o-mini'
```

Information Sensitivity Protocol rules are defined in `utils/main_config.py` under `ISP_DEFAULT`. These rules define sensitivity levels for humanitarian data sharing.
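As a sketch of how these settings might be consumed by the non-PII stage: jinja2 is among the dependencies and the prompt templates live in prompts/, so the rules could plausibly be rendered into a prompt along these lines (the template file name and variable are hypothetical):

```python
# Hypothetical illustration of rendering the ISP rules into a prompt template.
from jinja2 import Environment, FileSystemLoader

from utils.main_config import ISP_DEFAULT  # documented home of the rules

env = Environment(loader=FileSystemLoader("prompts"))
template = env.get_template("non_pii_classification.jinja")  # hypothetical name
prompt = template.render(isp_rules=ISP_DEFAULT)
print(prompt[:500])  # preview the rendered prompt
```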
```
hdx-ssd-pipeline/
├── classifiers/       # Classification modules
├── llm_model/         # LLM integration
├── pipeline/          # Orchestration logic
├── preprocessing/     # Data preprocessing
├── prompts/           # Prompt templates
├── utils/             # Configuration and utils
├── main.py            # Main pipeline entry point
└── requirements.txt   # Dependencies
```
The pipeline includes comprehensive error handling:
- Network errors during HDX downloads
- File processing errors
- Classification failures
- All errors are logged and returned in results
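For instance, a failed download might come back in the results roughly like this; only `processing_success` appears in the documented result format, so the error field name is an assumption:

```python
# Illustrative failure entry; the 'error' field name is assumed.
{
    'resource_id': '1234567890',
    'processing_success': False,
    'error': 'Failed to download resource from HDX'
}
```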
Logs are written to:
- `logs/ssd.log` - General pipeline logs
- `logs/ssd-json.log` - JSON formatted logs
- Create a new classifier in `classifiers/`
- Extend `BaseClassifier`
- Add it to the orchestrator in `pipeline/orchestrator.py`
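A skeleton of what such a classifier might look like; the `BaseClassifier` import path and the method name are assumptions to be matched against the real base class:

```python
# classifiers/my_classifier.py: illustrative skeleton only.
from classifiers.base_classifier import BaseClassifier  # import path assumed


class MyClassifier(BaseClassifier):
    """New classification stage; register it in pipeline/orchestrator.py."""

    def classify(self, table: dict) -> dict:
        # Method name and signature are assumptions; inspect the table's
        # columns and sample values, then return this stage's verdict.
        return {"my_classification": "NOT_APPLICABLE"}
```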
Run tests with:
```bash
python -m pytest test/
```

- Fork the repository
- Create a feature branch
- Make your changes
- Add tests if applicable
- Submit a pull request
[Add license information]
Dependencies:

- jinja2
- openai
- python-dotenv
- requests