@@ -17,99 +17,119 @@ import (
17
17
wrpcnats "wrpc.io/go/nats"
18
18
)
19
19
20
- var _ store.Handler = & Handler {}
20
+ var (
21
+ errNoSuchStore = store .NewErrorNoSuchStore ()
22
+ errInvalidDataType = store .NewErrorOther ("invalid data type stored in map" )
23
+ )
21
24
22
25
type Handler struct {
23
- data map [Bucket ]map [string ][]uint8
24
- lock sync.Mutex
26
+ sync.Map
25
27
}
26
28
27
- type Bucket string
29
+ func Ok [T any ](v T ) * wrpc.Result [T , store.Error ] {
30
+ return wrpc.Ok [store.Error ](v )
31
+ }
28
32
29
33
func (h * Handler ) Open (ctx context.Context , identifier string ) (* wrpc.Result [wrpc.Own [store.Bucket ], store.Error ], error ) {
30
- h .lock .Lock ()
31
- defer h .lock .Unlock ()
34
+ slog .InfoContext (ctx , "handling `wasi:keyvalue/store.open`" , "identifier" , identifier )
35
+ h .LoadOrStore (string (identifier ), & sync.Map {})
36
+ return Ok (wrpc.Own [store.Bucket ](identifier )), nil
37
+ }
32
38
33
- if h .data == nil {
34
- h .data = make (map [Bucket ]map [string ][]uint8 )
39
+ func (h * Handler ) Bucket_Get (ctx context.Context , bucket wrpc.Borrow [store.Bucket ], key string ) (* wrpc.Result [[]byte , store.Error ], error ) {
40
+ slog .InfoContext (ctx , "handling `wasi:keyvalue/store.bucket.get`" , "bucket" , bucket , "key" , key )
41
+ v , ok := h .Load (string (bucket ))
42
+ if ! ok {
43
+ return wrpc.Err [[]byte ](* errNoSuchStore ), nil
35
44
}
36
- bucket := ( Bucket )( identifier )
37
- if _ , ok := h . data [ bucket ]; ! ok {
38
- h . data [ bucket ] = make ( map [ string ][] uint8 )
45
+ b , ok := v .( * sync. Map )
46
+ if ! ok {
47
+ return wrpc. Err [[] byte ]( * errInvalidDataType ), nil
39
48
}
40
- return wrpc.Ok [store.Error , wrpc.Own [store.Bucket ]](wrpc.Own [store.Bucket ](bucket )), nil
41
- }
42
-
43
- func (h * Handler ) Bucket_Get (ctx__ context.Context , self wrpc.Borrow [store.Bucket ], key string ) (* wrpc.Result [[]uint8 , store.Error ], error ) {
44
- h .lock .Lock ()
45
- defer h .lock .Unlock ()
46
-
47
- bucket := (Bucket )(self )
48
- if _ , ok := h .data [bucket ]; ! ok {
49
- err := store .NewErrorNoSuchStore ()
50
- return wrpc.Err [[]uint8 , store.Error ](* err ), err
49
+ v , ok = b .Load (key )
50
+ if ! ok {
51
+ return Ok ([]byte (nil )), nil
51
52
}
52
- value , ok := h. data [ bucket ][ key ]
53
+ buf , ok := v .([] byte )
53
54
if ! ok {
54
- return wrpc.Ok [store. Error , [] uint8 ]( nil ), nil
55
+ return wrpc.Err [[] byte ]( * errInvalidDataType ), nil
55
56
}
56
- return wrpc. Ok [store. Error , [] uint8 ]( value ), nil
57
+ return Ok ( buf ), nil
57
58
}
58
59
59
- func (h * Handler ) Bucket_Set (ctx__ context.Context , self wrpc.Borrow [store.Bucket ], key string , value []uint8 ) (* wrpc.Result [struct {}, store.Error ], error ) {
60
- h .lock .Lock ()
61
- defer h .lock .Unlock ()
62
-
63
- bucket := (Bucket )(self )
64
- if _ , ok := h .data [bucket ]; ! ok {
65
- err := store .NewErrorNoSuchStore ()
66
- return wrpc.Err [struct {}, store.Error ](* err ), err
60
+ func (h * Handler ) Bucket_Set (ctx context.Context , bucket wrpc.Borrow [store.Bucket ], key string , value []byte ) (* wrpc.Result [struct {}, store.Error ], error ) {
61
+ slog .InfoContext (ctx , "handling `wrpc:keyvalue/store.bucket.set`" , "bucket" , bucket , "key" , key , "value" , value )
62
+ v , ok := h .Load (string (bucket ))
63
+ if ! ok {
64
+ return wrpc.Err [struct {}](* errNoSuchStore ), nil
67
65
}
68
- h.data [bucket ][key ] = value
69
- return wrpc.Ok [store.Error , struct {}](struct {}{}), nil
66
+ b , ok := v .(* sync.Map )
67
+ if ! ok {
68
+ return wrpc.Err [struct {}](* errInvalidDataType ), nil
69
+ }
70
+ b .Store (key , value )
71
+ return Ok (struct {}{}), nil
70
72
}
71
73
72
- func (h * Handler ) Bucket_Delete (ctx__ context.Context , self wrpc.Borrow [store.Bucket ], key string ) (* wrpc.Result [struct {}, store.Error ], error ) {
73
- h .lock .Lock ()
74
- defer h .lock .Unlock ()
75
-
76
- bucket := (Bucket )(self )
77
- if _ , ok := h .data [bucket ]; ! ok {
78
- err := store .NewErrorNoSuchStore ()
79
- return wrpc.Err [struct {}, store.Error ](* err ), err
74
+ func (h * Handler ) Bucket_Delete (ctx context.Context , bucket wrpc.Borrow [store.Bucket ], key string ) (* wrpc.Result [struct {}, store.Error ], error ) {
75
+ slog .InfoContext (ctx , "handling `wrpc:keyvalue/store.bucket.delete`" , "bucket" , bucket , "key" , key )
76
+ v , ok := h .Load (string (bucket ))
77
+ if ! ok {
78
+ return wrpc.Err [struct {}](* errNoSuchStore ), nil
80
79
}
81
- delete (h .data [bucket ], key )
82
-
83
- return wrpc.Ok [store.Error , struct {}](struct {}{}), nil
80
+ b , ok := v .(* sync.Map )
81
+ if ! ok {
82
+ return wrpc.Err [struct {}](* errInvalidDataType ), nil
83
+ }
84
+ b .Delete (key )
85
+ return Ok (struct {}{}), nil
84
86
}
85
87
86
- func (h * Handler ) Bucket_Exists (ctx__ context.Context , self wrpc.Borrow [store.Bucket ], key string ) (* wrpc.Result [bool , store.Error ], error ) {
87
- h .lock .Lock ()
88
- defer h .lock .Unlock ()
89
-
90
- bucket := (Bucket )(self )
91
- if _ , ok := h .data [bucket ]; ! ok {
92
- err := store .NewErrorNoSuchStore ()
93
- return wrpc.Err [bool , store.Error ](* err ), err
88
+ func (h * Handler ) Bucket_Exists (ctx context.Context , bucket wrpc.Borrow [store.Bucket ], key string ) (* wrpc.Result [bool , store.Error ], error ) {
89
+ slog .InfoContext (ctx , "handling `wrpc:keyvalue/store.bucket.exists`" , "bucket" , bucket , "key" , key )
90
+ v , ok := h .Load (string (bucket ))
91
+ if ! ok {
92
+ return wrpc.Err [bool ](* errNoSuchStore ), nil
93
+ }
94
+ b , ok := v .(* sync.Map )
95
+ if ! ok {
96
+ return wrpc.Err [bool ](* errInvalidDataType ), nil
94
97
}
95
- _ , ok := h. data [ bucket ][ key ]
96
- return wrpc. Ok [store. Error , bool ] (ok ), nil
98
+ _ , ok = b . Load ( key )
99
+ return Ok (ok ), nil
97
100
}
98
101
99
- func (h * Handler ) Bucket_ListKeys (ctx__ context.Context , self wrpc.Borrow [store.Bucket ], cursor * uint64 ) (* wrpc.Result [store.KeyResponse , store.Error ], error ) {
100
- h . lock . Lock ( )
101
- defer h . lock . Unlock ()
102
-
103
- bucket := ( Bucket )( self )
104
- if _ , ok := h .data [ bucket ]; ! ok {
105
- err := store . NewErrorNoSuchStore ()
106
- return wrpc.Err [store.KeyResponse , store. Error ](* err ), err
102
+ func (h * Handler ) Bucket_ListKeys (ctx context.Context , bucket wrpc.Borrow [store.Bucket ], cursor * uint64 ) (* wrpc.Result [store.KeyResponse , store.Error ], error ) {
103
+ slog . InfoContext ( ctx , "handling `wrpc:keyvalue/store.bucket.list-keys`" , "bucket" , bucket , "cursor" , cursor )
104
+ if cursor != nil {
105
+ return wrpc. Err [store. KeyResponse ]( * store . NewErrorOther ( "cursors are not supported" )), nil
106
+ }
107
+ v , ok := h .Load ( string ( bucket ))
108
+ if ! ok {
109
+ return wrpc.Err [store.KeyResponse ](* errNoSuchStore ), nil
107
110
}
108
- keyResponse := store.KeyResponse {Keys : []string {}}
109
- for k := range h .data [bucket ] {
110
- keyResponse .Keys = append (keyResponse .Keys , k )
111
+ b , ok := v .(* sync.Map )
112
+ if ! ok {
113
+ return wrpc.Err [store.KeyResponse ](* errInvalidDataType ), nil
114
+ }
115
+ var keys []string
116
+ var err * store.Error
117
+ b .Range (func (k , _ any ) bool {
118
+ s , ok := k .(string )
119
+ if ! ok {
120
+ err = errInvalidDataType
121
+ return false
122
+ }
123
+ keys = append (keys , s )
124
+ return true
125
+ })
126
+ if err != nil {
127
+ return wrpc.Err [store.KeyResponse ](* err ), nil
111
128
}
112
- return wrpc.Ok [store.Error , store.KeyResponse ](keyResponse ), nil
129
+ return Ok (store.KeyResponse {
130
+ Keys : keys ,
131
+ Cursor : nil ,
132
+ }), nil
113
133
}
114
134
115
135
func run () error {
@@ -131,15 +151,15 @@ func run() error {
131
151
client := wrpcnats .NewClient (nc , wrpcnats .WithPrefix ("go" ))
132
152
stop , err := server .Serve (client , & Handler {})
133
153
if err != nil {
134
- return fmt .Errorf ("failed to serve `keyvalue ` world: %w" , err )
154
+ return fmt .Errorf ("failed to serve `server ` world: %w" , err )
135
155
}
136
156
137
157
signalCh := make (chan os.Signal , 1 )
138
158
signal .Notify (signalCh , syscall .SIGINT )
139
159
<- signalCh
140
160
141
161
if err = stop (); err != nil {
142
- return fmt .Errorf ("failed to stop `keyvalue ` world: %w" , err )
162
+ return fmt .Errorf ("failed to stop `server ` world: %w" , err )
143
163
}
144
164
return nil
145
165
}
0 commit comments