distributed_computing_analysis.md
Mohamed edited this page Jun 10, 2025 · 1 revision
- **Location:** `example/core/example10_distributed_computing.cpp`
- **Objective:** This example simulates a distributed computing environment. It demonstrates how multiple QB actors collaborate to manage a workflow involving task generation, priority-based and load-aware scheduling, parallel execution by worker nodes, result aggregation, and overall system monitoring. It is a useful case study for understanding more complex, multi-stage actor interactions and asynchronous process simulation.
Through this analysis, you will see how QB facilitates:
- Complex, multi-actor workflows.
- Dynamic task scheduling and basic load balancing concepts.
- Simulation of work and delays using asynchronous callbacks.
- Centralized coordination and state management by dedicated actors.
- System-wide monitoring and statistics gathering.
The simulation is orchestrated by several types of actors, each with distinct responsibilities, potentially running across multiple `VirtualCore`s:
`TaskGeneratorActor`

- **Role:** Periodically creates new computational `Task` objects (defined in the example with properties such as type, priority, complexity, and data).
- **QB Integration:** Standard `qb::Actor`.
- **Functionality:**
  - Uses `qb::io::async::callback` to trigger `generateTask()` at intervals determined by a `TASKS_PER_SECOND` constant.
  - `generateTask()`: Constructs a `Task` object with randomized attributes.
  - Encapsulates the `Task` within a `TaskMessage` event and `push`es it to the `TaskSchedulerActor`.
  - Ceases generation upon receiving a `ShutdownMessage`.
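The generation step described above can be sketched in standalone C++ without any QB dependency. The `Task` field types, the `TaskType` values, the 1 to 10 ranges, the value of `TASKS_PER_SECOND`, and the helper names `generationInterval` and `makeRandomTask` are all assumptions made for illustration; the example source defines its own.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <random>
#include <string>

// Hypothetical task shape: the field names follow the prose (type, priority,
// complexity, data), but the concrete types and ranges are assumptions.
enum class TaskType { Compute, IO, Analysis };

struct Task {
    std::uint64_t id;
    TaskType type;
    int priority;    // assumed range 1..10, higher is more urgent
    int complexity;  // assumed range 1..10, higher takes longer
    std::string data;
};

// Assumed value for illustration; the example defines its own constant.
constexpr int TASKS_PER_SECOND = 5;

// Delay between two generateTask() calls for a given generation rate.
constexpr std::chrono::milliseconds generationInterval(int tasks_per_second) {
    return std::chrono::milliseconds(1000 / tasks_per_second);
}

// Builds one task with randomized attributes.
Task makeRandomTask(std::uint64_t id, std::mt19937& rng) {
    std::uniform_int_distribution<int> type_dist(0, 2);
    std::uniform_int_distribution<int> level_dist(1, 10);
    Task task;
    task.id = id;
    task.type = static_cast<TaskType>(type_dist(rng));
    task.priority = level_dist(rng);
    task.complexity = level_dist(rng);
    task.data = "payload-" + std::to_string(id);
    return task;
}
```

In the actor itself, the value returned by a helper like `generationInterval` would be the delay passed to the next `qb::io::async::callback` registration, and the generated `Task` would travel inside a `TaskMessage`.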
`TaskSchedulerActor`

- **Role:** Receives tasks, queues them, and dispatches them to available `WorkerNodeActor`s based on a scheduling strategy (prioritization and basic load awareness).
- **QB Integration:** Standard `qb::Actor`.
- **State Management:**
  - `_worker_ids`: A list of available `WorkerNodeActor` IDs.
  - `_worker_metrics`: A map (`qb::ActorId` to `WorkerMetrics`) storing the last known status (heartbeat, utilization) of each worker.
  - `_task_queue`: A `std::deque` of `std::shared_ptr<Task>`, acting as the pending task buffer.
  - `_active_tasks`: A map (`task_id` to `std::shared_ptr<Task>`) tracking tasks currently assigned to workers.
- **Event Handling & Logic:**
  - `on(TaskMessage&)`: Adds the new task to `_task_queue` and attempts to schedule it immediately via `scheduleTasks()`.
  - `on(WorkerHeartbeatMessage&)`: Updates the `last_heartbeat` timestamp for the reporting worker in `_worker_metrics`.
  - `on(WorkerStatusMessage&)`: Updates the full `WorkerMetrics` (including utilization) for the reporting worker.
  - `on(ResultMessage&)`: Receives a `TaskResult` from a `ResultCollectorActor` (indirectly signaling task completion by a worker), removes the task from `_active_tasks`, and calls `scheduleTasks()` to dispatch new work if workers are free.
  - `on(UpdateWorkersMessage&)`: Receives the initial list of `WorkerNodeActor` IDs from the `SystemMonitorActor`.
- **Scheduling (`scheduleTasks()`):**
  - Sorts `_task_queue` by `Task::priority` (higher priority first).
  - Iterates through known `_worker_ids`.
  - For each worker, calls `isWorkerAvailable()` to check its status.
  - If a worker is available and tasks are pending, dequeues the highest-priority task, moves it to `_active_tasks`, and `push`es a `TaskAssignmentMessage` (containing the `Task`) to that worker.
- **Worker Availability (`isWorkerAvailable()`):** Checks that metrics for the worker exist, that its `last_heartbeat` is recent (within `HEARTBEAT_TIMEOUT`), and that its reported `utilization` is below a defined threshold (e.g., 80%), indicating it has capacity.
- **Load Assessment (`assessLoadBalance()`):** Periodically scheduled via `qb::io::async::callback`. Calculates and logs average worker utilization and current task queue sizes for monitoring purposes.
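The scheduling pass and the availability check described above can be sketched as plain C++ over the same four containers. This is a minimal illustration under stated assumptions, not the example's code: `WorkerId` stands in for `qb::ActorId`, the timeout and threshold values are invented, `std::stable_sort` is one possible choice of sort, and the function returns the assignments instead of pushing `TaskAssignmentMessage` events.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;
using WorkerId = std::uint32_t;  // stand-in for qb::ActorId (assumption)

struct Task { std::uint64_t id; int priority; };

struct WorkerMetrics {
    Clock::time_point last_heartbeat;
    double utilization;  // 0.0 .. 1.0
};

// Assumed values; the example defines its own timeout and threshold.
constexpr std::chrono::seconds HEARTBEAT_TIMEOUT{5};
constexpr double MAX_UTILIZATION = 0.8;

struct Scheduler {
    std::vector<WorkerId> worker_ids;
    std::map<WorkerId, WorkerMetrics> worker_metrics;
    std::deque<std::shared_ptr<Task>> task_queue;
    std::map<std::uint64_t, std::shared_ptr<Task>> active_tasks;

    bool isWorkerAvailable(WorkerId id, Clock::time_point now) const {
        auto it = worker_metrics.find(id);
        if (it == worker_metrics.end()) return false;  // never reported
        if (now - it->second.last_heartbeat > HEARTBEAT_TIMEOUT) return false;
        return it->second.utilization < MAX_UTILIZATION;
    }

    // Returns (worker, task id) pairs; the actor would instead push a
    // TaskAssignmentMessage to each selected worker.
    std::vector<std::pair<WorkerId, std::uint64_t>>
    scheduleTasks(Clock::time_point now) {
        // Higher priority first; equal priorities keep arrival order.
        std::stable_sort(task_queue.begin(), task_queue.end(),
            [](const std::shared_ptr<Task>& a, const std::shared_ptr<Task>& b) {
                return a->priority > b->priority;
            });
        std::vector<std::pair<WorkerId, std::uint64_t>> assignments;
        for (WorkerId id : worker_ids) {
            if (task_queue.empty()) break;
            if (!isWorkerAvailable(id, now)) continue;
            std::shared_ptr<Task> task = task_queue.front();
            task_queue.pop_front();
            active_tasks[task->id] = task;  // now tracked as in flight
            assignments.emplace_back(id, task->id);
        }
        return assignments;
    }
};
```

Each pass hands at most one task to each worker, so a worker's load only influences later passes once its next `WorkerStatusMessage` updates the stored utilization.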
`WorkerNodeActor`

- **Role:** Receives `TaskAssignmentMessage`s and simulates the execution of the assigned `Task`.
- **QB Integration:** Standard `qb::Actor`.
- **State Management:** `_current_task`, `_metrics` (its own `WorkerMetrics`), `_is_busy` flag, `_simulation_start_time`, `_busy_start_time`.
- **Initialization & Reporting (`onInit()`, `scheduleHeartbeat()`, `scheduleMetricsUpdate()`):**
  - In `onInit()`, after receiving `InitializeMessage`, it starts two recurring `qb::io::async::callback` chains:
    - `scheduleHeartbeat()`: Periodically sends a `WorkerHeartbeatMessage` (containing its ID, current time, and busy status) to the `TaskSchedulerActor`.
    - `scheduleMetricsUpdate()`: Periodically calculates its own utilization and other metrics, then sends a `WorkerStatusMessage` to the `TaskSchedulerActor`.
- **Task Execution (`on(TaskAssignmentMessage&)`):**
  - If not already `_is_busy`, accepts the task.
  - Sets `_is_busy = true`, stores `_current_task`, and updates the task's status to `IN_PROGRESS`.
  - `push`es a `TaskStatusUpdateMessage` back to the `TaskSchedulerActor`.
  - **Simulates Work:** Schedules a call to its own `completeCurrentTask()` method using `qb::io::async::callback`. The delay for this callback is determined by `task->complexity` (using `generateProcessingTime()`).
- **Task Completion (`completeCurrentTask()`, invoked by the scheduled callback):**
  - Calculates simulated processing time.
  - Randomly determines if the task succeeded or failed.
  - Updates its internal `_metrics` (total tasks processed, processing time, success/failure count).
  - Creates a `TaskResult` object.
  - `push`es a `ResultMessage` (containing the `TaskResult`) to the `ResultCollectorActor`.
  - `push`es a final `TaskStatusUpdateMessage` (`COMPLETED` or `FAILED`) to the `TaskSchedulerActor`.
  - Sets `_is_busy = false`, making itself available for new tasks.
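The worker's busy/idle bookkeeping and its utilization figure can be sketched as follows, in standalone C++ with time passed in explicitly so the logic is easy to follow. The scaling inside `generateProcessingTime` (100 ms per complexity point with up to 20% jitter), the `WorkerState` struct, and the definition of utilization as busy time divided by elapsed simulation time are assumptions; the example may compute these differently.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <random>

using Millis = std::chrono::milliseconds;

// Assumed scaling: 100 ms of simulated work per complexity point, with up to
// +/-20% random jitter. The example's generateProcessingTime() may differ.
Millis generateProcessingTime(int complexity, std::mt19937& rng) {
    std::uniform_real_distribution<double> jitter(0.8, 1.2);
    return Millis(static_cast<long long>(100.0 * complexity * jitter(rng)));
}

// Hypothetical reduction of the worker's state to what utilization needs.
// Times are offsets from the simulation start (t = 0).
struct WorkerState {
    bool is_busy = false;
    Millis busy_start{0};
    Millis total_busy{0};
    int tasks_processed = 0;

    // Mirrors on(TaskAssignmentMessage&): refuse work while busy.
    bool acceptTask(Millis now) {
        if (is_busy) return false;
        is_busy = true;
        busy_start = now;
        return true;
    }

    // Mirrors completeCurrentTask(): account for the busy period, go idle.
    void completeCurrentTask(Millis now) {
        total_busy += now - busy_start;
        ++tasks_processed;
        is_busy = false;
    }

    // Fraction of elapsed simulation time spent busy, clamped to 1.0.
    double utilization(Millis now) const {
        if (now.count() <= 0) return 0.0;
        Millis busy = total_busy;
        if (is_busy) busy += now - busy_start;  // include the running task
        return std::min(1.0, static_cast<double>(busy.count()) / now.count());
    }
};
```

In the actor, `completeCurrentTask()` would be reached through a `qb::io::async::callback` registered with the delay that `generateProcessingTime()` returns, rather than being called directly.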
`ResultCollectorActor`

- **Role:** Receives `ResultMessage`s from `WorkerNodeActor`s and stores and logs the outcomes of completed tasks.
- **QB Integration:** Standard `qb::Actor`.
- **State Management:** `_results` (a map from `task_id` to `TaskResult`).
- **Event Handling:**
  - `on(InitializeMessage&)`: Activates the actor.
  - `on(ResultMessage&)`: Stores the received `result` in its `_results` map and logs the result to the console.
  - `on(ShutdownMessage&)`: Calculates and prints final statistics (overall success rate, average processing time across all tasks) based on the collected `_results`.
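The final statistics computed on shutdown amount to a single pass over the results map. A minimal sketch, assuming a `TaskResult` with a success flag and a processing time in milliseconds (the field names and the `summarize` helper are hypothetical):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical result shape; the example's TaskResult may carry more fields.
struct TaskResult {
    std::uint64_t task_id;
    bool success;
    double processing_ms;
};

struct FinalStats {
    double success_rate;       // 0.0 .. 1.0
    double avg_processing_ms;  // mean over all collected results
};

// Aggregates the collected results; an empty map yields zeros rather than
// dividing by zero.
FinalStats summarize(const std::map<std::uint64_t, TaskResult>& results) {
    if (results.empty()) return {0.0, 0.0};
    std::size_t succeeded = 0;
    double total_ms = 0.0;
    for (const auto& entry : results) {
        if (entry.second.success) ++succeeded;
        total_ms += entry.second.processing_ms;
    }
    const double n = static_cast<double>(results.size());
    return {succeeded / n, total_ms / n};
}
```

Because every `ResultMessage` is delivered to this one actor, the map is only ever touched from that actor's own event handlers, so this aggregation needs no locking.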
`SystemMonitorActor`

- **Role:** Sets up the entire simulation, starts the actors, monitors overall system performance, and initiates a graceful shutdown.
- **QB Integration:** Standard `qb::Actor`.
- **Initialization (`onInit()` and `on(InitializeMessage&)`):**
  - Creates instances of all other actor types (`TaskGeneratorActor`, `TaskSchedulerActor`, `ResultCollectorActor`, and a pool of `WorkerNodeActor`s), assigning them to different cores (`qb::Main::addActor`).
  - Stores their `ActorId`s.
  - `push`es an `InitializeMessage` to all created actors to kickstart their operations.
  - Sends an initial `UpdateWorkersMessage` (containing all worker IDs) to the `TaskSchedulerActor`.
- **Performance Monitoring (`schedulePerformanceReport()`):**
  - Uses `qb::io::async::callback` to periodically call itself (by pushing a `SystemStatsMessage` to its own ID after calculation).
  - Calculates system-wide statistics (e.g., total tasks generated, completed, failed; overall throughput) using global `std::atomic` counters (which are incremented by other actors such as `TaskGeneratorActor` and `WorkerNodeActor`).
  - Logs these system-wide stats.
- **Simulation Shutdown (`shutdownSystem()`):**
  - Scheduled via `qb::io::async::callback` to run after `SIMULATION_DURATION_SECONDS`.
  - Performs a final performance report.
  - Calls `broadcast<ShutdownMessage>()` to signal all actors to stop their activities and terminate gracefully.
  - Finally, calls `qb::Main::stop()` to stop the entire engine.
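The performance report relies on global `std::atomic` counters that several actors increment and the monitor reads. A minimal standalone sketch of that pattern follows; the counter names, the `SystemStats` struct, and the definition of throughput as completed tasks per elapsed second are assumptions made for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical global counters, incremented from other actors' handlers.
std::atomic<std::uint64_t> g_tasks_generated{0};
std::atomic<std::uint64_t> g_tasks_completed{0};
std::atomic<std::uint64_t> g_tasks_failed{0};

struct SystemStats {
    std::uint64_t generated;
    std::uint64_t completed;
    std::uint64_t failed;
    double throughput;  // completed tasks per second (assumed definition)
};

// Reads each counter once. Relaxed ordering is enough here because the
// values are only reported, never used to synchronize other data.
SystemStats snapshotStats(double elapsed_seconds) {
    SystemStats s;
    s.generated = g_tasks_generated.load(std::memory_order_relaxed);
    s.completed = g_tasks_completed.load(std::memory_order_relaxed);
    s.failed = g_tasks_failed.load(std::memory_order_relaxed);
    s.throughput =
        elapsed_seconds > 0.0 ? s.completed / elapsed_seconds : 0.0;
    return s;
}
```

A producer would record progress with, for example, `g_tasks_completed.fetch_add(1, std::memory_order_relaxed)`. The three loads are not taken atomically as a group, so a snapshot can be slightly inconsistent across counters, which is usually acceptable for a periodic log line.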
- **Complex Multi-Actor Workflow:** Illustrates a pipeline with clear separation of concerns: generation, scheduling, execution, collection, and system-level monitoring.
- **Asynchronous Simulation of Work:** Extensive use of `qb::io::async::callback` to simulate task processing times and to schedule periodic actions (heartbeats, metrics updates, report generation) without blocking any `VirtualCore`.
- **Centralized Coordination & State:**
  - `TaskSchedulerActor`: Manages the central task queue and worker availability.
  - `ResultCollectorActor`: Aggregates all final task outcomes.
  - `SystemMonitorActor`: Orchestrates the setup and shutdown of the entire simulation.
- **Dynamic Task Scheduling & Basic Load Balancing:** The `TaskSchedulerActor` uses worker heartbeats and reported metrics (utilization) to decide which worker should receive the next task, while also prioritizing tasks from its queue.
- **Worker Self-Monitoring and Reporting:** `WorkerNodeActor`s proactively send heartbeats and status updates to the scheduler, enabling the scheduler to make informed decisions.
- **Inter-Actor Event Design:** Shows various custom event types (`TaskMessage`, `WorkerStatusMessage`, `ResultMessage`, `ShutdownMessage`, etc.) designed to carry specific information between collaborating actors.
- **Graceful Shutdown:** A coordinated shutdown process initiated by the `SystemMonitorActor` via a `ShutdownMessage`, allowing other actors to finalize their work and report statistics before the engine stops.
- **Global Atomics for High-Level Stats:** Use of `std::atomic` global counters demonstrates a simple, thread-safe way to aggregate high-level system statistics from multiple concurrent actors (though for more complex stats, a dedicated statistics actor would be preferable).
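The self-rescheduling callback pattern behind the heartbeats, metrics updates, and reports can be illustrated without QB by a tiny single-threaded timer loop driven by a virtual clock. Everything here is a stand-in written for this page: `TimerLoop` is not QB's scheduler, and `TimerLoop::callback` only imitates the idea of "run this function once after a delay".

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical single-threaded timer loop with a virtual clock in ms.
class TimerLoop {
public:
    using Callback = std::function<void()>;

    // Runs `cb` once, `delay_ms` after the current virtual time.
    void callback(Callback cb, long delay_ms) {
        timers_.push(Timer{now_ms_ + delay_ms, next_seq_++, std::move(cb)});
    }

    // Fires due timers in time order up to and including `until_ms`.
    void runUntil(long until_ms) {
        while (!timers_.empty() && timers_.top().due_ms <= until_ms) {
            Timer t = timers_.top();
            timers_.pop();
            now_ms_ = t.due_ms;
            t.cb();  // may register further timers
        }
        now_ms_ = until_ms;
    }

    long now() const { return now_ms_; }

private:
    struct Timer {
        long due_ms;
        unsigned long seq;  // keeps insertion order for equal due times
        Callback cb;
        bool operator>(const Timer& o) const {
            return due_ms != o.due_ms ? due_ms > o.due_ms : seq > o.seq;
        }
    };
    std::priority_queue<Timer, std::vector<Timer>, std::greater<Timer>> timers_;
    long now_ms_ = 0;
    unsigned long next_seq_ = 0;
};

// A periodic action built from one-shot callbacks: each run re-registers
// itself unless a stop flag (set on shutdown) is raised.
struct Heartbeat {
    Heartbeat(TimerLoop& l, long interval) : loop(l), interval_ms(interval) {}

    void schedule() {
        loop.callback([this] {
            if (stopped) return;  // chain ends after shutdown
            ++sent;               // the actor would push a heartbeat here
            schedule();
        }, interval_ms);
    }

    TimerLoop& loop;
    long interval_ms;
    int sent = 0;
    bool stopped = false;
};
```

Because each callback returns immediately after re-registering itself, the loop is free to run other timers and handlers in between, which is the property the example depends on to keep a `VirtualCore` unblocked.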
This `distributed_computing` example serves as a rich template for understanding how to structure larger, more intricate applications using the QB Actor Framework, particularly those involving distributed workflows and resource management.
**(Next Example Analysis: file_monitor Example Analysis)**