- Utsav Basu - 20CS30057
- Anamitra Mukhopadhyay - 20CS30064
- Ritwik Ranjan Mallik - 20CS10049
- Anand Manojkumar Parikh - 20CS10007
Before starting, permissions for docker.sock
have to be updated.
sudo chmod 777 <location of docker.sock> # usually the location is /var/run/docker.sock
There is a Makefile in the ./src
directory.
To start everything:
make
To stop everything:
make stop
For A1-A3, the read and write times were measured on two different machines. The payloads can be found inside read_write_testing.
A1:

|  | Read Time (in s) | Write Time (in s) |
|---|---|---|
| Machine 1 | 433.89 | 1577.40 |
| Machine 2 | 309.94 | 2234.44 |

A2:

|  | Read Time (in s) | Write Time (in s) |
|---|---|---|
| Machine 1 | 437.80 | 4020.94 |
| Machine 2 | 274.79 | 2838.72 |

A3:

|  | Read Time (in s) | Write Time (in s) |
|---|---|---|
| Machine 1 | 425.43 | 3251.08 |
| Machine 2 | 287.27 | 3247.10 |
The prerequisites and sample request-response pairs for the endpoints can be found inside endpoint_testing.
In the ./src/client/data directory, there are two files, fnames.txt and lnames.txt. fnames.txt contains 942 unique first names and lnames.txt contains 995 unique last names. Using these names, generate.py creates 942 * 995 = 937290 data entries, with marks generated uniformly at random in the range 1 to 100.
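A minimal sketch of how such a generation script could work is shown below; only fnames.txt, lnames.txt, and the mark range come from the description above, while the output file name and CSV layout are assumptions for illustration.

```python
import csv
import random

# Read the unique first and last names shipped with the client.
with open("fnames.txt") as f:
    fnames = [line.strip() for line in f if line.strip()]
with open("lnames.txt") as f:
    lnames = [line.strip() for line in f if line.strip()]

# Every (first name, last name) pair becomes one entry with a mark drawn
# uniformly at random from 1 to 100. The CSV layout here is illustrative.
with open("students.csv", "w", newline="") as out:
    writer = csv.writer(out)
    writer.writerow(["stud_id", "stud_name", "stud_marks"])
    stud_id = 0
    for fname in fnames:
        for lname in lnames:
            writer.writerow([stud_id, f"{fname} {lname}", random.randint(1, 100)])
            stud_id += 1
```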
There are two major specifications for interfacing web applications with web servers: WSGI (Web Server Gateway Interface) and ASGI (Asynchronous Server Gateway Interface). WSGI is a synchronous interface, meaning that it handles one request at a time per process or thread. ASGI, on the other hand, supports handling multiple requests concurrently without blocking. Since the load balancer should be able to handle as many as 10,000 concurrent requests, a WSGI-based web application framework such as Flask is not suitable. Instead, Quart, a framework built on top of Flask that supports ASGI servers, is a better choice.
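To illustrate the choice, a minimal Quart endpoint might look like the following sketch (the route and payload are hypothetical, not the project's actual API):

```python
from quart import Quart, jsonify

app = Quart(__name__)

@app.route("/heartbeat", methods=["GET"])
async def heartbeat():
    # Because the handler is a coroutine, the ASGI server can interleave many
    # such requests on one event loop instead of blocking a worker per request.
    return jsonify({"status": "alive"}), 200

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
```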
In Python, asyncio is a library for writing concurrent code using the async/await syntax. Many asynchronous Python frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. are built on top of asyncio.
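A small, purely illustrative example of the async/await style that asyncio enables:

```python
import asyncio

async def worker(name: str, delay: float) -> str:
    # Awaiting sleep yields control to the event loop so other tasks can run.
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # Both workers run concurrently on a single thread.
    print(await asyncio.gather(worker("a", 1.0), worker("b", 1.0)))

asyncio.run(main())
```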
The aiohttp library is used for building asynchronous HTTP clients and servers on top of asyncio. It supports both server-side and client-side WebSockets.
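A brief sketch of an asynchronous HTTP request with aiohttp (the URL is a placeholder):

```python
import asyncio
import aiohttp

async def fetch(url: str) -> str:
    # One session can be reused for many concurrent requests.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    print(await fetch("http://localhost:5000/heartbeat"))

asyncio.run(main())
```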
The aiodocker library is a simple Docker HTTP API wrapper written with asyncio and aiohttp. Rather than making system calls inside the load balancer to perform Docker-related tasks such as adding or removing server containers, aiodocker exposes flexible functions to perform these tasks.
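A rough sketch of spawning and removing a container with aiodocker; the image and container names are placeholders, and the config keys follow the Docker Engine API:

```python
import asyncio
import aiodocker

async def spawn_and_remove():
    docker = aiodocker.Docker()
    try:
        # Create (or replace) a container from an image and start it.
        container = await docker.containers.create_or_replace(
            name="server_1",
            config={"Image": "server:latest"},
        )
        await container.start()

        # ... later, stop and remove it.
        await container.stop()
        await container.delete(force=True)
    finally:
        await docker.close()

asyncio.run(spawn_and_remove())
```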
This library is used for controlling concurrent reads and writes to two important data structures used in the application. One of these data structures maps requests to virtual servers via consistent hashing. The other maintains the count of failed heartbeats for the servers.
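A minimal sketch, assuming a plain asyncio lock is used, of guarding the two shared structures (the structures themselves are stand-ins):

```python
import asyncio

hash_ring: dict[int, str] = {}             # stand-in for the consistent-hashing map
heartbeat_fail_count: dict[str, int] = {}  # failed-heartbeat counters per server
lock = asyncio.Lock()

async def add_server(slot: int, hostname: str):
    # Only one coroutine may mutate the shared structures at a time.
    async with lock:
        hash_ring[slot] = hostname
        heartbeat_fail_count[hostname] = 0
```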
Asyncpg is a high-performance, asynchronous PostgreSQL database client library for Python. It is designed to provide efficient and low-latency communication with a PostgreSQL database by leveraging the power of Python's asyncio framework.
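A brief, illustrative asyncpg snippet (the connection parameters and the query are placeholders):

```python
import asyncio
import asyncpg

async def fetch_students():
    conn = await asyncpg.connect(host="localhost", user="postgres",
                                 password="postgres", database="postgres")
    try:
        rows = await conn.fetch(
            "SELECT stud_id, stud_name, stud_marks FROM StudT LIMIT 5")
        for row in rows:
            print(dict(row))
    finally:
        await conn.close()

asyncio.run(fetch_students())
```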
Different design choices have been made throughout the application, keeping in mind efficiency, functionality and understandability. Some of these are given below:
Both the threading and asyncio libraries in Python support concurrent programming. However, the asyncio library was chosen for the following reasons:
- Due to the Global Interpreter Lock, only one thread can execute Python code at a time. This means that for most Python 3 implementations, the different threads do not actually execute at the same time; they merely appear to.
- Threads perform preemptive multitasking, where the scheduler decides which threads to run and when. The asyncio model, on the other hand, allows cooperative multitasking, where the user decides which tasks to run and when.
The application runs several tasks asynchronously. These are:
- `spawn_container`: This task is used to spawn the server containers. The load balancer maintains a total of `N` servers at a time. If some server goes down, the load balancer detects it and spawns a new server.
- `remove_container`: This task is used to remove the spawned server containers. The load balancer sends a SIGINT signal to the running container to stop it. If the container does not stop within `STOP_TIMEOUT`, it sends a SIGKILL signal to remove the container altogether.
- `get_heartbeat`: This task is used to periodically get the heartbeats of the server containers. Whenever a new server container is added, an entry initialized to 0 maintains the number of failed heartbeats for that container. The load balancer checks the heartbeats at an interval of `HEARTBEAT_INTERVAL` seconds. Upon not receiving a heartbeat from a server, the corresponding entry is incremented. When this entry reaches `MAX_FAIL_COUNT`, the server is assumed to be down and is restarted. This task runs as a background task using Quart (a sketch follows this list).
- `handle_flatline`: This task is used to handle the flatline of the servers. When a server becomes unresponsive and is assumed to be down, this task respawns a new server, which replaces the failed one.
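A condensed sketch of how the heartbeat loop could be structured; the values of `HEARTBEAT_INTERVAL` and `MAX_FAIL_COUNT`, the endpoint, and the respawn step are illustrative assumptions:

```python
import asyncio
import aiohttp

HEARTBEAT_INTERVAL = 10   # seconds between heartbeat rounds (assumed value)
MAX_FAIL_COUNT = 3        # misses tolerated before respawning (assumed value)

heartbeat_fail_count: dict[str, int] = {}

async def handle_flatline(hostname: str):
    # Placeholder: respawn a replacement container and reset the counter.
    heartbeat_fail_count[hostname] = 0

async def get_heartbeat():
    async with aiohttp.ClientSession() as session:
        while True:
            for hostname in list(heartbeat_fail_count):
                try:
                    async with session.get(f"http://{hostname}:5000/heartbeat") as resp:
                        ok = resp.status == 200
                except aiohttp.ClientError:
                    ok = False
                if ok:
                    heartbeat_fail_count[hostname] = 0
                else:
                    heartbeat_fail_count[hostname] += 1
                    if heartbeat_fail_count[hostname] >= MAX_FAIL_COUNT:
                        await handle_flatline(hostname)
            await asyncio.sleep(HEARTBEAT_INTERVAL)
```

In the actual application such a coroutine would be registered as a Quart background task (for example via `app.add_background_task`).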
Cooperation takes place whenever there are multiple tasks to perform simultaneously. For Docker-related tasks such as spawning and removing a container, the set of tasks is added to a pool and processed in batches of size `DOCKER_TASK_BATCH_SIZE`. This is done using a semaphore initialized to `DOCKER_TASK_BATCH_SIZE`. Similarly, HTTP requests are processed in batches of size `REQUEST_BATCH_SIZE`, using a semaphore initialized to `REQUEST_BATCH_SIZE`. Cooperation is ensured by calling `await asyncio.sleep(0)` inside a task at appropriate places, which gives other tasks a chance to run before it continues executing.
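A small sketch of the semaphore-based batching described above (the batch size and the task body are illustrative):

```python
import asyncio

DOCKER_TASK_BATCH_SIZE = 4
docker_semaphore = asyncio.Semaphore(DOCKER_TASK_BATCH_SIZE)

async def docker_task(idx: int):
    # At most DOCKER_TASK_BATCH_SIZE of these bodies run at the same time.
    async with docker_semaphore:
        # Yield to the event loop so other pending tasks get a chance to run.
        await asyncio.sleep(0)
        print(f"performing docker task {idx}")

async def main():
    await asyncio.gather(*(docker_task(i) for i in range(10)))

asyncio.run(main())
```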
Some design choices have been made during dockerizing the application. These are as follows:
- The default Python image is based on the Ubuntu image, which installs some unnecessary packages not needed for our application. Instead, the Python image based on the Alpine image is used, which is much lighter.
- Multi-stage builds have been used during dockerization to keep the image sizes for the load balancer and the server as small as possible.
- Both the load balancer and server containers are deployed via a shell script `deploy.sh`. `postgres` is first run as a background task, and the main process waits until it is ready using `pg_isready`. When `postgres` is up and running, the main process runs the actual Python file as a background task: `load_balancer.py` for the load balancer and `server.py` for the servers. Signal handlers are modified such that any `SIGTERM` or `SIGINT` signal to the main process is forwarded to the two child processes as well. The main process then waits for the two child processes.
For ensuring distributed database consistency, a Write-Ahead Logging (WAL) mechanism is used. WAL works as follows:
- Logging Changes Before Writing to Disk: With WAL, any changes to the database are first recorded in a log file before they are applied to the database files on disk. This log file is stored in a durable storage medium (like SSDs, RAID-configured HDDs or SSDs, and cloud storage).
- Sequential Writes: The changes are written sequentially to the log, which is more efficient than random disk access, especially for large transactions or when multiple transactions are occurring simultaneously.
- Recovery Process: In the event of a crash or restart, the system reads the WAL file(s) to redo operations that were not fully committed to the database files, ensuring that no committed transactions are lost. It can also undo any changes from transactions that were not completed, maintaining data integrity.
- Replication Log Shipping: A primary database server is chosen for each shard when writing to that shard is going to happen. The primary database server writes changes to its WAL as part of normal operations. These WAL records can then be shipped to replica servers, where they are replayed to apply the same changes. This ensures that all replicas process the same set of changes in the same order, maintaining consistency across the system.
- Synchronous Replication: The primary waits for a majority of the replicas to commit the data before committing its own copy and acknowledging the transaction, which ensures strong consistency (see the sketch after this list). If the primary for a shard fails, a new primary is chosen from the replicas having the most up-to-date log entries. As soon as the downed database server is back up, it copies all of its shards from the corresponding primaries. This ensures the system can recover from crash failures with consistent data.
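A simplified sketch of the synchronous-replication step, assuming the replicas expose a hypothetical /write endpoint and that the primary commits only after a majority of acknowledgements:

```python
import asyncio
import aiohttp

async def replicate_write(replicas: list[str], payload: dict) -> bool:
    # Forward a write to all replicas and report whether a majority acknowledged.
    async def send(session: aiohttp.ClientSession, host: str) -> bool:
        try:
            async with session.post(f"http://{host}:5000/write", json=payload) as resp:
                return resp.status == 200
        except aiohttp.ClientError:
            return False

    async with aiohttp.ClientSession() as session:
        acks = await asyncio.gather(*(send(session, h) for h in replicas))

    # Commit on the primary only if a strict majority of replicas acknowledged.
    return sum(acks) >= len(replicas) // 2 + 1
```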
A server maintains three tables:
- `StudT(stud_id: INTEGER, stud_name: TEXT, stud_marks: INTEGER, shard_id: TEXT)`
- `TermT(shard_id: TEXT, last_idx: INTEGER, executed: BOOLEAN)`
- `logT(log_idx: INTEGER, shard_id: TEXT, operation: TEXT, stud_id: INTEGER, content: JSON)`
The load balancer maintains a single table:
- `ShardT(stud_id_low: INTEGER, shard_id: TEXT, shard_size: INTEGER, valid_at: INTEGER)`
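The schemas above could be created with asyncpg roughly as follows. This is a sketch: the column types come from the definitions above, while the connection parameters are placeholders, and ShardT (load balancer) is shown alongside the server tables only for brevity.

```python
import asyncio
import asyncpg

DDL = """
CREATE TABLE IF NOT EXISTS StudT (
    stud_id    INTEGER,
    stud_name  TEXT,
    stud_marks INTEGER,
    shard_id   TEXT
);
CREATE TABLE IF NOT EXISTS TermT (
    shard_id TEXT,
    last_idx INTEGER,
    executed BOOLEAN
);
CREATE TABLE IF NOT EXISTS logT (
    log_idx   INTEGER,
    shard_id  TEXT,
    operation TEXT,
    stud_id   INTEGER,
    content   JSON
);
CREATE TABLE IF NOT EXISTS ShardT (
    stud_id_low INTEGER,
    shard_id    TEXT,
    shard_size  INTEGER,
    valid_at    INTEGER
);
"""

async def init_db():
    conn = await asyncpg.connect(host="localhost", user="postgres",
                                 password="postgres", database="postgres")
    try:
        # asyncpg can execute a multi-statement script when no arguments are passed.
        await conn.execute(DDL)
    finally:
        await conn.close()

asyncio.run(init_db())
```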
A particular shard is stored on a number of servers. Within this group, one server acts as the primary server and the rest are secondary servers for that particular shard. Upon receiving a request, the load balancer first queries the shard manager to find out the primaries for the relevant shards. The load balancer then sends the request to the primary server, which forwards it to the secondary servers.
Bookkeeping operations are performed by the servers upon receiving a request for a particular shard. The requests can be assumed to have the general form `(shard_id, term, op)`. Bookkeeping happens in the following fashion (a sketch follows the list):

- The server consults its `logT` table to find all entries corresponding to `shard_id`.
- For entries where (`term` > `last_idx`) or (`term` == `last_idx` and `op` == 'R'), if the log entry is not executed, it is executed.
- For entries where (`term` < `last_idx`) or (`term` == `last_idx` and `op` != 'R'), the entry is removed from the log.
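A compact sketch of that rule, using the asyncpg connection style from earlier; `execute_log_entry` is a placeholder helper and the comparison logic simply mirrors the description above:

```python
async def execute_log_entry(conn, row):
    # Placeholder: apply the logged operation to StudT.
    pass

async def bookkeep(conn, shard_id: str, term: int, op: str):
    # last_idx for this shard, as recorded in TermT.
    last_idx = await conn.fetchval(
        "SELECT last_idx FROM TermT WHERE shard_id = $1", shard_id)

    rows = await conn.fetch(
        "SELECT log_idx, operation, executed FROM logT WHERE shard_id = $1",
        shard_id)

    for row in rows:
        if term > last_idx or (term == last_idx and op == "R"):
            # Entry is still relevant: execute it if it has not been executed yet.
            if not row["executed"]:
                await execute_log_entry(conn, row)
        elif term < last_idx or (term == last_idx and op != "R"):
            # Stale or conflicting entry: drop it from the log.
            await conn.execute(
                "DELETE FROM logT WHERE shard_id = $1 AND log_idx = $2",
                shard_id, row["log_idx"])
```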