CMQ-1 is a lightweight, in-memory mock message queue implementation for testing Go applications. It provides two message delivery semantics inspired by NATS:
- At-most-once (Subscribers): Fast, channel-based delivery where messages may be dropped if the subscriber is slow
- At-least-once (Consumers): Persistent stream-based delivery with offset tracking for guaranteed message delivery
Key Features:
- Zero external dependencies (Go standard library only)
- Thread-safe operations with proper synchronization
- Generic types support for type-safe message handling
- Context-based lifecycle management
- Configurable polling intervals for consumers
- Topic-based message routing with stream filtering
go get github.com/1995parham-learning/cmq-1import (
"context"
"github.com/1995parham-learning/cmq-1/pkg/cmq"
)
// Create a message queue
mmq := cmq.NewMockMessageQueue[string]()
// Subscribe to a topic
sub := mmq.Subscribe("notifications")
// Publish a message
mmq.Publish("notifications", "Hello, World!")
// Fetch messages
ctx := context.Background()
msg, err := sub.Fetch(ctx)
if err != nil {
// Handle error (e.g., context cancelled)
}import (
"context"
"time"
"github.com/1995parham-learning/cmq-1/pkg/cmq"
"github.com/1995parham-learning/cmq-1/pkg/stream"
)
// Create a message queue
mmq := cmq.NewMockMessageQueue[int]()
// Create a stream that filters messages for specific topics
str, err := stream.New[int]("my-stream", []string{"events", "alerts"})
if err != nil {
panic(err)
}
// Register the stream with the queue
if err := mmq.Stream(str); err != nil {
panic(err)
}
// Publish some messages
mmq.Publish("events", 1)
mmq.Publish("alerts", 2)
mmq.Publish("other", 3) // Won't be stored in the stream
// Create a consumer for the stream
consumer, err := mmq.Consume("my-stream")
if err != nil {
panic(err)
}
// Start the consumer (spawns background goroutine)
consumer.Start()
defer consumer.Stop()
// Wait for messages to arrive
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := consumer.Wait(ctx); err != nil {
panic(err)
}
// Fetch messages
for {
msg, ok := consumer.Fetch()
if !ok {
break
}
println(msg)
}By default, consumers poll the stream every second. You can customize this:
import "github.com/1995parham-learning/cmq-1/pkg/consumer"
// Create consumer with 100ms polling interval
str, _ := stream.New[string]("fast-stream", []string{"events"})
c := consumer.NewWithInterval(str, 100*time.Millisecond)Instead of manually calling Stop(), use context-based lifecycle:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consumer.StartWithContext(ctx)
// Consumer automatically stops when context is cancelledMultiple consumers can read from the same stream independently:
str, _ := stream.New[int]("shared", []string{"topic"})
mmq.Stream(str)
con1, _ := mmq.Consume("shared")
con2, _ := mmq.Consume("shared")
con1.Start()
con2.Start()
// Both consumers maintain independent offsetstype CMQ[T any] interface {
Publish(topic string, message T)
Subscribe(topic string) subscriber.Subscriber[T]
Consume(stream string) (*consumer.Consumer[T], error)
Stream(stream *stream.Stream[T]) error
}Start()- Begin background polling (idempotent)StartWithContext(ctx)- Begin polling with context-based cancellationStop()- Stop background polling (safe to call multiple times)Wait(ctx)- Block until messages are available (reusable)Fetch()- Retrieve next message from queue
Fetch(ctx)- Block until message arrives or context cancelled
New[T](name, topics)- Create stream with topic filtersInsert(message)- Add message to streamFetch(index)- Retrieve message at offsetHas(topic)- Check if stream filters for topic
var (
ErrDuplicateStream = errors.New("stream with the same name already exists")
ErrStreamNotFound = errors.New("stream with given name does not exist")
ErrIndexOutOfRange = errors.New("index out of range")
ErrEmptyTopics = errors.New("topics list cannot be empty")
)Subscribers (At-Most-Once):
- Use unbuffered channels by default
- Messages are dropped if subscriber is slow
- Similar to NATS pub-sub behavior
- Best for non-critical notifications
Consumers (At-Least-Once):
- Messages stored persistently in streams
- Each consumer maintains its own offset
- Polling-based retrieval (configurable interval)
- Best for critical event processing
All operations are protected by appropriate locks:
RWMutexfor read-heavy operations (message queue, streams)Mutexfor consumer state- Non-blocking channel operations with
select
All components use Go 1.18+ generics for type safety:
mmq := cmq.NewMockMessageQueue[MyStruct]()# Run tests
go test ./...
# Run with coverage
go test -cover ./...
# Run with race detector
go test -race ./...This is a learning project. Feel free to:
- Report issues
- Suggest improvements
- Submit pull requests
See LICENSE file for details.
Inspired by NATS messaging system concepts.