A tiny wrapper on top of RabbitMQ allowing for pub/sub and RPC messages.
Depends on:
kotlinx.serialization
,
kotlinx.coroutines
,
and amqp-client
It is recommended to use any SLF4J implementation, like
logback
,
to enable logging for this project. To enable Kotlin reflection in logs, add
kotlin-reflect
to your project.
Install with Gradle:
repositories {
maven(url = "https://jitpack.io")
}
dependencies {
implementation("com.github.bluedragonmc:messagingsystem:$version")
}
All message classes must be serializable with
kotlinx.serialization
and must implement Message
.
UUIDs have a contextual serializer, so they can be serialized by adding the @Contextual
annotation to their declarations.
@Serializable // Make sure this is `kotlinx.serialization.Serializable` and not `java.io.Serializable`
data class MyMessage(val greeting: String) : Message
@Serializable
data class MyResponse(val welcome: String) : Message
@Serializable
data class UUIDExample(val username: String, @Contextual val uuid: UUID) : Message
/*
Due to a security restriction of kotlinx.serialization, ALL messages must be registered as subclasses using a polymorphic module builder.
This will be passed to the AMQPClient in the next step.
*/
val polymorphicModuleBuilder: PolymorphicModuleBuilder<Message>.() -> Unit = {
subclass(MyMessage::class)
subclass(MyResponse::class)
subclass(UUIDExample::class)
}
// RabbitMQ's default port is 5672
val client = AMQPClient(hostname = "127.0.0.1", port = 5672, polymorphicModuleBuilder = polymorphicModuleBuilder)
// Every client will only make one connection and open two channels: one for pub/sub and one for RPC.
// This instance should be kept and used for every method call.
Resources should be closed when you are done using them:
client.close()
// This will close the two created channels and the connection.
// `AMQPClient`s cannot be used after they are closed.
AMQPClient
also implements the Closeable
interface, so it can be used in a use
block. It will be closed automatically after the block has finished executing.
System properties will be used if no value is provided to the parameter. If a system property value was not found, the default is used. See AMQPClient for descriptions of each of these properties.
Property | System Property | Default |
---|---|---|
hostname: String | rabbitmq_host |
"rabbitmq" |
port: Int | rabbitmq_port |
5672 |
serializersModuleBuilder: SerializersModuleBuilder.() -> Unit | {} | |
polymorphicModuleBuilder: PolymorphicModuleBuilder.() -> Unit | ||
exchangeName: String | rabbitmq_exchange_name |
"bluedragon" |
rpcExchangeName: String | rabbitmq_rpc_exchange_name |
"" |
routingKey: String | rabbitmq_routing_key |
"" |
rpcQueueName: String | rabbitmq_rpc_queue_name |
"rpc_queue" |
connectionName: String? | Value of AMQPClient#toString |
|
writeOnly: Boolean | false |
ℹ️ ️polymorphicModuleBuilder
, exchangeName
, rpcExchangeName
, routingKey
, rpcQueueName
should use the same values for all instances of this program. If not, some messages may not be received properly.
client.subscribe(MyMessage::class) { message ->
// `message` is guaranteed to be of type MyMessage
logger.info("Greeting received: ${message.greeting}")
}
// Unsubscribe
client.unsubscribe(MyMessage::class)
client.publish(MyMessage("Hello, world!"))
// This message will be passed to all subscribers of MyMessage.
RPC stands for remote-procedure call, and it is used to perform operations on a remote server and return a response.
The usage is very similar to pub/sub messaging, but all listeners must return a Message
as a response.
client.subscribeRPC(MyMessage::class) { message ->
// Just like in the `subscribe()` example, `message` is guaranteed to be MyMessage
return MyResponse("Welcome!") // This message will be returned to the sender.
// It can be any subclass of `Message`.
}
// Unsubscribe
client.unsubscribeRPC(MyMessage::class)
ℹ️ This method is a suspend fun
, so it must be called from a coroutine or another suspend function.
val response = client.publishAndReceive(MyMessage("Hello, world!"))
// `response` can be any subclass of `Message`, so manual type checking is required
If an uncaught exception occurs in an RPC handler, an RPCErrorMessage
is sent back to the receiver, which will cause an RPCMessagingException
to be thrown on the receiving end.
This signifies the exception occurred on the server, not the client.
The RabbitMQ connection and channels are not initialized until they are first used. If you want to pre-initialize them, use client.preInitialize()
This project is far from production-ready and should not be trusted for mission-critical data. It was built to meet the needs of BlueDragon, with brevity and simplicity in mind over reliability.