This Spark-based ETL pipeline processes insurance claim and contract data. It reads parameters from a configuration file, loads the source contract and claim data, applies a series of transformations, and writes transaction-level output in Parquet format.
.
├── config/
│   └── config.json          # Configuration file with input/output paths
├── main.py                  # Entry script that ties everything together
├── src/
│   ├── transform.py         # Core transformation logic
│   ├── io_base.py           # Abstract class defining IO interface
│   ├── io_local.py          # Concrete implementation for local CSV + Parquet
│   └── utils/
│       ├── logger.py        # Logger setup for file-based logging
│       ├── spark_utils.py   # SparkSession builder
│       └── utils.py         # Utility to load configuration
├── data/
│   ├── contract.csv         # Sample input contract data
│   └── claim.csv            # Sample input claim data
├── output/
│   └── transactions.parquet # Final transformed output in Parquet format
├── tests/
│   ├── conftest.py          # Test fixtures
│   ├── test_transform.py    # Unit tests for transform logic
│   └── test_utils.py        # Unit tests for utility functions
├── requirements.txt         # Python package requirements
├── README.md                # Project overview and instructions
└── exploration.ipynb        # Exploratory notebook (for local testing)
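The split between `io_base.py` (the abstract IO interface) and `io_local.py` (the local CSV/Parquet implementation) is what keeps storage concerns out of the transformation logic. A minimal sketch of what that interface might look like; the class and method names here are assumptions, not the project's actual code:

```python
# Hypothetical sketch of src/io_base.py and src/io_local.py
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame, SparkSession


class BaseIO(ABC):
    """Abstract IO interface the pipeline codes against."""

    @abstractmethod
    def read_csv(self, path: str) -> DataFrame: ...

    @abstractmethod
    def write_parquet(self, df: DataFrame, path: str) -> None: ...


class LocalCSVParquetIO(BaseIO):
    """Concrete implementation: local CSV in, local Parquet out."""

    def __init__(self, spark: SparkSession):
        self.spark = spark

    def read_csv(self, path: str) -> DataFrame:
        # header + schema inference keep the sample CSVs easy to load
        return self.spark.read.csv(path, header=True, inferSchema=True)

    def write_parquet(self, df: DataFrame, path: str) -> None:
        df.write.mode("overwrite").parquet(path)
```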
- Install dependencies: `pip install -r requirements.txt`
- Run the pipeline locally: `python main.py` (a sketch of what `main.py` does follows)
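A plausible sketch of how `main.py` ties the pieces together. The helper names (`load_config`, `get_spark`, `get_logger`, `transform`) are assumptions based on the module descriptions above, not the actual function signatures:

```python
# Hypothetical sketch of main.py; imported helper names are illustrative
from src.utils.utils import load_config        # assumed config loader in utils.py
from src.utils.spark_utils import get_spark    # assumed SparkSession builder
from src.utils.logger import get_logger        # assumed logger factory
from src.io_local import LocalCSVParquetIO
from src.transform import transform            # assumed core transform entry point


def main():
    config = load_config("config/config.json")
    logger = get_logger()
    spark = get_spark()

    io = LocalCSVParquetIO(spark)
    contract_df = io.read_csv(config["input"]["contract"])
    claim_df = io.read_csv(config["input"]["claim"])

    logger.info("Running transformations")
    transactions_df = transform(contract_df, claim_df)

    io.write_parquet(transactions_df, config["output"]["transactions"])
    logger.info("Pipeline finished")


if __name__ == "__main__":
    main()
```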
- Reads contract and claim CSVs using `LocalCSVParquetIO`
- Joins on `CONTRACT_ID` and `SOURCE_SYSTEM` (see the sketch after this list)
- Applies transformations such as:
  - Type casting
  - Regex parsing
  - Standardized timestamp formatting
- Calls an external API to derive `NSE_ID`
- Saves the final DataFrame as a Parquet file
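A condensed sketch of the join and a few representative transformations. Only the join keys (`CONTRACT_ID`, `SOURCE_SYSTEM`) and the `CLAIM_ID` / `CONFORMED_VALUE` fields come from this README; `CREATION_DATE`, the cast type, and the regex are illustrative assumptions:

```python
# Illustrative sketch of src/transform.py logic; the real transformations may differ.
from pyspark.sql import DataFrame, functions as F


def transform(contract_df: DataFrame, claim_df: DataFrame) -> DataFrame:
    # Join claims to contracts on the composite key
    joined = claim_df.join(
        contract_df, on=["CONTRACT_ID", "SOURCE_SYSTEM"], how="left"
    )

    return (
        joined
        # Type casting: force the conformed value to a fixed decimal type
        .withColumn("CONFORMED_VALUE", F.col("CONFORMED_VALUE").cast("decimal(16,5)"))
        # Regex parsing: e.g. pull the trailing numeric part out of the claim id
        .withColumn("CLAIM_ID_NUM", F.regexp_extract("CLAIM_ID", r"(\d+)$", 1))
        # Standardized timestamp formatting (CREATION_DATE is a placeholder name)
        .withColumn("CREATION_DATE", F.to_timestamp("CREATION_DATE", "dd.MM.yyyy HH:mm"))
    )
```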
{
  "input": {
    "contract": "data/contract.csv",
    "claim": "data/claim.csv"
  },
  "output": {
    "transactions": "output/transactions.parquet"
  },
  "api": {
    "nse_url_template": "https://api.hashify.net/hash/md4/hex?value={CLAIM_ID}"
  }
}
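A rough sketch of how the config could be loaded and the `nse_url_template` used to derive `NSE_ID` for a single claim. The `requests` call and the `"Digest"` key in the response are assumptions about the hashify API's JSON shape; the actual implementation may batch calls or wrap this in a Spark UDF:

```python
# Rough sketch: load config.json and call the configured API for one claim id.
import json
import requests


def load_config(path: str = "config/config.json") -> dict:
    with open(path) as f:
        return json.load(f)


def fetch_nse_id(claim_id: str, url_template: str) -> str:
    # Fill the {CLAIM_ID} placeholder from the template in config.json
    url = url_template.format(CLAIM_ID=claim_id)
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    # Assumes the API returns JSON with a "Digest" field containing the hash
    return resp.json()["Digest"]


config = load_config()
nse_id = fetch_nse_id("CL123", config["api"]["nse_url_template"])
```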
Test scripts are located in the `tests/` folder. To run all tests: `pytest tests/`
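`conftest.py` presumably provides a shared SparkSession fixture for the test suite. A minimal sketch of such a fixture plus a simple transform test; the fixture scope, column names, and assertions are illustrative:

```python
# Hypothetical sketch of tests/conftest.py-style fixture and one test.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    session = SparkSession.builder.master("local[2]").appName("tests").getOrCreate()
    yield session
    session.stop()


def test_join_keeps_all_claims(spark):
    claims = spark.createDataFrame(
        [("C1", "SYS_A", "CL1")], ["CONTRACT_ID", "SOURCE_SYSTEM", "CLAIM_ID"]
    )
    contracts = spark.createDataFrame(
        [("C1", "SYS_A", "Direct")], ["CONTRACT_ID", "SOURCE_SYSTEM", "CONTRACT_TYPE"]
    )
    joined = claims.join(contracts, ["CONTRACT_ID", "SOURCE_SYSTEM"], "left")
    assert joined.count() == 1
```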
- Enriched transaction data is written to the path specified in `config.json` as a Parquet file.
- The sample schema includes fields such as `CLAIM_ID`, `TRANSACTION_TYPE`, `CONFORMED_VALUE`, `NSE_ID`, and timestamp columns.
- Logs are written to the `logs/` folder using timestamped filenames.
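The timestamped file logging could be set up roughly like this; a sketch in the spirit of `src/utils/logger.py`, not the actual implementation:

```python
# Sketch of a file logger writing to logs/ with a timestamped filename.
import logging
import os
from datetime import datetime


def get_logger(name: str = "etl", log_dir: str = "logs") -> logging.Logger:
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f"run_{datetime.now():%Y%m%d_%H%M%S}.log")

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s")
        )
        logger.addHandler(handler)
    return logger
```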
Two possible approaches to ingesting new data without overwriting or duplicating existing records (sketches of both follow this list):
- Before merging into the table, identify and filter out duplicate transactions based on a unique key (e.g. `CLAIM_ID` + `CONTRACT_ID`), and maintain a unique view through a Spark SQL view or Delta Lake merge logic.
- Alternatively, store the final data partitioned by an identifier column or a per-row hash (e.g. SHA-256). The identifier can be a batch ID, an ingestion timestamp, or any marker of the ingestion batch, so incoming records can be distinguished from existing ones; a left anti join then skips rows that already exist.
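For the first approach, a sketch with two variants: dropping duplicates on the composite key within the incoming batch, then (if Delta Lake is available via the `delta-spark` package) inserting only unseen keys with a merge. The target path is a placeholder:

```python
# Sketch of de-duplicating on (CLAIM_ID, CONTRACT_ID) and merging into a Delta table.
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession


def upsert_transactions(
    spark: SparkSession,
    transactions_df: DataFrame,
    target_path: str = "output/transactions_delta",  # placeholder path
) -> None:
    # Variant A: keep one row per composite key within the incoming batch
    deduped = transactions_df.dropDuplicates(["CLAIM_ID", "CONTRACT_ID"])

    # Variant B: Delta Lake merge -- insert only keys not already in the target
    target = DeltaTable.forPath(spark, target_path)
    (
        target.alias("t")
        .merge(
            deduped.alias("s"),
            "t.CLAIM_ID = s.CLAIM_ID AND t.CONTRACT_ID = s.CONTRACT_ID",
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
```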
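For the second approach, a sketch of tagging each row with a SHA-256 fingerprint and a batch identifier, then using a left anti join against the already-stored data. The hashed columns and partition column are illustrative choices:

```python
# Sketch of the hash + left anti join approach for skipping existing records.
from pyspark.sql import DataFrame, functions as F


def append_new_rows(incoming: DataFrame, existing: DataFrame, batch_id: str) -> DataFrame:
    # Derive a row fingerprint from the business key columns
    with_hash = incoming.withColumn(
        "ROW_HASH",
        F.sha2(F.concat_ws("||", "CLAIM_ID", "CONTRACT_ID", "SOURCE_SYSTEM"), 256),
    ).withColumn("BATCH_ID", F.lit(batch_id))

    # Left anti join: keep only rows whose hash is not already stored
    return with_hash.join(existing.select("ROW_HASH"), on="ROW_HASH", how="left_anti")


# Usage (paths and batch id are illustrative):
# existing = spark.read.parquet("output/transactions.parquet")
# new_rows = append_new_rows(transactions_df, existing, batch_id="2024-01-01")
# new_rows.write.mode("append").partitionBy("BATCH_ID").parquet("output/transactions.parquet")
```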