Repository for the CSED332 (Software Design Method) group project.
Instructions for running the project are in Docs/Excution Guide.md.
1. Participants
2. Milestone
3. Weekly Progress
4. Workflow
5. Feedback
Following The Mythical Man-Month, the development period was divided into three phases:
- Design phase (~ Week 3)
- Programming & minor debugging phase (~ Week 6)
- System test, integrating all components (~ Week 8)
- Establish a connection between master and worker
- Design the distributed sorting algorithm for this project
- Generate sample input data for testing key-value parsing
- Complete key-value parsing
- Send and receive sample data between master and worker
- Implement sampling
- Implement partitioning
- Implement parsing
- Implement shuffling
- Implement sorting
- Implement merging
- Test the integrated milestone #3 with sample data to check that it works correctly
- Debug errors found with sample data
- Test the distributed sorting program with intensive data
- Debug all remaining errors found with intensive data
- Discussed coding conventions
- Searched for useful libraries for implementing this project
- Read the gRPC documentation
- Configured project settings with Metals, sbt, and ScalaCheck
- Wrote pseudocode based on merge sort
- Designed a K-way merge between compute nodes using a tournament tree
- Applied Week 1 feedback
- Designed network interaction
- Wrote pseudocode for the K-way merge
- Roughly designed the network layer and its exception handling
- Divided the pseudocode design into master & worker parts
- Designed the abstract structure of sorting (sampling)
- Constructed detailed network interaction messages
- Studied how to use multiple cores with Scala
- Removed worker-worker network interaction
- Planned for handling packet loss
- Planned disconnection recovery
- Adopted concurrent programming to prevent a master bottleneck
- Implemented the basic structure & case objects for messages and states
- Implemented key-value parsing and wrote simple test cases
- Added gRPC and protobuf to project settings
- Tested simple gRPC messaging on localhost
- Added protobuf definitions for test messaging
- Chose an FSM to track and control the states of the workers and the master
- Implemented server startup
- Chose log4j for runtime logging
- Revised the overall design (see here)
- Designed the abstract structure of shuffling
- Designed protobuf messages (now implemented incrementally, following the sorting implementation steps)
- Implemented logging code
- Completed FSM design & implementation
- Tested logging, including logging IP addresses
- Implemented partitioning & key-range division
- Implemented argument parsing
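A minimal sketch of the sampling-based key-range partitioning listed above, assuming keys are plain `String`s compared lexicographically and that the master has already gathered sample keys from every worker. `PartitionSketch`, `boundaries`, and `partitionOf` are hypothetical names for illustration, not the project's actual code.

```scala
object PartitionSketch {
  // Pick numWorkers - 1 boundary keys from the sorted sample so that each key range
  // receives roughly the same number of sampled keys.
  def boundaries(samples: Seq[String], numWorkers: Int): Vector[String] = {
    require(numWorkers > 0, "need at least one worker")
    val sorted = samples.sorted.toVector
    if (sorted.isEmpty) Vector.empty
    else (1 until numWorkers).map(i => sorted(i * sorted.size / numWorkers)).toVector
  }

  // Worker i receives keys in [bounds(i - 1), bounds(i)); worker 0 takes every key below
  // bounds(0) and the last worker takes every key >= the last boundary.
  // The result is the number of boundaries <= key, found by binary search.
  def partitionOf(key: String, bounds: Vector[String]): Int = {
    var lo = 0
    var hi = bounds.size
    while (lo < hi) {
      val mid = (lo + hi) / 2
      if (bounds(mid).compareTo(key) <= 0) lo = mid + 1 else hi = mid
    }
    lo
  }
}
```

With 8 sampled keys and 4 workers, `boundaries` picks the sorted samples at positions 2, 4, and 6, so each worker's range covers about a quarter of the sample. Choosing boundaries from a sorted sample keeps partitions roughly balanced without scanning all of the data first.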
Workflow based on TDD (Test-Driven Development)

- Feature: Add a new function
- Fix: Fix a bug
- Docs: Modify documentation
- Chore: Change settings (build, project configs, ...)
- Test: Add/fix a test suite
- Refactor: Refactor code

Branches:
- main: For releases
- develop: Merge features that passed their tests
- test-[TestName]: Develop a feature and its test suite
- Milestone #3 is too heavy: milestones should have similar workloads, and each milestone should take feedback from the previous one.
- No need to follow The Mythical Man-Month strictly: update the development cycle by merging the programming phase and the testing/debugging phase.
- Parsing/Sort/Partition/Shuffle and Merge/Sampling are closely related: they need well-defined interfaces and extra communication effort.
- The idea of TDD is that "tests are a kind of documentation": record in the docs, with details, how TDD was applied.
- Check pseudocode: partitioning and shuffling.
- Introduce a proper locking mechanism.
- Network failures: reconnect nodes and recover the network.
- Tournament tree: initialization, saving, data type, metadata.
- Network interaction: the role of the master, and whether worker-worker communication is necessary.
- Divide components flexibly: one interaction may consist of multiple messages. For example, a network disconnection can be a problem in this case.
- Test names: use test names that clearly describe the object under test and whether the case is normal or abnormal.
- Network connection: test master-worker connection establishment.
- Major design problem: no worker -> master -> worker data exchange; execution will run sequentially.
- Internal sorting phase problem: internal sorting should be performed in a different phase.
The following pseudocode is an expired design and should be updated.
```
Master:
    divide_data(number_of_worker)
    for i in 0 to number_of_worker, i++:
        internal_sort(disk[i])
        queue.push({i})
    # groups that still have to be merged; decremented only when a merge finishes,
    # so the loop cannot exit while merges are still in flight
    remaining_groups = number_of_worker
    while(remaining_groups > 1):
        concurrent_execute(send, receive)

    send:
        while(queue.size > 1):
            one = queue.front
            queue.pop
            two = queue.front
            queue.pop
            if(one.size < two.size):
                swap(one, two)
            network_send(to: worker[one].head, next_worker: worker[one], work_with: worker[two], reply: {worker[two], worker[one]}.flatten, next_head: True)
            network_send(to: worker[two].head, next_worker: worker[two], work_with: worker[one], reply: empty, next_head: False)

    receive:
        received_msg = network_receive()
        if(received_msg == sort_done):
            queue.push(received_msg.reply)   # the merged group goes back into the queue
            remaining_groups -= 1            # two groups became one
        if(received_msg == wrong_send):
            resend_msg(to: proper_worker)

Worker:
    next_worker, work_with, reply, next_head = network_receive()
    if(next_worker.head != self):
        network_exception_reply(exception: wrong_send)
    else:
        interrupt_on(network_receive, receive)
        if(next_head):
            while(True):
                sort()
                if(is_there_done && is_here_done):
                    break
        if(next_worker.tail == empty):
            network_send(to: master, sort_done: reply)
        else:
            network_send(to: next_worker.tail.head, next_worker: next_worker.tail, work_with: work_with, reply: reply, next_head: next_head)

    sort:
        if(RAM.my_disk.empty && !is_here_done):
            my_disk = disk_cursor ~ disk_cursor + BLOCK_SIZE
            disk_cursor += BLOCK_SIZE
            if(disk_cursor > DISK_SIZE):
                is_here_done = True
            RAM_push(my_disk)
        if(RAM.received_msg.empty && !is_there_done):
            network_send(to: work_with.head, msg: RAM.sorted_data)
            wait_until_receive()
            RAM.sorted_data.flush
        else if(RAM.received_msg.empty && is_there_done):
            disk_push(RAM.sorted_data)
            disk_push(RAM.my_disk)
            is_here_done = True
            RAM.flush
        # merge only while both buffers hold data; an empty buffer must be refilled first
        while(!RAM.my_disk.empty && !RAM.received_msg.empty):
            smaller_one = take_smaller_one_and_pop()
            RAM_push(sorted_data, smaller_one)

    receive:
        if(received_msg == sort_done_here):
            next_worker = next_worker.tail
            if(next_worker == NULL):
                is_there_done = True
            else:
                resend_msg(to: next_worker.tail.head)
        else if(received_msg == sort_done):
            network_exception_reply(exception: sort_done_here)
        else if(next_head):
            RAM_push(received_msg)
        else:
            RAM_push(received_msg)
            network_send(to: work_with.head, memory(disk_cursor ~ disk_cursor + BLOCK_SIZE))
            disk_push(RAM.received_msg)
            disk_cursor += BLOCK_SIZE
```
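The pseudocode above merges data two worker groups at a time. The tournament-tree K-way merge mentioned in the weekly progress can be sketched for the single-machine case as a winner tree over k sorted in-memory runs: after each output element, only the matches on one leaf-to-root path are replayed, so each element costs O(log k) comparisons. `TournamentMergeSketch` and `merge` are hypothetical names, and a real implementation would stream runs from disk or the network instead of holding `Vector`s.

```scala
object TournamentMergeSketch {
  // K-way merge of k sorted runs with a winner tournament tree stored in an array.
  // Leaves hold run indices; each internal node holds the index of the run whose current
  // head is smaller (or -1 if both subtrees are exhausted). The root tree(1) is the winner.
  def merge[A](runs: Vector[Vector[A]])(implicit ord: Ordering[A]): Vector[A] = {
    val k = runs.size
    if (k == 0) return Vector.empty
    var size = 1
    while (size < k) size *= 2           // leaf count, padded up to a power of two
    val pos = Array.fill(k)(0)            // read cursor into each run
    val tree = Array.fill(2 * size)(-1)   // tree(size + r) is the leaf for run r

    def head(r: Int): Option[A] =
      if (r >= 0 && pos(r) < runs(r).size) Some(runs(r)(pos(r))) else None

    // Play one match: the run with the smaller head wins; ties go to the left child.
    def play(a: Int, b: Int): Int = (head(a), head(b)) match {
      case (Some(x), Some(y)) => if (ord.lteq(x, y)) a else b
      case (Some(_), None)    => a
      case (None, Some(_))    => b
      case _                  => -1
    }

    // Initialization: fill the leaves, then play every match bottom-up.
    for (r <- 0 until k) tree(size + r) = r
    for (i <- size - 1 to 1 by -1) tree(i) = play(tree(2 * i), tree(2 * i + 1))

    val out = Vector.newBuilder[A]
    while (head(tree(1)).isDefined) {
      val r = tree(1)
      out += runs(r)(pos(r))
      pos(r) += 1
      // Only the matches on the path from run r's leaf to the root need replaying.
      var i = (size + r) / 2
      while (i >= 1) { tree(i) = play(tree(2 * i), tree(2 * i + 1)); i /= 2 }
    }
    out.result()
  }
}
```

A binary heap (for example `scala.collection.mutable.PriorityQueue`) gives the same O(log k) bound; the tree version stores only run indices in a flat array, which is closer to the "data type and metadata" questions raised in the feedback.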
Plan for handling packet loss
- TCP-like acknowledgement numbers and EOF: a message that is sent as multiple data packets should carry its own TCP-like acknowledgement numbers and an EOF marker, so the sender can confirm that every packet was delivered.
- Timeout: if an acknowledgement does not arrive within a specific time (the timeout), resend the message.
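A minimal stop-and-wait sketch of the acknowledgement and timeout plan above. It runs in memory: the `transmit` callback stands in for "send the packet, then wait up to the timeout for an ack", returning `None` when the packet or its ack was lost. `AckRetransmitSketch`, `Packet`, `Receiver`, and the retry limit are hypothetical and not part of the project's gRPC messages.

```scala
import scala.collection.mutable

object AckRetransmitSketch {
  // Hypothetical packet: sequence number, payload bytes, and an EOF flag on the last chunk.
  final case class Packet(seq: Int, payload: Vector[Byte], eof: Boolean)

  // Split one message into numbered chunks; only the final chunk carries eof = true.
  def toPackets(data: Vector[Byte], chunkSize: Int): Vector[Packet] = {
    val chunks = if (data.isEmpty) Vector(Vector.empty[Byte]) else data.grouped(chunkSize).toVector
    chunks.zipWithIndex.map { case (c, i) => Packet(i, c, eof = i == chunks.size - 1) }
  }

  // Stop-and-wait sender: `transmit` returns Some(ackedSeq) if an ack arrived before the
  // timeout, or None otherwise. Each packet is resent until acked, at most maxRetries
  // extra times; the whole send fails if any packet is never acknowledged.
  def sendAll(packets: Vector[Packet], maxRetries: Int)(transmit: Packet => Option[Int]): Boolean =
    packets.forall { p =>
      Iterator.range(0, maxRetries + 1).exists(_ => transmit(p).contains(p.seq))
    }

  // Receiver: store packets by sequence number (a resent duplicate just overwrites),
  // ack each one, and reassemble once the EOF packet and all earlier packets are present.
  final class Receiver {
    private val received = mutable.Map.empty[Int, Packet]

    def onPacket(p: Packet): Int = { received(p.seq) = p; p.seq }

    def assembled: Option[Vector[Byte]] =
      received.values.find(_.eof).flatMap { last =>
        val seqs = 0 to last.seq
        if (seqs.forall(received.contains)) Some(seqs.flatMap(s => received(s).payload).toVector)
        else None
      }
  }
}
```

Stop-and-wait keeps only one packet in flight, which makes the logic easy to test; a sliding window would improve throughput at the cost of more bookkeeping per connection.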
Plan for handling disconnection
- To recover from a disconnection, create checkpoint tokens and manage them with a queue for each master-worker connection.
- Checkpoints should be enqueued in order, and checkpoints that have been passed should be dequeued.
- The checkpoint queue should not be initialized at the start of the program; shuffling-like phases need multiple checkpoints to ensure atomicity.
- We need test cases that intentionally cause disconnections.
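A minimal sketch of the per-connection checkpoint queue described above. The `Checkpoint` fields (a phase name and a step index) and all names here are hypothetical; the sketch models only the ordering rules: checkpoints are enqueued in order as work is scheduled, the oldest one is dequeued when it is passed, and after a reconnect work resumes from the head of the queue.

```scala
import scala.collection.immutable.Queue

object CheckpointSketch {
  // Hypothetical checkpoint token: the phase name and a step index within that phase.
  final case class Checkpoint(phase: String, step: Int)

  // One queue per master-worker connection. Checkpoints are enqueued in order as work is
  // scheduled (not all at program start) and dequeued once they have been passed.
  final case class ConnectionCheckpoints(pending: Queue[Checkpoint]) {
    def schedule(cp: Checkpoint): ConnectionCheckpoints = copy(pending = pending.enqueue(cp))

    // Mark the oldest checkpoint as passed; passing one out of order is treated as an error.
    def pass(cp: Checkpoint): ConnectionCheckpoints = pending.dequeueOption match {
      case Some((head, rest)) if head == cp => copy(pending = rest)
      case _ => throw new IllegalStateException(s"checkpoint $cp passed out of order")
    }

    // After a reconnect, resume from the first checkpoint that has not been passed yet.
    def resumeFrom: Option[Checkpoint] = pending.headOption
  }

  val empty: ConnectionCheckpoints = ConnectionCheckpoints(Queue.empty)
}
```

Because the queue is immutable, a test that simulates a disconnection can keep the state from before the failure and check that `resumeFrom` points at the right step.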
Plan for network interaction enhancement
- All communication should be tracked by the master, which can use an FSM to track each worker's state.
- To prevent a bottleneck at the master or a worker caused by multiple network connections, handle network connections concurrently.
- Establish connections using multiple threads.
- For each worker-worker connection used for shuffling, create an input thread and an output thread.
- Use a locking mechanism to prevent race conditions while sending/receiving data.
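A minimal sketch of the input-thread side of the plan above: one thread per peer appends received records to a shared buffer, and a `java.util.concurrent.locks.ReentrantLock` makes each append atomic. Receiving is simulated by iterating over local sequences instead of reading network streams, output threads are omitted, and `ShuffleThreadsSketch`, `LockedBuffer`, and `receiveFromPeers` are hypothetical names.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer

object ShuffleThreadsSketch {
  // Shared buffer that several input threads write into during shuffling.
  // The lock makes each append (and each snapshot) atomic, so writers cannot race.
  final class LockedBuffer[A] {
    private val lock = new ReentrantLock()
    private val items = ArrayBuffer.empty[A]

    def append(a: A): Unit = { lock.lock(); try items += a finally lock.unlock() }

    def snapshot: Vector[A] = { lock.lock(); try items.toVector finally lock.unlock() }
  }

  // Start one input thread per peer; each thread "receives" that peer's records
  // (here: iterates over a local sequence) and appends them to the shared buffer.
  def receiveFromPeers(peers: Map[String, Seq[Int]], sink: LockedBuffer[Int]): Unit = {
    val threads = peers.values.toVector.map { records =>
      new Thread(() => records.foreach(sink.append))
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // wait until every peer's data has arrived
  }
}
```

Without the lock, concurrent `+=` calls on an `ArrayBuffer` can lose elements or corrupt its internal array; holding the lock only around each append keeps the critical section short.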
Message type | Content | Sender | Receiver |
---|---|---|---|
EstablishRequest | IP and port of the worker and master | worker | master |
EstablishResponse | Boolean indicating whether the master-worker connection was established successfully | master | worker |
SamplingRequest | IP and port of the worker, percentage of keys to sample | master | worker |
SamplingResponse | Stream of sample keys | worker | master |
PartitioningRequest | IP and port of the worker, table mapping IPs to partitions | master | worker |
PartitioningResponse | Boolean indicating whether partitioning completed successfully | worker | master |
InternalSortRequest | IP and port of the worker | master | worker |
InternalSortResponse | Boolean indicating whether the worker's internal sort completed successfully | worker | master |
ShuffleRunRequest | IP and port of the worker | worker | master |
ShuffleRunResponse | Boolean indicating whether shuffling completed successfully | master | worker |
ShuffleExchangeRequest | IP and port of the source and destination | worker | master |
ShuffleExchangeResponse | IP and port of the source and destination, stream of data to be exchanged | master | worker |
MergeRequest | IP and port of the worker | master | worker |
MergeResponse | Boolean indicating whether merging completed successfully | worker | master |
VerificationRequest | IP and port of the worker | master | worker |
VerificationResponse | Boolean indicating whether verification completed successfully | worker | master |
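A minimal sketch of how the master could track one worker with an FSM, assuming the worker moves through the steps in the order the table lists them (connection, sampling, partitioning, internal sort, shuffle, merge, verification) and that each step ends with a report carrying a success flag, like the Boolean in `PartitioningResponse` or `MergeResponse`. The state names, `StepReport`, and `next` are hypothetical, and the project's real FSM may differ.

```scala
object WorkerFsmSketch {
  // Worker states as seen by the master, in the order the message table lists the steps.
  sealed trait State
  case object Connecting extends State
  case object Sampling extends State
  case object Partitioning extends State
  case object InternalSorting extends State
  case object Shuffling extends State
  case object Merging extends State
  case object Verifying extends State
  case object Done extends State
  final case class Failed(reason: String) extends State

  // Event: the worker reports the outcome of a step.
  final case class StepReport(step: State, ok: Boolean)

  private val order: Vector[State] =
    Vector(Connecting, Sampling, Partitioning, InternalSorting, Shuffling, Merging, Verifying, Done)

  // Transition function: a successful report for the current step advances to the next
  // step; a failed or unexpected report moves to Failed; terminal states never change.
  def next(current: State, report: StepReport): State = current match {
    case Done | Failed(_)            => current
    case _ if report.step != current => Failed(s"unexpected report for ${report.step} while in $current")
    case _ if !report.ok             => Failed(s"$current reported failure")
    case _                           => order(order.indexOf(current) + 1)
  }
}
```

Keeping the transition function pure makes it straightforward to unit-test in the TDD workflow, separately from the gRPC handlers that would feed it reports.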