Custom Transports
In addition to the built-in RabbitMQ and in-memory transports, you can extend Messenger by adding support for your own transport (for example, Kafka, NATS or another message source/recipient). To do this, you need to implement several interfaces and register them before building Messenger:
-
Implement the transport interface. The package github.com/gerfey/messenger/api defines the Transport interface (and, optionally, RetryableTransport). A transport must be able to send messages (Send), receive messages (Receive), and report its name (Name). If you support retries, you can also implement Retry(ctx, env) so that the transport satisfies api.RetryableTransport. As a simplified example, here is a transport that writes messages to stdout:
import (
	"context"
	"fmt"

	"github.com/gerfey/messenger/api"
)

type ConsoleTransport struct {
	name string
}

func (t *ConsoleTransport) Name() string {
	return t.name
}

func (t *ConsoleTransport) Send(ctx context.Context, env api.Envelope) error {
	fmt.Println("ConsoleTransport sent message:", env.Message())
	return nil
}

func (t *ConsoleTransport) Receive(ctx context.Context, handler func(context.Context, api.Envelope) error) error {
	// This transport does not support incoming messages from an external source,
	// so Receive simply waits for context cancellation.
	<-ctx.Done()
	return ctx.Err()
}

// Close has no resources to release in this example.
func (t *ConsoleTransport) Close() error {
	return nil
}

// Optional: implement RetryableTransport.
func (t *ConsoleTransport) Retry(ctx context.Context, env api.Envelope) error {
	// In this example, we simply call Send again.
	return t.Send(ctx, env)
}
Here ConsoleTransport is a deliberately simple example: Send prints the message to the console, Receive does nothing except wait (the transport is assumed to be outgoing only), and Retry simply resends the message in the same way. In a real transport, Receive must read data from an external system (for example, a socket or a queue) and call handler(ctx, env) for each received message. Note that env api.Envelope contains both the message itself and its metadata (stamps). You can create a new envelope or add a stamp before passing it to the handler (as the in-memory transport does, for example, by adding a ReceivedStamp).
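As a rough sketch of what an incoming Receive might look like, the following self-contained program reads from a Go channel that stands in for the external system. The Envelope type, its ReceivedAt field, ChannelTransport, and receiveAll are hypothetical stand-ins so that the snippet compiles without the library; real code would use api.Envelope and the library's own stamps.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Envelope is a hypothetical stand-in for api.Envelope: a message plus metadata.
type Envelope struct {
	Message    any
	ReceivedAt time.Time // plays the role of a "received" stamp
}

// ChannelTransport is a hypothetical incoming transport whose external
// source is simulated by a Go channel.
type ChannelTransport struct {
	source chan any
}

// Receive passes every incoming message to handler and returns when ctx is
// canceled, the source is closed, or the handler reports an error.
func (t *ChannelTransport) Receive(ctx context.Context, handler func(context.Context, Envelope) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-t.source:
			if !ok {
				return nil // source closed, nothing more to read
			}
			env := Envelope{Message: msg, ReceivedAt: time.Now()}
			if err := handler(ctx, env); err != nil {
				return err
			}
		}
	}
}

// receiveAll is a demo helper: it feeds msgs into a ChannelTransport,
// closes the source, and collects what the handler saw.
func receiveAll(msgs ...any) ([]any, error) {
	tr := &ChannelTransport{source: make(chan any, len(msgs))}
	for _, m := range msgs {
		tr.source <- m
	}
	close(tr.source)

	var handled []any
	err := tr.Receive(context.Background(), func(_ context.Context, env Envelope) error {
		handled = append(handled, env.Message)
		return nil
	})
	return handled, err
}

func main() {
	handled, err := receiveAll("hello", "world")
	fmt.Println(handled, err)
}
```

Returning on the first handler error is only one possible policy; a real transport might instead log the error, negatively acknowledge the message, and keep reading.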
-
Implement a transport factory. Create a type that satisfies the api.TransportFactory interface. The factory is responsible for creating transport instances based on the settings from the configuration. The following methods are usually implemented:
-
Supports(dsn string) bool – returns true if the factory is able to create a transport for this DSN (for example, by checking the prefix).
-
Create(name string, dsn string, options config.OptionsConfig) (api.Transport, error) – creates and configures a transport.
Let's continue our example by making a factory for ConsoleTransport that handles DSNs with the prefix "console://":
type ConsoleTransportFactory struct{}

func (f *ConsoleTransportFactory) Supports(dsn string) bool {
	return strings.HasPrefix(dsn, "console://")
}

func (f *ConsoleTransportFactory) Create(name string, dsn string, options []byte, serializer api.Serializer) (api.Transport, error) {
	// You can extract parameters from the DSN or options if necessary.
	transport := &ConsoleTransport{name: name}
	return transport, nil
}
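Here, the factory ignores the DSN details (except the prefix) and always creates a ConsoleTransport. In a more complex case, you can parse the DSN (for example, a connection string) or use the options fields from the YAML configuration to configure the transport. Note that the Create signature in this example (options []byte plus a serializer api.Serializer) differs from the one listed above; check the api.TransportFactory definition in your version of the library for the exact signature.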
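To illustrate DSN parsing, here is a minimal sketch using the standard net/url package. The consoleOptions type, the parseConsoleDSN helper, and the "prefix" query parameter are all hypothetical; they are not part of Messenger and only show one way a factory could read settings from a DSN.

```go
package main

import (
	"fmt"
	"net/url"
)

// consoleOptions holds settings extracted from a DSN. Both the type and the
// "prefix" query parameter are hypothetical, chosen only for this example.
type consoleOptions struct {
	Prefix string
}

// parseConsoleDSN validates the scheme and reads optional settings from the
// query string, e.g. "console://?prefix=app".
func parseConsoleDSN(dsn string) (consoleOptions, error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return consoleOptions{}, fmt.Errorf("parse dsn: %w", err)
	}
	if u.Scheme != "console" {
		return consoleOptions{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	return consoleOptions{Prefix: u.Query().Get("prefix")}, nil
}

func main() {
	opts, err := parseConsoleDSN("console://?prefix=app")
	fmt.Println(opts.Prefix, err)
}
```

A factory's Create method could call such a helper and return the parsing error to the caller instead of building a misconfigured transport.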
-
Register the factory in Messenger. Before calling Build(), add your transport factory to the builder:
b := builder.NewBuilder(cfg, logger)
b.RegisterTransportFactory(&ConsoleTransportFactory{})
This adds your factory to the lookup chain. Internally, Messenger uses a FactoryChain that calls Supports on every registered factory; the factory that returns true for the DSN from the configuration is used to create the transport. For example, if you add the following to the config:
transports:
  console:
    dsn: "console://"
routing:
  messages.MyMessage: console
then, when building, Messenger will call your ConsoleTransportFactory to create the console transport. Make sure that the transport name in the config (console) is unique and does not conflict with others.
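The lookup described above can be pictured with the following simplified sketch. It is not Messenger's actual FactoryChain: the Factory interface is reduced, Create returns a string label instead of a transport, and the "first match wins" rule is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Factory is a reduced, hypothetical version of a transport factory: Create
// returns a descriptive label instead of a real transport.
type Factory interface {
	Supports(dsn string) bool
	Create(name, dsn string) (string, error)
}

// prefixFactory supports every DSN that starts with its prefix.
type prefixFactory struct{ prefix string }

func (f prefixFactory) Supports(dsn string) bool {
	return strings.HasPrefix(dsn, f.prefix)
}

func (f prefixFactory) Create(name, dsn string) (string, error) {
	return fmt.Sprintf("%s via %s", name, f.prefix), nil
}

var errNoFactory = errors.New("no factory supports this DSN")

// createFromChain asks each registered factory in order and uses the first
// one that supports the DSN (assumption: first match wins).
func createFromChain(factories []Factory, name, dsn string) (string, error) {
	for _, f := range factories {
		if f.Supports(dsn) {
			return f.Create(name, dsn)
		}
	}
	return "", errNoFactory
}

func main() {
	chain := []Factory{prefixFactory{"amqp://"}, prefixFactory{"console://"}}
	label, err := createFromChain(chain, "console", "console://")
	fmt.Println(label, err)
}
```

With a chain like this, a DSN that no factory recognizes results in an error at build time rather than a silently missing transport.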
-
Account for retries and shutdown. If your transport supports resending, implement Retry (as shown above) so that it satisfies the api.RetryableTransport interface. Messenger will automatically attach an event listener that intercepts the send-failure event and calls your transport's Retry method according to the configured strategy (for example, with exponential delay). If failure_transport is configured, Messenger also passes it to this listener, and a message that still fails after the final retry is sent to that backup transport.
When Messenger is stopped (ctx is canceled), your transport must return from Receive cleanly. In the example above, Receive just waits for cancellation, which is suitable for an outgoing-only transport. For an incoming transport (for example, one reading from a socket), exit the read loop when ctx.Done() is closed.
-
Test the new transport. It is recommended to write a unit test or an example that verifies that your transport integrates correctly: messages are sent and received, retries (if needed) are triggered, and so on.
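One check worth automating is that Receive returns once its context is canceled. The sketch below shows the idea without depending on the library: receiveFunc, waitForCancel, and stopsOnCancel are hypothetical names, and waitForCancel only mimics the Receive of the console example. In a real test you would wrap your transport's Receive call in a function of the same shape.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// receiveFunc is a blocking receive call reduced to its context argument.
type receiveFunc func(ctx context.Context) error

// waitForCancel mimics the example Receive: it blocks until ctx is canceled.
func waitForCancel(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// stopsOnCancel runs receive in a goroutine, cancels its context, and reports
// whether receive returned context.Canceled within the timeout.
func stopsOnCancel(receive receiveFunc, timeout time.Duration) bool {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- receive(ctx) }()

	cancel()
	select {
	case err := <-done:
		return errors.Is(err, context.Canceled)
	case <-time.After(timeout):
		return false // receive ignored the cancellation
	}
}

func main() {
	fmt.Println("stops on cancel:", stopsOnCancel(waitForCancel, time.Second))
}
```

The same helper can be called from a regular Go test function, failing the test when it returns false.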
Adding a custom transport expands Messenger's capabilities without having to fork or modify its code – it's enough to implement several interfaces and register them. Thus, the system can easily adapt to your requirements for queues or message transfer protocols.