
Custom Transports

Gerfey edited this page Aug 14, 2025 · 4 revisions



In addition to the built-in RabbitMQ and in-memory transports, you can extend Messenger by adding support for your own transport (for example, Kafka, NATS or another message source/recipient). To do this, you need to implement several interfaces and register them before building Messenger:

  1. Implement the transport interface. The package github.com/gerfey/messenger/api defines the Transport interface (and, optionally, RetryableTransport). A transport must be able to send messages (Send), receive messages (Receive), and report its name (Name). If you support retries, also implement Retry(ctx, env) so that the transport satisfies api.RetryableTransport. As a simple example, here is a transport that writes messages to stdout:
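import (
    "context"
    "fmt"

    "github.com/gerfey/messenger/api"
)

// ConsoleTransport is an outgoing-only transport that prints messages to stdout.
type ConsoleTransport struct {
    name string
}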

func (t *ConsoleTransport) Name() string { 
    return t.name 
}

func (t *ConsoleTransport) Send(ctx context.Context, env api.Envelope) error {
    fmt.Println("ConsoleTransport sent message:", env.Message())
    return nil
}

func (t *ConsoleTransport) Receive(ctx context.Context, handler func(context.Context, api.Envelope) error) error {
    // This transport does not support incoming messages from an external source,
    // therefore, Receive can be implemented as waiting for context cancellation.
    <-ctx.Done()
    return ctx.Err()
}

// Close releases resources held by the transport; ConsoleTransport has none.
func (t *ConsoleTransport) Close() error {
    return nil
}

// Optional: implement RetryableTransport
func (t *ConsoleTransport) Retry(ctx context.Context, env api.Envelope) error {
    // In this example, we simply call Send again
    return t.Send(ctx, env)
}

Here ConsoleTransport is a simple example: Send prints the message to the console, Receive does nothing except wait (the transport is assumed to be outgoing only), and Retry simply resends the message the same way. In a real transport, Receive must read data from an external system (for example, a socket or a queue) and call handler(ctx, env) for each received message. Note that env api.Envelope contains both the message itself and its metadata (stamps). You can create a new envelope or add a stamp before passing it to the handler (as the in-memory transport does, for example, by adding a ReceivedStamp).

  2. Implement a transport factory. Create a type that satisfies the api.TransportFactory interface. The factory is responsible for creating transport instances based on the settings from the configuration. It usually implements the following methods:
  • Supports(dsn string) bool – returns true if the factory is able to create a transport for this DSN (for example, checking the prefix).
  • Create(name string, dsn string, options []byte, serializer api.Serializer) (api.Transport, error) – creates and configures a transport.

Let's continue our example by making a factory for ConsoleTransport that will respond to DSN with the prefix "console://":

// Requires the "strings" import in addition to the api package.
type ConsoleTransportFactory struct{}

func (f *ConsoleTransportFactory) Supports(dsn string) bool {
    return strings.HasPrefix(dsn, "console://")
}

func (f *ConsoleTransportFactory) Create(name string, dsn string, options []byte, serializer api.Serializer) (api.Transport, error) {
    // You can extract parameters from DSN or options if necessary
    transport := &ConsoleTransport{name: name}
    return transport, nil
}

Here, the factory ignores the DSN details (except the prefix) and always creates a ConsoleTransport. In a more complex case, you can parse the DSN (for example, the connection string) or use the options fields from the YAML configuration to configure the transport.
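If you do want to read settings from the DSN, the standard net/url package can do the parsing. The sketch below assumes a hypothetical DSN layout, "console://stdout?prefix=orders"; the consoleOptions type, the parseConsoleDSN helper, and the prefix parameter are illustrative and not part of Messenger.

```go
package main

import (
	"fmt"
	"net/url"
)

// consoleOptions holds hypothetical settings read from a DSN such as
// "console://stdout?prefix=orders".
type consoleOptions struct {
	Target string // host part of the DSN
	Prefix string // optional "prefix" query parameter
}

// parseConsoleDSN validates the scheme and extracts the settings.
func parseConsoleDSN(dsn string) (consoleOptions, error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return consoleOptions{}, fmt.Errorf("parse dsn: %w", err)
	}
	if u.Scheme != "console" {
		return consoleOptions{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	return consoleOptions{Target: u.Host, Prefix: u.Query().Get("prefix")}, nil
}

func main() {
	opts, err := parseConsoleDSN("console://stdout?prefix=orders")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("target=%s prefix=%s\n", opts.Target, opts.Prefix)
}
```

A helper like this could be called from Create, which would then return an error for a malformed DSN instead of building a half-configured transport.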

  3. Register the factory in Messenger. Before calling Build(), add your transport factory to the builder:
b := builder.NewBuilder(cfg, logger)
b.RegisterTransportFactory(&ConsoleTransportFactory{})

This adds your factory to the lookup chain. Internally, Messenger uses a FactoryChain: it checks every registered factory using Supports, and the factory that returns true for the DSN from the configuration is used to create the transport. For example, if you add the following to the config:

transports:
  console:
    dsn: "console://"
routing:
  messages.MyMessage: console

then, when building, Messenger will call your ConsoleTransportFactory to create the console transport. Make sure that the transport name in the config (console) is unique and does not conflict with others.
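The lookup can be pictured with a small stand-alone sketch. It is not Messenger's FactoryChain: the Factory interface here is a reduced, hypothetical version whose Create returns a description string, and choosing the first match in registration order is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// Factory is a hypothetical, reduced transport factory:
// Create returns a description string instead of a real transport.
type Factory interface {
	Supports(dsn string) bool
	Create(name, dsn string) (string, error)
}

// prefixFactory supports every DSN that starts with its prefix.
type prefixFactory struct{ prefix string }

func (f prefixFactory) Supports(dsn string) bool { return strings.HasPrefix(dsn, f.prefix) }

func (f prefixFactory) Create(name, dsn string) (string, error) {
	return fmt.Sprintf("%s via %s", name, f.prefix), nil
}

// createFromChain asks each factory in order and uses the first one that supports the DSN.
func createFromChain(factories []Factory, name, dsn string) (string, error) {
	for _, f := range factories {
		if f.Supports(dsn) {
			return f.Create(name, dsn)
		}
	}
	return "", fmt.Errorf("no transport factory supports dsn %q", dsn)
}

func main() {
	chain := []Factory{prefixFactory{"amqp://"}, prefixFactory{"console://"}}
	for _, dsn := range []string{"console://", "kafka://broker"} {
		desc, err := createFromChain(chain, "console", dsn)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("created:", desc)
	}
}
```

Under this model, a DSN that no factory supports surfaces as an error at build time, which is why the DSN prefix checked in Supports should be specific to your transport.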

  4. Account for retries and shutdown. If your transport supports resending, implement Retry (as shown above) so that it satisfies the api.RetryableTransport interface. Messenger then automatically attaches an event listener that intercepts the send-failure event and calls your transport's Retry method according to the configured strategy (for example, with exponential delay). If failure_transport is configured, Messenger also passes it to this listener, so a message that still fails after the final retry is sent to that backup transport. In addition, when Messenger is stopped (ctx is canceled), your transport must return from Receive cleanly. In the example above, Receive simply waits for cancellation, which suits an outgoing-only transport. An incoming transport (for example, one reading from a socket) must exit its read loop once ctx.Done() is closed.

  5. Test the new transport. It is recommended to write a unit test or an example that verifies that your transport integrates correctly: messages are sent and received, retries (if needed) are triggered, and so on.

Adding a custom transport expands Messenger's capabilities without having to fork or modify its code – it's enough to implement several interfaces and register them. Thus, the system can easily adapt to your requirements for queues or message transfer protocols.

