Skip to content

Commit 5e18495

Browse files
authored
chore: add statemachine event handlers (#2363)
* chore: try out state machine event handlers chore: tests chore: keep the PR focused * chore: address PR feedback * chore: address feedback * chore: address lock holding concern
1 parent fd07258 commit 5e18495

File tree

4 files changed

+481
-33
lines changed

4 files changed

+481
-33
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package statemachine
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"runtime/debug"
7+
"time"
8+
)
9+
10+
var (
11+
_ EventHandler = &eventHandler{}
12+
)
13+
14+
// EventHandler is an interface for handling state transition events in the state machine.
15+
type EventHandler interface {
16+
// TriggerHandler triggers the event handler for a state transition.
17+
TriggerHandler(ctx context.Context, fromState, toState State) error
18+
}
19+
20+
// EventHandlerFunc is a function that handles state transition events. Used to report state changes.
21+
type EventHandlerFunc func(ctx context.Context, fromState, toState State)
22+
23+
// EventHandlerOption is a configurable state machine option.
24+
type EventHandlerOption func(*eventHandler)
25+
26+
// WithHandlerTimeout sets the timeout for the event handler to complete.
27+
func WithHandlerTimeout(timeout time.Duration) EventHandlerOption {
28+
return func(eh *eventHandler) {
29+
eh.timeout = timeout
30+
}
31+
}
32+
33+
// NewEventHandler creates a new event handler with the provided function and options.
34+
func NewEventHandler(handler EventHandlerFunc, options ...EventHandlerOption) EventHandler {
35+
eh := &eventHandler{
36+
handler: handler,
37+
timeout: 5 * time.Second, // Default timeout
38+
}
39+
40+
for _, option := range options {
41+
option(eh)
42+
}
43+
44+
return eh
45+
}
46+
47+
// eventHandler is a struct that implements the EventHandler interface. It contains a handler function that is called when a state transition occurs, and it supports a timeout for the handler to complete.
48+
type eventHandler struct {
49+
handler EventHandlerFunc
50+
timeout time.Duration // Timeout for the handler to complete, default is 5 seconds
51+
}
52+
53+
// TriggerHandler triggers the event handler for a state transition. The trigger is blocking and will wait for the handler to complete or timeout.
54+
func (eh *eventHandler) TriggerHandler(ctx context.Context, fromState, toState State) error {
55+
ctx, cancel := context.WithTimeout(ctx, eh.timeout)
56+
defer cancel()
57+
done := make(chan error, 1)
58+
59+
go func() {
60+
defer func() {
61+
if r := recover(); r != nil {
62+
// Capture panic but don't affect the transition
63+
err := fmt.Errorf("event handler panic from %s to %s: %v: %s\n", fromState, toState, r, debug.Stack())
64+
done <- err
65+
}
66+
close(done)
67+
}()
68+
eh.handler(ctx, fromState, toState)
69+
}()
70+
71+
select {
72+
case err := <-done:
73+
return err
74+
case <-ctx.Done():
75+
err := fmt.Errorf("event handler for transition from %s to %s timed out after %s", fromState, toState, eh.timeout)
76+
return err
77+
}
78+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package statemachine
2+
3+
// State represents the possible states of the install process
4+
type State string
5+
6+
var (
7+
_ Interface = &stateMachine{}
8+
)
9+
10+
// Interface is the interface for the state machine
11+
type Interface interface {
12+
// CurrentState returns the current state
13+
CurrentState() State
14+
// IsFinalState checks if the current state is a final state
15+
IsFinalState() bool
16+
// ValidateTransition checks if a transition from the current state to a new state is valid
17+
ValidateTransition(lock Lock, newState State) error
18+
// Transition attempts to transition to a new state and returns an error if the transition is
19+
// invalid.
20+
Transition(lock Lock, nextState State) error
21+
// AcquireLock acquires a lock on the state machine.
22+
AcquireLock() (Lock, error)
23+
// IsLockAcquired checks if a lock already exists on the state machine.
24+
IsLockAcquired() bool
25+
// RegisterEventHandler registers a blocking event handler for reporting events in the state machine.
26+
RegisterEventHandler(targetState State, handler EventHandlerFunc, options ...EventHandlerOption)
27+
// UnregisterEventHandler unregisters a blocking event handler for reporting events in the state machine.
28+
UnregisterEventHandler(targetState State)
29+
}
30+
31+
type Lock interface {
32+
// Release releases the lock.
33+
Release()
34+
}

api/internal/statemachine/statemachine.go

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,48 @@
11
package statemachine
22

33
import (
4+
"context"
45
"fmt"
56
"slices"
67
"sync"
7-
)
8-
9-
// State represents the possible states of the install process
10-
type State string
118

12-
var (
13-
_ Interface = &stateMachine{}
9+
"github.com/replicatedhq/embedded-cluster/api/pkg/logger"
10+
"github.com/sirupsen/logrus"
1411
)
1512

16-
// Interface is the interface for the state machine
17-
type Interface interface {
18-
// CurrentState returns the current state
19-
CurrentState() State
20-
// IsFinalState checks if the current state is a final state
21-
IsFinalState() bool
22-
// ValidateTransition checks if a transition from the current state to a new state is valid
23-
ValidateTransition(lock Lock, newState State) error
24-
// Transition attempts to transition to a new state and returns an error if the transition is
25-
// invalid.
26-
Transition(lock Lock, nextState State) error
27-
// AcquireLock acquires a lock on the state machine.
28-
AcquireLock() (Lock, error)
29-
// IsLockAcquired checks if a lock already exists on the state machine.
30-
IsLockAcquired() bool
31-
}
32-
33-
type Lock interface {
34-
// Release releases the lock.
35-
Release()
36-
}
37-
3813
// stateMachine manages the state transitions for the install process
3914
type stateMachine struct {
4015
currentState State
4116
validStateTransitions map[State][]State
4217
lock *lock
4318
mu sync.RWMutex
19+
eventHandlers map[State][]EventHandler
20+
logger logrus.FieldLogger
4421
}
4522

23+
// StateMachineOption is a configurable state machine option.
24+
type StateMachineOption func(*stateMachine)
25+
4626
// New creates a new state machine starting in the given state with the given valid state
47-
// transitions.
48-
func New(currentState State, validStateTransitions map[State][]State) *stateMachine {
49-
return &stateMachine{
27+
// transitions and options.
28+
func New(currentState State, validStateTransitions map[State][]State, opts ...StateMachineOption) *stateMachine {
29+
sm := &stateMachine{
5030
currentState: currentState,
5131
validStateTransitions: validStateTransitions,
32+
logger: logger.NewDiscardLogger(),
33+
eventHandlers: make(map[State][]EventHandler),
34+
}
35+
36+
for _, opt := range opts {
37+
opt(sm)
38+
}
39+
40+
return sm
41+
}
42+
43+
func WithLogger(logger logrus.FieldLogger) StateMachineOption {
44+
return func(sm *stateMachine) {
45+
sm.logger = logger
5246
}
5347
}
5448

@@ -109,9 +103,13 @@ func (sm *stateMachine) ValidateTransition(lock Lock, nextState State) error {
109103
return nil
110104
}
111105

112-
func (sm *stateMachine) Transition(lock Lock, nextState State) error {
106+
func (sm *stateMachine) Transition(lock Lock, nextState State) (finalError error) {
113107
sm.mu.Lock()
114-
defer sm.mu.Unlock()
108+
defer func() {
109+
if finalError != nil {
110+
sm.mu.Unlock()
111+
}
112+
}()
115113

116114
if sm.lock == nil {
117115
return fmt.Errorf("lock not acquired")
@@ -123,11 +121,43 @@ func (sm *stateMachine) Transition(lock Lock, nextState State) error {
123121
return fmt.Errorf("invalid transition from %s to %s", sm.currentState, nextState)
124122
}
125123

124+
fromState := sm.currentState
126125
sm.currentState = nextState
127126

127+
// Trigger event handlers after successful transition
128+
handlers, exists := sm.eventHandlers[nextState]
129+
safeHandlers := make([]EventHandler, len(handlers))
130+
copy(safeHandlers, handlers) // Copy to avoid holding the lock while calling handlers
131+
132+
// We can release the lock here since the transition is successful and there will be no further operations to the state machine internal state
133+
sm.mu.Unlock()
134+
135+
if !exists || len(safeHandlers) == 0 {
136+
return nil
137+
}
138+
139+
for _, handler := range safeHandlers {
140+
err := handler.TriggerHandler(context.Background(), fromState, nextState)
141+
if err != nil {
142+
sm.logger.WithFields(logrus.Fields{"fromState": fromState, "toState": nextState}).Errorf("event handler error: %v", err)
143+
}
144+
}
145+
128146
return nil
129147
}
130148

149+
func (sm *stateMachine) RegisterEventHandler(targetState State, handler EventHandlerFunc, options ...EventHandlerOption) {
150+
sm.mu.Lock()
151+
defer sm.mu.Unlock()
152+
sm.eventHandlers[targetState] = append(sm.eventHandlers[targetState], NewEventHandler(handler, options...))
153+
}
154+
155+
func (sm *stateMachine) UnregisterEventHandler(targetState State) {
156+
sm.mu.Lock()
157+
defer sm.mu.Unlock()
158+
delete(sm.eventHandlers, targetState)
159+
}
160+
131161
func (sm *stateMachine) isValidTransition(currentState State, newState State) bool {
132162
validTransitions, ok := sm.validStateTransitions[currentState]
133163
if !ok {

0 commit comments

Comments
 (0)