Skip to content

Commit 388b89f

Browse files
committed
✨ feat: added redis keyspace notification #4
1 parent 072280f commit 388b89f

File tree

6 files changed

+121
-16
lines changed

6 files changed

+121
-16
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go.work
3131
.DS_Store
3232

3333
# Main
34+
main/
3435
main.go
3536

3637
# Logs

.vscode/launch.json

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
{
2-
// Use IntelliSense to learn about possible attributes.
3-
// Hover to view descriptions of existing attributes.
4-
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
5-
"version": "0.2.0",
6-
"configurations": [
7-
{
8-
"name": "Launch Package",
9-
"type": "go",
10-
"request": "launch",
11-
"mode": "auto",
12-
"program": "${workspaceFolder}/main.go"
13-
}
14-
]
15-
}
2+
// Use IntelliSense to learn about possible attributes.
3+
// Hover to view descriptions of existing attributes.
4+
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
5+
"version": "0.2.0",
6+
"configurations": [
7+
{
8+
"name": "Launch Package",
9+
"type": "go",
10+
"request": "launch",
11+
"mode": "auto",
12+
"program": "${workspaceFolder}/main/main.go"
13+
}
14+
]
15+
}

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
# ==============================================================================
44
# Start Rest
55
run:
6-
go run main.go
6+
go run main/main.go
77

88
build:
9-
go build main.go
9+
go build main/main.go
1010

1111
# ==============================================================================
1212
# Modules support

example/redisconn_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,23 @@ func TestConsumeCallback(t *testing.T) {
130130
return
131131
}
132132
}
133+
134+
func TestKeySpaceNotification(t *testing.T) {
135+
r, _ := createConn()
136+
svc := redisconn.NewRedisService(r.GetConn())
137+
err := svc.Set("_redis_keyspace_key_", 123, time.Second*5)
138+
if err != nil {
139+
logger.Errorf("Setting key on redis got an error", err)
140+
return
141+
}
142+
svc.SyncKeySpace().AddCallback(func(msg *redis.Message, err error) {
143+
if err != nil {
144+
logger.Errorf("Redis KeySpace Notification got an error", err)
145+
return
146+
}
147+
logger.Infof(msg.String())
148+
})
149+
svc.SyncKeySpace().Register()
150+
time.Sleep(15 * time.Second) // waiting to done
151+
svc.SyncKeySpace().Unregister() // stop watching
152+
}

redisconn_keyspace.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package redisconn
2+
3+
import (
4+
"sync"
5+
6+
"github.com/go-redis/redis"
7+
)
8+
9+
type RedisKeySpaceService interface {
10+
AddCallback(c func(message *redis.Message, err error)) RedisKeySpaceService
11+
Register()
12+
Unregister()
13+
PatternKeySpace() string
14+
PatternKeyEvent() string
15+
}
16+
17+
type redisKeySpaceServiceImpl struct {
18+
client *redis.Client
19+
callback []func(message *redis.Message, err error)
20+
stop chan struct{}
21+
mutex sync.Mutex
22+
}
23+
24+
func NewRedisKeySpaceService(client *redis.Client) RedisKeySpaceService {
25+
return &redisKeySpaceServiceImpl{
26+
client: client,
27+
callback: make([]func(event *redis.Message, err error), 0),
28+
stop: make(chan struct{}),
29+
}
30+
}
31+
32+
func (s *redisKeySpaceServiceImpl) AddCallback(c func(message *redis.Message, err error)) RedisKeySpaceService {
33+
s.mutex.Lock()
34+
defer s.mutex.Unlock()
35+
s.callback = append(s.callback, c)
36+
return s
37+
}
38+
39+
func (s *redisKeySpaceServiceImpl) Register() {
40+
go s.run()
41+
}
42+
43+
func (s *redisKeySpaceServiceImpl) Unregister() {
44+
close(s.stop)
45+
}
46+
47+
func (s *redisKeySpaceServiceImpl) PatternKeySpace() string {
48+
return "__keyspace@*__:*"
49+
}
50+
51+
func (s *redisKeySpaceServiceImpl) PatternKeyEvent() string {
52+
return "__keyevent@*__:*"
53+
}
54+
55+
func (s *redisKeySpaceServiceImpl) run() {
56+
pattern := []string{s.PatternKeySpace(), s.PatternKeyEvent()}
57+
pubsub := s.client.PSubscribe(pattern...)
58+
for {
59+
select {
60+
case <-s.stop:
61+
_ = pubsub.Close()
62+
return
63+
default:
64+
msg, err := pubsub.ReceiveMessage()
65+
s.hook(msg, err)
66+
}
67+
}
68+
}
69+
70+
func (s *redisKeySpaceServiceImpl) hook(message *redis.Message, err error) {
71+
s.mutex.Lock()
72+
defer s.mutex.Unlock()
73+
for _, callback := range s.callback {
74+
callback(message, err)
75+
}
76+
}

redisconn_service.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@ type RedisService interface {
2424

2525
// sync pubsub hook
2626
SyncPubSub() RedisPubSubService
27+
// sync keyspace notification hook
28+
SyncKeySpace() RedisKeySpaceService
2729
}
2830

2931
type redisServiceImpl struct {
3032
redisConn *redis.Client
3133
mutex *RedisMutex
3234
pubsub RedisPubSubService
35+
keyspace RedisKeySpaceService
3336
}
3437

3538
func newRedisMutex() *RedisMutex {
@@ -41,6 +44,7 @@ func NewRedisService(redisConn *redis.Client) RedisService {
4144
redisConn: redisConn,
4245
mutex: newRedisMutex(),
4346
pubsub: NewRedisPubSub(redisConn),
47+
keyspace: NewRedisKeySpaceService(redisConn),
4448
}
4549
return s
4650
}
@@ -221,3 +225,7 @@ func (r *redisServiceImpl) Handler() *redis.Client {
221225
func (r *redisServiceImpl) SyncPubSub() RedisPubSubService {
222226
return r.pubsub
223227
}
228+
229+
func (r *redisServiceImpl) SyncKeySpace() RedisKeySpaceService {
230+
return r.keyspace
231+
}

0 commit comments

Comments
 (0)