-
-
Notifications
You must be signed in to change notification settings - Fork 11
message_broker_analysis.md
-
Location:
example/core_io/message_broker/
- Objective: This example implements a fully functional, topic-based Publish/Subscribe (Pub/Sub) message broker. It demonstrates how QB actors can be used to build a robust messaging system with TCP-based client-server communication, custom protocol handling, and efficient message distribution to multiple subscribers.
Key learning points from this example include:
- Implementing a classic Pub/Sub architecture with actors.
- Efficiently fanning out messages to numerous subscribers using shared message payloads (
MessageContainer
). - Designing a custom binary protocol for diverse message types (
BrokerProtocol
). - Managing distributed state for topics and subscriptions in a central actor.
- Leveraging
qb::string_view
for zero-copy string processing during internal event forwarding.
The message_broker
system comprises a server (the broker) and multiple clients that can publish messages to topics or subscribe to receive messages from topics.
The server architecture is similar in structure to the chat_tcp
example, with distinct roles for accepting connections, managing client sessions, and handling the core broker logic.
-
AcceptActor
(server/AcceptActor.h/.cpp
)- Core (Typical): Core 0.
- Role: Listens for incoming TCP connections on a configured port (e.g., 12345).
-
QB Integration:
qb::Actor
,qb::io::use<AcceptActor>::tcp::acceptor
. -
Functionality: When a new client connects,
on(accepted_socket_type&& new_io)
is called. It then round-robins the newqb::io::tcp::socket
(wrapped in aNewSessionEvent
) to one of the availableServerActor
instances for session management.
-
ServerActor
(server/ServerActor.h/.cpp
)- Core (Typical): Core 1 (or a pool across multiple cores, e.g., 1 & 2).
-
Role: Manages a group of active client connections, represented by
BrokerSession
instances. It acts as an intermediary, forwarding client commands to theTopicManagerActor
and relaying messages from theTopicManagerActor
back to the appropriate clients. -
QB Integration:
qb::Actor
,qb::io::use<ServerActor>::tcp::server<BrokerSession>
(which providesio_handler
capabilities). -
Session Creation:
on(NewSessionEvent& evt)
callsregisterSession(std::move(evt.socket))
to create, start, and manage a newBrokerSession
. -
Efficient Command Forwarding: When a
BrokerSession
calls methods likeserver().handleSubscribe(id(), std::move(msg))
, theServerActor
often usesstd::move
for the incomingbroker::Message
. It then creates specialized events for theTopicManagerActor
(e.g.,SubscribeEvent
,PublishEvent
) that can utilizebroker::MessageContainer
(for shared ownership of message payloads) andstd::string_view
(for topic/content if parsed locally before forwarding). This minimizes unnecessary string copying, especially for message content being published.// Simplified from ServerActor::handlePublish // The BrokerSession has already parsed the topic and content into string_views // and put the original message into a MessageContainer. void ServerActor::handlePublish(qb::uuid session_id, broker::MessageContainer&& container, std::string_view topic, std::string_view content) { if (_topic_manager_id.is_valid()) { push<PublishEvent>(_topic_manager_id, session_id, id(), std::move(container), topic, content); } }
-
Delivering Messages to Clients:
on(SendMessageEvent& evt)
receives messages from theTopicManagerActor
. It looks up the targetBrokerSession
usingevt.target_session_id
and sends the actual message payload (accessed viaevt.message_container.message().payload
) to the client using the session'soperator<<
.
-
BrokerSession
(server/BrokerSession.h/.cpp
)-
Context: Managed by a
ServerActor
, runs on the sameVirtualCore
. - Role: Handles I/O and protocol parsing for a single connected client.
-
QB Integration:
qb::io::use<BrokerSession>::tcp::client<ServerActor>
,qb::io::use<BrokerSession>::timeout
. -
Protocol:
using Protocol = broker::BrokerProtocol<BrokerSession>;
(defined inshared/Protocol.h
). - **Command Processing (
on(broker::Message msg)
):- Receives a fully parsed
broker::Message
from itsBrokerProtocol
. - Crucially, it takes
broker::Message msg
by value to allowstd::move
for efficient forwarding. - Based on
msg.type
(SUBSCRIBE
,UNSUBSCRIBE
,PUBLISH
):- For
PUBLISH
, it first parses themsg.payload
to extracttopic
andcontent
asstd::string_view
s. - It then calls the appropriate handler on its parent
ServerActor
(e.g.,server().handleSubscribe(id(), std::move(msg))
, or for publish:server().handlePublish(id(), broker::MessageContainer(std::move(msg)), topic_view, content_view);
). This passes ownership of the message (or a shared container of it) efficiently.
- For
- Receives a fully parsed
-
Lifecycle: Notifies its
ServerActor
viahandleDisconnect()
on disconnection or inactivity timeout.
-
Context: Managed by a
-
TopicManagerActor
(server/TopicManagerActor.h/.cpp
)- Core (Typical): Core 2 (dedicated to core broker logic).
- Role: The heart of the broker. Manages topic subscriptions and efficiently routes published messages to all relevant subscribers.
-
State Management:
-
_sessions
: Mapsqb::uuid
(client session ID) toSessionInfo { qb::ActorId server_id }
(to know whichServerActor
manages that session). -
_subscriptions
: Mapsqb::string topic_name
tostd::set<qb::uuid> subscriber_session_ids
. -
_session_topics
: Mapsqb::uuid session_id
tostd::set<qb::string topic_name>
(for cleanup on disconnect).
-
-
Event Handling:
-
on(SubscribeEvent&)
: Addsevt.session_id
to the subscriber set forevt.topic_sv.data()
. Sends aRESPONSE
message back to the client via itsServerActor
. -
on(UnsubscribeEvent&)
: Removes the session from the topic's subscriber set. -
on(PublishEvent&)
: This is where efficient fan-out happens.- Formats the message to be broadcast (e.g., "topic:publisher_id:content").
-
Creates a single
broker::MessageContainer shared_message(...)
. This container likely holds astd::shared_ptr
to the actual formatted message string/payload. - Looks up all
subscriber_session_id
s for the givenevt.topic_sv.data()
. - For each subscriber, it retrieves their managing
ServerActor
's ID from_sessions
. -
push
es aSendMessageEvent(subscriber_session_id, managing_server_id, shared_message)
to the managingServerActor
. Becauseshared_message
is passed (likely by const-ref or by copying theMessageContainer
which shares the underlying payload), the actual message data is not copied for each subscriber.
// Simplified from TopicManagerActor::on(PublishEvent& evt) if (_subscriptions.count(topic_key)) { qb::string<512> formatted_msg_content; // Or std::string if payload is large // ... format message content using evt.publisher_id and evt.content_sv ... broker::MessageContainer shared_payload_container( broker::MessageType::MESSAGE, std::string(formatted_msg_content.c_str()) // Convert qb::string to std::string for MessageContainer ); for (qb::uuid subscriber_session_id : _subscriptions.at(topic_key)) { if (_sessions.count(subscriber_session_id)) { qb::ActorId target_server_id = _sessions.at(subscriber_session_id).server_id; push<SendMessageEvent>(target_server_id, subscriber_session_id, shared_payload_container); } } }
-
on(DisconnectEvent&)
: Removes the disconnected session from all its topic subscriptions and from the_sessions
map.
-
Similar to the chat_tcp
client:
-
InputActor
(client/InputActor.h/.cpp
): Handles console input, parses basic commands (SUB, UNSUB, PUB, QUIT, HELP), andpush
es aBrokerInputEvent
(containing the raw command string) to theClientActor
. -
ClientActor
(client/ClientActor.h/.cpp
):- Manages the TCP connection to the broker server using
qb::io::use<ClientActor>::tcp::client<>
. - Uses
BrokerProtocol
for message framing:using Protocol = broker::BrokerProtocol<ClientActor>;
. -
onInit()
: Connects to the server usingqb::io::async::tcp::connect
. -
on(BrokerInputEvent&)
: Further parses the raw command fromInputActor
. For example, for "PUB topicname message content", it extracts "topicname" and "message content". - Sends
SUBSCRIBE
,UNSUBSCRIBE
, orPUBLISH
messages (asbroker::Message
objects) to the server using*this << broker_message << Protocol::end;
. -
on(broker::Message&)
: HandlesRESPONSE
andMESSAGE
types from the server, printing information to the console. -
on(qb::io::async::event::disconnected const&)
: Handles disconnection and schedules reconnection attempts usingqb::io::async::callback
.
- Manages the TCP connection to the broker server using
-
Publish/Subscribe Architecture: A full implementation of the Pub/Sub pattern using a central
TopicManagerActor
. -
Efficient Message Fan-Out (Zero/Minimal Copy for Payloads):
- The
TopicManagerActor
demonstrates a crucial optimization: when broadcasting a published message to multiple subscribers, it creates the actual payload data (or a container for it likebroker::MessageContainer
which likely usesstd::shared_ptr
) once. Then, it sends events (SendMessageEvent
) to variousServerActor
s, each containing a reference or a shared pointer to this single payload. This avoids repeatedly copying potentially large message contents for every subscriber, which is critical for performance in systems with many subscribers to a topic. - The use of
std::move
inBrokerSession
when callingserver().handlePublish(...)
and inServerActor
when creatingPublishEvent
helps transfer ownership of message data efficiently towards theTopicManagerActor
.
- The
-
Custom Binary Protocol (
BrokerProtocol
): The example definesMessageHeader
andMessageType
to structure communication, showing how to build more complex protocols than simple delimiter-based ones. -
Use of
std::string_view
for Intermediate Processing:BrokerSession
andServerActor
usestd::string_view
to refer to parts of the message payload (like topic and content) without copying them before the data is packaged into theMessageContainer
or specific events for theTopicManagerActor
. -
Multi-Core Scalability: Distributing the
AcceptActor
,ServerActor
s (potentially multiple instances on different cores), and theTopicManagerActor
across differentVirtualCore
s allows the system to handle high connection rates and message throughput by parallelizing work. - Layered Responsibilities: Clear separation of concerns: connection acceptance, session I/O and protocol parsing, and topic/subscription management.
This message_broker
example is significantly more advanced than the chat_tcp
example in its message handling and provides excellent insights into building high-performance, scalable messaging systems with QB.
(Next Example Analysis: We have covered the main core_io
examples. Consider revisiting Developer Guides for broader patterns or the Reference Documentation section.**)