This data pipeline ingests live traffic disruption data from Transport for London (TFL) via public APIs. It processes and validates the data using Pydantic, stores it in a PostgreSQL database, and exposes it through a FastAPI server. The project also includes a terminal user interface (TUI) for infrastructure control, a suite of Pytest tests, and an example client that demonstrates usage of the API.
The aim of the project is to demonstrate proficiency in Python under the theme of data engineering. The pipeline is therefore built in plain Python rather than with industry-grade tooling, and is architected as close to 'first principles' as possible, leaving the maximum flexibility and scope to meet that aim.
In general, the pipeline follows an Extract, Transform, Load (ETL) structure.
Note: Red lines indicate operations flow and access; black lines indicate logical data flow.
This API endpoint is provided by Transport for London and serves a live data stream. Details of the endpoint connection process and data stream format can be found at: https://api-portal.tfl.gov.uk/api-details#api=Road&operation=Road_DisruptionByPathIdsQueryStripContentQuerySeveritiesQueryCategoriesQuery&definition=System-2.
The datalake component (managed by datalake_manager.py) acts as an intermediate storage layer between each ETL step. After each processing stage—Extract, Transform, or Load—a snapshot of the data is saved to disk. This approach prevents large datasets from being held in memory and allows for easier debugging, recovery, and auditing of pipeline stages. The datalake_manager module abstracts all file operations (opening, reading, writing, and moving files), so other classes can interact with the datalake using simple, clean read and write methods, without worrying about file system details. This design simulates the behavior of production-grade datalake solutions in a lightweight, Pythonic way.
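For illustration, the sketch below shows the kind of interface such a module might expose; the class name, method names, and on-disk layout are assumptions for this example rather than the project's actual API.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


class DatalakeManager:
    """Illustrative snapshot store: one JSON file per pipeline stage per run."""

    def __init__(self, root: str = "datalake") -> None:
        self.root = Path(root)

    def write(self, stage: str, data: object) -> Path:
        """Persist a stage's output as a timestamped JSON snapshot."""
        stage_dir = self.root / stage
        stage_dir.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        path = stage_dir / f"{stage}_{stamp}.json"
        path.write_text(json.dumps(data, indent=2))
        return path

    def read_latest(self, stage: str) -> object:
        """Load the most recent snapshot written for a stage."""
        latest = max((self.root / stage).glob("*.json"), key=lambda p: p.name)
        return json.loads(latest.read_text())
```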
In the Extract step, the pipeline connects to the live TFL API endpoint and receives the latest traffic disruption data. The data is returned in JSON format and is then saved as a snapshot using the datalake.
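As a rough sketch of this step (reusing the `DatalakeManager` example above), something like the following would fetch and snapshot the raw JSON. The endpoint path, query parameter, and `TFL_APP_KEY` variable name are assumptions; consult the TfL API portal link above for the exact connection details.

```python
import os

import requests

# Illustrative URL only - see the TfL API portal documentation for the exact path and parameters.
TFL_ROAD_DISRUPTION_URL = "https://api.tfl.gov.uk/Road/all/Disruption"


def extract(datalake: "DatalakeManager") -> None:
    """Fetch the latest road disruption data from TfL and snapshot the raw JSON."""
    response = requests.get(
        TFL_ROAD_DISRUPTION_URL,
        params={"app_key": os.environ.get("TFL_APP_KEY", "")},  # key name is an assumption; set it in .env
        timeout=30,
    )
    response.raise_for_status()
    datalake.write("extract", response.json())
```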
In the Transform step, the raw data snapshots from the datalake are processed. This stage involves cleaning the data, validating fields and types (using Pydantic models), removing duplicates, and restructuring the data as needed for downstream storage. The transformed data is then saved back to the datalake, ready for loading into the database.
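An illustrative sketch of this stage using Pydantic (v2 syntax) is shown below. The `Disruption` model covers only a hypothetical handful of fields; the real TfL payload and the project's actual models are richer.

```python
from pydantic import BaseModel, field_validator


class Disruption(BaseModel):
    """Illustrative subset of a road disruption record; field names are assumptions."""

    id: str
    category: str
    severity: str
    comments: str = ""

    @field_validator("category", "severity")
    @classmethod
    def strip_whitespace(cls, value: str) -> str:
        return value.strip()


def transform(datalake: "DatalakeManager") -> None:
    """Validate raw records, drop duplicates by id, and snapshot the cleaned data."""
    raw = datalake.read_latest("extract")
    seen: set[str] = set()
    cleaned = []
    for record in raw:
        model = Disruption(**{k: record[k] for k in Disruption.model_fields if k in record})
        if model.id not in seen:
            seen.add(model.id)
            cleaned.append(model.model_dump())
    datalake.write("transform", cleaned)
```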
In the Load step, the transformed data is read from the datalake and batch-inserted into the PostgreSQL database.
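A minimal sketch of such a batch insert, assuming psycopg2 and an illustrative `disruptions` table (the table schema, column names, and `DATABASE_URL` variable are assumptions):

```python
import os

import psycopg2
from psycopg2.extras import execute_values


def load(datalake: "DatalakeManager") -> None:
    """Batch-insert the transformed snapshot into PostgreSQL."""
    rows = datalake.read_latest("transform")
    with psycopg2.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            execute_values(
                cur,
                """
                INSERT INTO disruptions (id, category, severity, comments)
                VALUES %s
                ON CONFLICT (id) DO NOTHING
                """,
                [(r["id"], r["category"], r["severity"], r["comments"]) for r in rows],
            )
```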
The orchestrator coordinates the execution of the entire ETL pipeline. It manages the order and timing of each step (Extract, Transform, Load), handles error logging, and ensures that each stage completes successfully before moving to the next. The orchestrator can be triggered manually, scheduled via cron jobs, or controlled through the TUI, providing flexibility and reliability in pipeline operations.
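The sketch below shows the general shape of such a sequencing loop (not the project's actual implementation): each step runs in order, progress is logged, and a failure aborts the run.

```python
import logging
from typing import Callable


def run_pipeline(steps: list[tuple[str, Callable[[], None]]], logger: logging.Logger) -> None:
    """Run the ETL steps in order, logging progress and aborting on the first failure."""
    for name, step in steps:
        try:
            logger.info("Starting %s step", name)
            step()
            logger.info("Completed %s step", name)
        except Exception:
            logger.exception("%s step failed; aborting this pipeline run", name)
            raise
```

A single manual run would then look like `run_pipeline([("extract", ...), ("transform", ...), ("load", ...)], logger)`, with the same entry point reusable by cron-triggered runs and the TUI.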
The pipeline logger is a shared object passed throughout the codebase, enabling unified logging of key events and metrics across all ETL stages. This ensures consistent and centralized monitoring of pipeline health and operations.
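For illustration, a shared logger of this kind might be built as follows; the handler choice and format string are assumptions, while the log path matches the pipeline_logs/pipeline_logs.log file mentioned in the setup steps below.

```python
import logging
from pathlib import Path


def build_pipeline_logger(log_path: str = "pipeline_logs/pipeline_logs.log") -> logging.Logger:
    """Create a shared logger that writes to both the log file and the terminal."""
    Path(log_path).parent.mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger("pipeline")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid adding duplicate handlers on repeated calls
        formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        for handler in (logging.FileHandler(log_path), logging.StreamHandler()):
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    return logger
```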
The API server is built with FastAPI and exposes RESTful endpoints for querying, exporting, and interacting with the processed disruption data. It enables external applications and users to access the pipeline’s data and functionality in a protected way.
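As a sketch of what one such endpoint might look like (the route name, response shape, and query are illustrative assumptions rather than the server's actual endpoints):

```python
import os

import psycopg2
from fastapi import FastAPI
from psycopg2.extras import RealDictCursor

app = FastAPI(title="TFL Disruption Pipeline API")


@app.get("/disruptions")
def list_disruptions(limit: int = 100) -> list[dict]:
    """Return up to `limit` stored disruption records."""
    with psycopg2.connect(os.environ["DATABASE_URL"]) as conn:  # DATABASE_URL is an assumed .env key
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT * FROM disruptions LIMIT %s", (limit,))
            return cur.fetchall()
```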
API endpoints | Database |
---|---|
![]() | ![]() |
There are various workflows for operating the pipeline. It can be run manually to ingest data once, in a single cycle, or an external scheduler such as cron can be used to trigger runs, effectively making the pipeline run automatically.
A TUI is provided to interact with and monitor pipeline operations. This interface allows an operator to see, at a glance, the state of the pipeline.
Beyond the core pipeline itself, the codebase includes two supporting components: a TUI and an example application.
The TUI provides a command-line interface for monitoring and controlling the pipeline. It allows users to view pipeline status, trigger runs, and access logs in real time, making pipeline management accessible and user-friendly without needing to interact directly with the codebase.
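For illustration, a minimal Textual app with a status readout and a run button might look like the sketch below; the widgets, IDs, and behaviour are assumptions and do not reflect the project's actual TUI.

```python
from textual.app import App, ComposeResult
from textual.widgets import Button, Footer, Header, Static


class PipelineTUI(App):
    """Toy pipeline control screen: a status line plus a single action button."""

    BINDINGS = [("q", "quit", "Quit")]

    def compose(self) -> ComposeResult:
        yield Header()
        yield Static("Pipeline status: idle", id="status")
        yield Button("Run pipeline", id="run")
        yield Footer()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        if event.button.id == "run":
            # A real TUI would trigger the orchestrator here; this just updates the label.
            self.query_one("#status", Static).update("Pipeline status: running")


if __name__ == "__main__":
    PipelineTUI().run()
```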
Overview | Pipeline Controls |
---|---|
![]() | ![]() |

Pipeline History | Pipeline Operations Log |
---|---|
![]() | ![]() |
The example application completes the data lifecycle by consuming processed disruption data from our API server. It fetches data from the pipeline’s RESTful endpoints, processes it, and creates interactive visualisations using Plotly.
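A hedged sketch of such a client is shown below. It assumes a `/disruptions` route and a `severity` field in the response, neither of which is guaranteed by this README; `PIPELINE_API_ENDPOINT` is read from the environment as described in the setup notes below.

```python
import os

import pandas as pd
import plotly.express as px
import requests


def main() -> None:
    """Fetch disruption records from the pipeline API and chart them by severity."""
    base_url = os.environ.get("PIPELINE_API_ENDPOINT", "http://127.0.0.1:8000")
    records = requests.get(f"{base_url}/disruptions", timeout=30).json()  # route name is an assumption
    df = pd.DataFrame(records)
    counts = df.groupby("severity").size().reset_index(name="count")
    fig = px.bar(counts, x="severity", y="count", title="Current road disruptions by severity")
    fig.show()


if __name__ == "__main__":
    main()
```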
- Python - the core language for all pipeline logic and scripting
- Pydantic - for data validation and sanitisation
- PostgreSQL - for the database component
- FastAPI - for creating RESTful API endpoints with which to access the database
- Pytest - for unit and integration tests of the codebase
- Pandas - for data manipulation
- Plotly - for easy data visualisation in the example application
- Textual - a framework for building TUIs in Python
The pipeline is architected in such a way that it can easily be extended. Areas of extension are:
- Adding more data sources for richer insight capabilities
- Adding more applications making use of API endpoints
- Adding more / more varied API endpoints to the API server
Note: The repository contains several important files in the root directory. They include:
- .env file - contains API keys, the database connection URL, etc.
- Makefile - for easy setup and running
- requirements.txt - used when setting up
- pipeline_enabled.flag - used when scheduling pipeline runs
- run_pipeline.sh - used when scheduling pipeline runs
Please ensure that all of these files are present.
Note: the pipeline depends on a connection to a hosted database as well as on a connection to the TFL endpoint. Details for both are specified in the .env file in the root directory. Please ensure that this file is filled out (if not already by default) before attempting to run the pipeline.
1. Clone the repo
2. Ensure all the root directory files (mentioned above) are present and configured (should be by default)
3. Navigate to the root directory
4. Run `make setup`. This will automatically create and set up a Python virtual environment with all the project dependencies
5. Run `source pipeline_venv/bin/activate`. This will activate the Python virtual environment
6. Run `make run_pipeline` to run the pipeline once. Logs should be printed in pipeline_logs/pipeline_logs.log and in the terminal.
7. Run `make run_tui` to open the TUI. This is best done in a separate terminal window.
8. Run `make run_tests` to test modules using pytest
9. Run `make start_api` to start the API server. This is best done in a separate terminal window.
10. Run `make run_example` to run an example application.

Note: `make start_api` starts a locally hosted uvicorn server. Ensure that the URL it provides is specified in the .env file under PIPELINE_API_ENDPOINT so that the example application can reach the API.
When finished:

11. Run `make clean` to remove the virtual environment
12. Run `deactivate` to return to the normal shell.
Note: A bash script, run_pipeline.sh, has been provided. When using an external scheduler, schedule this script rather than the pipeline directly; this allows the toggle switch in the TUI to work properly.