Skip to content

Commit 5b8eb52

Browse files
committed
feat(event): add handler.StartupQuery() option (closes #124)
feat(handler.go): add DefaultStartupQuery function to construct default query for event handler startup feat(handler.go): add startupQuery field to Handler struct to allow customization of startup query feat(handler.go): modify Startup function to accept query options and merge with default query feat(handler.go): add StartupQuery function to configure Handler's startup query feat(handler.go): modify New function to set default startupQuery if not provided feat(handler.go): modify startup function to use startupQuery when querying events from startupStore test(handler_test.go): add tests for new Startup and StartupQuery functions and their effects on event handling
1 parent 65e0bfd commit 5b8eb52

File tree

2 files changed

+220
-11
lines changed

2 files changed

+220
-11
lines changed

event/handler/handler.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ import (
1818
// handler.
1919
var ErrRunning = errors.New("event handler is already running")
2020

21+
// DefaultStartupQuery constructs a default query for the startup of an event
22+
// handler. It uses the provided slice of event names to create a query that
23+
// sorts events by their timestamps. This function is typically used to
24+
// determine which events should be processed during the startup of an
25+
// event handler.
26+
func DefaultStartupQuery(events []string) query.Query {
27+
return query.New(query.Name(events...), query.SortByTime())
28+
}
29+
2130
// Handler is a type that processes events from an event bus. It associates
2231
// event names with specific functions, which are called whenever their
2332
// respective event occurs. Handler uses multiple workers to process events
@@ -30,6 +39,7 @@ var ErrRunning = errors.New("event handler is already running")
3039
type Handler struct {
3140
bus event.Bus
3241
startupStore event.Store
42+
startupQuery func(event.Query) event.Query
3343
workers int
3444

3545
mux sync.RWMutex
@@ -44,13 +54,37 @@ type Handler struct {
4454
// constructing a new [Handler] using the New function.
4555
type Option func(*Handler)
4656

47-
// Startup sets the startup event store for a [Handler]. This store is used to
48-
// handle events when the [Handler] starts up. The Startup option is typically
49-
// used to initialize the system with initial event handling on startup or
50-
// implement a "catch-up" mechanism for their event handlers.
51-
func Startup(store event.Store) Option {
57+
// Startup configures a [Handler] with a specified event store and options for
58+
// querying events. It is used to setup the event store that the [Handler] will
59+
// use to fetch events during startup. This can be used to initialize the system
60+
// with initial event handling on startup or implement a "catch-up" mechanism
61+
// for their event handlers. The query options allow customization of how the
62+
// events are fetched from the store. The returned [Option] can be used when
63+
// creating a new [Handler].
64+
//
65+
// If [query.Option]s are provided, they will be merged with the default query
66+
// using [query.Merge]. If you want to _replace_ the default query, use the
67+
// [StartupQuery] option instead of providing [query.Option]s to [Startup].
68+
func Startup(store event.Store, opts ...query.Option) Option {
5269
return func(h *Handler) {
5370
h.startupStore = store
71+
if len(opts) > 0 {
72+
StartupQuery(func(q event.Query) event.Query {
73+
return query.Merge(q, query.New(opts...))
74+
})(h)
75+
}
76+
}
77+
}
78+
79+
// StartupQuery is a function that configures a [Handler]'s startup query. It
80+
// accepts a function that takes and returns an event.Query as its argument. The
81+
// provided function will be used by the [Handler] to modify the default query
82+
// used when fetching events from the event store during startup. The resulting
83+
// [Option] can be used when constructing a new [Handler], allowing
84+
// customization of the startup behavior of the [Handler].
85+
func StartupQuery(fn func(event.Query) event.Query) Option {
86+
return func(h *Handler) {
87+
h.startupQuery = fn
5488
}
5589
}
5690

@@ -89,6 +123,11 @@ func New(bus event.Bus, opts ...Option) *Handler {
89123
if h.workers < 1 {
90124
h.workers = 1
91125
}
126+
127+
if h.startupQuery == nil && h.startupStore != nil {
128+
h.startupQuery = func(q event.Query) event.Query { return q }
129+
}
130+
92131
return h
93132
}
94133

@@ -205,10 +244,9 @@ func (h *Handler) handleEvents(ctx context.Context, events <-chan event.Event) <
205244
}
206245

207246
func (h *Handler) startup(ctx context.Context, eventNames []string) error {
208-
str, errs, err := h.startupStore.Query(ctx, query.New(
209-
query.Name(eventNames...),
210-
query.SortByTime(),
211-
))
247+
q := h.startupQuery(DefaultStartupQuery(eventNames))
248+
249+
str, errs, err := h.startupStore.Query(ctx, q)
212250
if err != nil {
213251
return fmt.Errorf("query events %v: %w", eventNames, err)
214252
}

event/handler/handler_test.go

Lines changed: 173 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/google/uuid"
89
"github.com/modernice/goes/event"
910
"github.com/modernice/goes/event/eventbus"
1011
"github.com/modernice/goes/event/eventstore"
1112
"github.com/modernice/goes/event/handler"
13+
"github.com/modernice/goes/event/query"
1214
"github.com/modernice/goes/event/test"
1315
)
1416

@@ -57,13 +59,13 @@ func TestHandler(t *testing.T) {
5759
}
5860
}
5961

60-
func TestWithStore(t *testing.T) {
62+
func TestStartup(t *testing.T) {
6163
ctx, cancel := context.WithCancel(context.Background())
6264
defer cancel()
6365

6466
bus := eventbus.New()
6567
store := eventstore.New()
66-
h := handler.New(bus, handler.WithStore(store))
68+
h := handler.New(bus, handler.Startup(store))
6769

6870
fooHandled := make(chan event.Of[test.FooEventData])
6971
barHandled := make(chan event.Of[test.BarEventData])
@@ -102,3 +104,172 @@ func TestWithStore(t *testing.T) {
102104
case <-barHandled:
103105
}
104106
}
107+
108+
func TestStartupQuery(t *testing.T) {
109+
bus := eventbus.New()
110+
store := eventstore.New()
111+
112+
h := handler.New(bus, handler.Startup(store), handler.StartupQuery(func(event.Query) event.Query {
113+
return query.New(query.Name("bar"))
114+
}))
115+
116+
fooHandled := make(chan event.Of[test.FooEventData])
117+
barHandled := make(chan event.Of[test.BarEventData])
118+
119+
h.RegisterEventHandler("foo", func(evt event.Event) { fooHandled <- event.Cast[test.FooEventData](evt) })
120+
h.RegisterEventHandler("bar", func(evt event.Event) { barHandled <- event.Cast[test.BarEventData](evt) })
121+
122+
if err := store.Insert(
123+
context.Background(),
124+
event.New("foo", test.FooEventData{}).Any(),
125+
event.New("bar", test.BarEventData{}).Any(),
126+
); err != nil {
127+
t.Fatalf("Insert() failed with %q", err)
128+
}
129+
130+
errs, err := h.Run(context.Background())
131+
if err != nil {
132+
t.Fatalf("Run() failed with %q", err)
133+
}
134+
135+
go func() {
136+
for err := range errs {
137+
panic(err)
138+
}
139+
}()
140+
141+
select {
142+
case <-time.After(time.Second):
143+
t.Fatalf("bar event was not handled")
144+
case <-barHandled:
145+
}
146+
147+
select {
148+
case <-time.After(50 * time.Millisecond):
149+
case <-fooHandled:
150+
t.Fatalf("foo event was handled")
151+
}
152+
}
153+
154+
func TestStartup_withQuery_merges_names(t *testing.T) {
155+
bus := eventbus.New()
156+
store := eventstore.New()
157+
158+
testID := uuid.New()
159+
160+
h := handler.New(bus, handler.Startup(store, query.Name("bar")))
161+
162+
fooHandled := make(chan event.Of[test.FooEventData])
163+
barHandled := make(chan event.Of[test.BarEventData])
164+
165+
h.RegisterEventHandler("foo", func(evt event.Event) {
166+
t.Log("Handling foo event")
167+
fooHandled <- event.Cast[test.FooEventData](evt)
168+
})
169+
h.RegisterEventHandler("bar", func(evt event.Event) {
170+
t.Log("Handling bar event")
171+
barHandled <- event.Cast[test.BarEventData](evt)
172+
})
173+
174+
t1 := time.Now().Add(time.Minute)
175+
t2 := t1.Add(time.Second)
176+
177+
if err := store.Insert(
178+
context.Background(),
179+
event.New("foo", test.FooEventData{}).Any(),
180+
event.New("bar", test.BarEventData{}, event.Time(t2)).Any(),
181+
event.New("bar", test.BarEventData{}, event.Time(t1), event.ID(testID)).Any(),
182+
); err != nil {
183+
t.Fatalf("Insert() failed with %q", err)
184+
}
185+
186+
errs, err := h.Run(context.Background())
187+
if err != nil {
188+
t.Fatalf("Run() failed with %q", err)
189+
}
190+
191+
go func() {
192+
for err := range errs {
193+
panic(err)
194+
}
195+
}()
196+
197+
select {
198+
case <-time.After(time.Second):
199+
t.Fatalf("foo event was not handled")
200+
case <-fooHandled:
201+
}
202+
203+
select {
204+
case <-time.After(time.Second):
205+
t.Fatalf("bar event was not handled #1")
206+
case evt := <-barHandled:
207+
if evt.ID() != testID {
208+
t.Fatalf("expected event ID %q; got %q", testID, evt.ID())
209+
}
210+
}
211+
212+
select {
213+
case <-time.After(time.Second):
214+
t.Fatalf("bar event was not handled #2")
215+
case evt := <-barHandled:
216+
if evt.ID() == testID {
217+
t.Fatalf("expected event ID not to be %q; got %q", testID, evt.ID())
218+
}
219+
}
220+
}
221+
222+
func TestStartup_withQuery_merges_ids(t *testing.T) {
223+
bus := eventbus.New()
224+
store := eventstore.New()
225+
226+
testID := uuid.New()
227+
228+
h := handler.New(bus, handler.Startup(store, query.ID(testID)))
229+
230+
fooHandled := make(chan event.Of[test.FooEventData])
231+
barHandled := make(chan event.Of[test.BarEventData])
232+
233+
h.RegisterEventHandler("foo", func(evt event.Event) {
234+
t.Log("Handling foo event")
235+
fooHandled <- event.Cast[test.FooEventData](evt)
236+
})
237+
h.RegisterEventHandler("bar", func(evt event.Event) {
238+
t.Log("Handling bar event")
239+
barHandled <- event.Cast[test.BarEventData](evt)
240+
})
241+
242+
if err := store.Insert(
243+
context.Background(),
244+
event.New("foo", test.FooEventData{}).Any(),
245+
event.New("bar", test.BarEventData{}, event.ID(testID)).Any(),
246+
); err != nil {
247+
t.Fatalf("Insert() failed with %q", err)
248+
}
249+
250+
errs, err := h.Run(context.Background())
251+
if err != nil {
252+
t.Fatalf("Run() failed with %q", err)
253+
}
254+
255+
go func() {
256+
for err := range errs {
257+
panic(err)
258+
}
259+
}()
260+
261+
select {
262+
case <-time.After(50 * time.Millisecond):
263+
case <-fooHandled:
264+
t.Fatalf("foo event was handled")
265+
}
266+
267+
select {
268+
case <-time.After(time.Second):
269+
t.Fatalf("bar event was not handled")
270+
case evt := <-barHandled:
271+
if evt.ID() != testID {
272+
t.Fatalf("expected event ID %q; got %q", testID, evt.ID())
273+
}
274+
}
275+
}

0 commit comments

Comments
 (0)