-
-
Notifications
You must be signed in to change notification settings - Fork 11
async_in_actors.md
Actors in the QB Framework can directly and seamlessly leverage the asynchronous capabilities of qb-io
. This allows actors to perform time-based operations, schedule deferred tasks, or manage I/O-bound activities without blocking their VirtualCore
and impeding the progress of other actors on the same core.
(qb/io/async/io.h
)
This is the primary and most flexible way for an actor to schedule a piece of code (typically a lambda) to be executed later by its own VirtualCore
's event loop.
-
How to Use: Call
qb::io::async::callback(your_lambda, delay_in_seconds);
from within an actor'sonInit()
, anon(Event&)
handler, or even anotheronCallback()
(if the actor inheritsqb::ICallback
).- A
delay_in_seconds
of0.0
(or omitting it) schedules the lambda for the next available iteration of the event loop.
- A
-
Execution Context: The provided lambda will execute on the same
VirtualCore
(and thus the same thread) as the actor that scheduled it. This means the lambda can safely interact with the actor's state if and only if the actor is still alive and proper checks are made. -
Capturing Actor Context (
this
orid()
):-
Crucial Safety Check: When a lambda captures
this
(orauto self_id = id();
), it is essential to checkthis->is_alive()
(orif (auto actor_ptr = VirtualCore::_handler->getActor(self_id)) { ... }
if only ID was captured and direct actor pointer needed, though this is less common for self-interaction) at the very beginning of the lambda's body. This is because the actor might be terminated and destroyed before the delayed callback gets a chance to execute. -
Sending Events Back to Self: Instead of complex direct state manipulation from the lambda, a robust pattern is for the lambda to
push
a new, specific event back to the scheduling actor itself. The actor then handles this event in a dedicatedon()
handler.
-
Crucial Safety Check: When a lambda captures
-
Common Use Cases for Actors:
- Implementing Timeouts: For an operation initiated by the actor, schedule a callback. If the operation doesn't complete (e.g., by an incoming response event clearing a pending flag) before the callback runs, the callback can trigger timeout logic.
-
Periodic Actions: A callback can reschedule itself to create periodic behavior (an alternative to using
qb::ICallback
). -
Breaking Down Complex Non-Blocking Tasks: If an actor needs to perform a series of non-blocking steps, it can execute one step and then schedule the next step via
async::callback
with a zero delay, effectively yielding control back to the event loop between steps. - Retry Mechanisms: After a failed attempt (e.g., network connection), schedule a retry attempt using a delayed callback.
- Simulating Work/Delays: Useful in examples or tests to introduce artificial processing times without actual blocking.
// Inside MyProcessingActor class
#include <qb/actor.h>
#include <qb/io/async.h> // For qb::io::async::callback
#include <qb/io.h> // For qb::io::cout
// Event to signal task completion or timeout
struct ProcessTaskCompleteEvent : qb::Event {
int task_id;
bool timed_out;
explicit ProcessTaskCompleteEvent(int id, bool timeout_status)
: task_id(id), timed_out(timeout_status) {}
};
class MyProcessingActor : public qb::Actor {
private:
std::map<int, bool> _pending_tasks; // task_id -> is_pending
int _next_task_id = 1;
public:
bool onInit() override {
registerEvent<ProcessTaskCompleteEvent>(*this);
registerEvent<qb::KillEvent>(*this);
// Example: Start a task immediately
startLongOperation(5.0); // Operation that we expect to complete or timeout in 5s
return true;
}
void startLongOperation(double timeout_seconds) {
int current_task_id = _next_task_id++;
_pending_tasks[current_task_id] = true;
qb::io::cout() << "Actor [" << id() << "]: Starting operation for task " << current_task_id
<< ", timeout in " << timeout_seconds << "s.\n";
// Simulate starting an external non-blocking operation here...
// Schedule a timeout check using async::callback
// Capture 'this' to access actor members and methods
qb::io::async::callback([this, current_task_id]() {
if (!this->is_alive()) { // Crucial check!
// qb::io::cout() << "Actor for task " << current_task_id << " no longer alive for timeout check.\n";
return;
}
// Check if the task is still marked as pending
if (_pending_tasks.count(current_task_id) && _pending_tasks[current_task_id]) {
// Task timed out, send a completion event indicating timeout
qb::io::cout() << "Actor [" << id() << "]: Task " << current_task_id << " timed out!\n";
this->push<ProcessTaskCompleteEvent>(this->id(), current_task_id, true);
_pending_tasks.erase(current_task_id); // Clean up
}
// If not pending, it means a response was received before timeout
}, timeout_seconds);
}
// Imagine this event is received from an external system or another actor
// when the long operation actually finishes (before timeout).
void onOperationActuallyCompleted(int task_id_completed) { // Not a qb::Event, but a conceptual method
if (_pending_tasks.count(task_id_completed) && _pending_tasks[task_id_completed]) {
qb::io::cout() << "Actor [" << id() << "]: Task " << task_id_completed << " completed successfully (before timeout).\n";
this->push<ProcessTaskCompleteEvent>(this->id(), task_id_completed, false);
_pending_tasks.erase(task_id_completed);
}
}
void on(const ProcessTaskCompleteEvent& event) {
qb::io::cout() << "Actor [" << id() << "]: Handling ProcessTaskCompleteEvent for task " << event.task_id
<< (event.timed_out ? " (TIMED OUT)" : " (SUCCESS)") << ".\n";
// Further processing based on completion or timeout...
}
void on(const qb::KillEvent& /*event*/) { kill(); }
};
(Reference: test-actor-delayed-events.cpp
, example10_distributed_computing.cpp
(extensive use for simulation and scheduling), file_processor/file_worker.h
(wrapping blocking calls).**)
While async::callback
is versatile, if an actor needs a simple inactivity timeout (i.e., it should terminate or take action if no relevant messages arrive for a period), inheriting from qb::io::async::with_timeout<DerivedActor>
can be an option.
-
How it Works:
-
Inheritance:
class MyActor : public qb::Actor, public qb::io::async::with_timeout<MyActor> { ... };
-
Constructor Initialization: Call the base
with_timeout(timeout_duration_seconds);
in your actor's constructor. -
Implement Timeout Handler:
void on(qb::io::async::event::timer const& event)
will be called when the timer expires. -
Reset on Activity: In your relevant
on(ApplicationEvent&)
handlers (those that signify activity), callthis->updateTimeout();
to reset the inactivity countdown.
-
Inheritance:
- Use Case: Terminating idle sessions, cleaning up actors that haven't received heartbeats or work for a while.
#include <qb/actor.h>
#include <qb/io/async.h> // For qb::io::async::with_timeout and event::timer
#include <qb/io.h>
struct ClientActivityPing : qb::Event {};
class SessionActorWithInactivity : public qb::Actor,
public qb::io::async::with_timeout<SessionActorWithInactivity> {
public:
SessionActorWithInactivity() : with_timeout(30.0) { // 30-second inactivity timeout
qb::io::cout() << "SessionActor [" << id() << "]: Created with 30s inactivity timeout.\n";
}
bool onInit() override {
registerEvent<ClientActivityPing>(*this);
registerEvent<qb::KillEvent>(*this);
updateTimeout(); // Start the initial countdown
return true;
}
void on(const ClientActivityPing& /*event*/) {
qb::io::cout() << "SessionActor [" << id() << "]: Ping received, activity confirmed. Resetting timeout.\n";
updateTimeout(); // Reset inactivity timer upon receiving a ping
}
// This handler is called by the with_timeout base when the timer expires
void on(qb::io::async::event::timer const& /*event*/) {
qb::io::cout() << "SessionActor [" << id() << "]: Inactivity timeout reached! Terminating session.\n";
kill(); // Terminate the actor due to inactivity
}
void on(const qb::KillEvent& /*event*/) {
setTimeout(0.0); // Disable timer explicitly during shutdown
kill();
}
~SessionActorWithInactivity() override {
qb::io::cout() << "SessionActor [" << id() << "] destroyed.\n";
}
};
(Reference: chat_tcp/server/ChatSession.h
uses this pattern, though it's a non-actor class in that example, the principle is the same. test-async-io.cpp::TimerHandler
also demonstrates with_timeout
.)**
Directly performing blocking file I/O (like std::fstream::read
or qb::io::sys::file::write
) within an actor's event handler or onCallback
is highly discouraged as it will block the VirtualCore
.
To handle file operations asynchronously from an actor:
-
Wrap Blocking Calls in
qb::io::async::callback
(for simpler cases):- The actor receives a request to read/write a file.
- It captures necessary parameters (file path, data, original requester ID) into a lambda.
- It schedules this lambda using
qb::io::async::callback
. - The lambda executes on the actor's
VirtualCore
but in a way that doesn't block the primary event processing flow for other events while the blocking call itself is in progress (though the callback itself will block its turn in the event loop queue). - After the blocking I/O in the lambda completes, the lambda should
push
a result event (containing data or error status) back to the original requester or to the scheduling actor itself. -
Caution: While this makes the actor appear non-blocking to other actors, the
VirtualCore
processing this specific callback will be blocked during the actual file I/O. This is suitable for infrequent or less performance-critical file operations.
-
Dedicated File Worker Actors (Recommended for complex or frequent I/O):
- Create specialized worker actors whose sole purpose is to perform file I/O.
- The main actor delegates file operation requests (as events) to these worker actors.
- Worker actors can use the
async::callback
pattern internally to perform the blocking I/O. - This isolates blocking operations to specific actors/cores, potentially configured with higher latency if they are expected to block often.
-
(Reference: The
example/core_io/file_processor/
provides a good example of this pattern.)**
-
Asynchronous File Watching (
qb::io::async::file_watcher
ordirectory_watcher
):- For scenarios where an actor needs to react to changes in the file system (file creation, modification, deletion) without polling.
- The actor (or a helper class it owns) can inherit from
qb::io::async::file_watcher<MyActor>
ordirectory_watcher<MyActor>
. - It then implements
void on(qb::io::async::event::file const& event)
to handle notifications. - The watcher is started with
this->start(path_to_watch, interval);
. -
(Reference: The
example/core_io/file_monitor/
demonstrates this pattern with aDirectoryWatcher
actor.)**
By using these asynchronous patterns, actors can effectively manage time-based logic and interact with potentially blocking resources like the file system without compromising the overall responsiveness and concurrency of the QB application.
(Next: Integrating Core & IO: Network Actors to see how actors handle network I/O.**)