Skip to content

Commit 6e491ac

Browse files
committed
fix(command/cmdbus): track dispatch before publishing event
fix(builtin_test.go): prevent panic on context.Canceled error to avoid unnecessary panic when context is cancelled fix(handler_test.go): replace context.WithTimeout with context.WithCancel to avoid unnecessary timeout fix(bus.go): change WithArtificialDelay return type to Option for better type safety
1 parent 5cf3f58 commit 6e491ac

File tree

4 files changed

+13
-12
lines changed

4 files changed

+13
-12
lines changed

command/builtin/builtin_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package builtin_test
22

33
import (
44
"context"
5+
"errors"
56
"testing"
67
"time"
78

@@ -297,7 +298,9 @@ func TestDeleteAggregate_CustomEvent_MatchAll(t *testing.T) {
297298

298299
func panicOn(errs <-chan error) {
299300
for err := range errs {
300-
panic(err)
301+
if !errors.Is(err, context.Canceled) {
302+
panic(err)
303+
}
301304
}
302305
}
303306

command/cmdbus/bus.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,12 +290,6 @@ func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts
290290
Payload: load,
291291
})
292292

293-
b.debugLog("publishing %q event ...", evt.Name())
294-
295-
if err := b.bus.Publish(ctx, evt.Any()); err != nil {
296-
return fmt.Errorf("publish %q event: %w", evt.Name(), err)
297-
}
298-
299293
out := make(chan error)
300294
accepted := make(chan struct{})
301295
aborted := make(chan struct{})
@@ -313,6 +307,12 @@ func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts
313307

314308
defer b.cleanupDispatch(cmd.ID())
315309

310+
b.debugLog("publishing %q event ...", evt.Name())
311+
312+
if err := b.bus.Publish(ctx, evt.Any()); err != nil {
313+
return fmt.Errorf("publish %q event: %w", evt.Name(), err)
314+
}
315+
316316
var timeout <-chan time.Time
317317
if b.assignTimeout > 0 {
318318
timer := time.NewTimer(b.assignTimeout)

command/handler_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ func TestHandler_Handle(t *testing.T) {
2020
enc := newEncoder()
2121
ebus := eventbus.New()
2222
subBus := cmdbus.New[int](enc, ebus)
23-
pubBus := cmdbus.New[int](enc, ebus)
23+
pubBus := cmdbus.New[int](enc, ebus, cmdbus.AssignTimeout(0))
2424
h := command.NewHandler[any](subBus)
2525

26-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
26+
ctx, cancel := context.WithCancel(context.Background())
2727
defer cancel()
2828

2929
handled := make(chan command.Command)
@@ -52,9 +52,7 @@ func TestHandler_Handle(t *testing.T) {
5252
case err, ok := <-errs:
5353
if ok {
5454
t.Fatal(err)
55-
break
5655
}
57-
break
5856
case h := <-handled:
5957
if h.ID() != cmd.ID() || h.Name() != cmd.Name() || !reflect.DeepEqual(h.Payload(), cmd.Payload()) {
6058
t.Fatalf("handled Command differs from dispatched Command. want=%v got=%v", cmd, h)

event/eventbus/bus.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type Option func(*chanbus)
5353
// the rate of event publishing. The delay duration is specified by the provided
5454
// time.Duration value. The function returns an Option that can be used to
5555
// configure a chanbus instance.
56-
func WithArtificialDelay(delay time.Duration) func(*chanbus) {
56+
func WithArtificialDelay(delay time.Duration) Option {
5757
return func(c *chanbus) {
5858
c.artificialDelay = delay
5959
}

0 commit comments

Comments
 (0)