Skip to content
This repository was archived by the owner on Mar 9, 2025. It is now read-only.

Commit 84f7c54

Browse files
resolve issue #110 (#111)
* new methods 'Router.Call' to replace the deprecated one 'RouterCallImpl' * new backward-compatible signature for StorageResultTypedFunc to fix interface for RouterCallImpl
1 parent be1cfc4 commit 84f7c54

File tree

6 files changed

+320
-67
lines changed

6 files changed

+320
-67
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ FEATURES:
1919
* Add pause between requests in buckets discovering. Configured by config DiscoveryWorkStep, default is 10ms.
2020
* Add ReplicaUUID to the StorageCallVShardError struct.
2121
* New method 'RouterMapCallRW[T]' to replace the deprecated one 'RouterMapCallRWImpl'.
22+
* New method 'Router.Call' to replace the deprecated one 'RouterCallImpl'.
2223

2324
REFACTOR:
2425

@@ -29,6 +30,7 @@ REFACTOR:
2930
* Add custom msgpackv5 decoder for 'vshard.storage.bucket_stat' response (partially #100).
3031
* Add custom msgpackv5 decoder for 'BucketStatInfo', since msgpackv5 library has an issue (see commit content).
3132
* Add custom msgpackv5 decoder for 'RouterMapCallRW'.
33+
* New backward-compatible signature for StorageResultTypedFunc to fix interface for RouterCallImpl.
3234

3335
TESTS:
3436
* Rename bootstrap_test.go -> tarantool_test.go and new test in this file.

api.go

Lines changed: 190 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ type VshardMode string
2222
const (
2323
ReadMode VshardMode = "read"
2424
WriteMode VshardMode = "write"
25+
26+
// callTimeoutDefault is a default timeout when no timeout is provided
27+
callTimeoutDefault = 500 * time.Millisecond
2528
)
2629

2730
func (c VshardMode) String() string {
@@ -31,7 +34,7 @@ func (c VshardMode) String() string {
3134
type vshardStorageCallResponseProto struct {
3235
AssertError *assertError // not nil if there is assert error
3336
VshardError *StorageCallVShardError // not nil if there is vshard response
34-
Data []interface{} // raw response data
37+
CallResp VshardRouterCallResp
3538
}
3639

3740
func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
@@ -115,14 +118,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
115118
}
116119

117120
// isVShardRespOk is true
118-
r.Data = make([]interface{}, 0, respArrayLen-1)
119-
121+
r.CallResp.rawMessages = make([]msgpack.RawMessage, 0, respArrayLen-1)
120122
for i := 1; i < respArrayLen; i++ {
121-
elem, err := d.DecodeInterface()
123+
elem, err := d.DecodeRaw()
122124
if err != nil {
123-
return fmt.Errorf("failed to decode into interface element #%d of response array", i+1)
125+
return fmt.Errorf("failed to decode into msgpack.RawMessage element #%d of response array", i-1)
124126
}
125-
r.Data = append(r.Data, elem)
127+
r.CallResp.rawMessages = append(r.CallResp.rawMessages, elem)
126128
}
127129

128130
return nil
@@ -167,62 +169,180 @@ func (s StorageCallVShardError) Error() string {
167169
return fmt.Sprintf("%+v", alias(s))
168170
}
169171

170-
type StorageResultTypedFunc = func(result interface{}) error
172+
type StorageResultTypedFunc = func(result ...interface{}) error
171173

172174
type CallOpts struct {
173175
VshardMode VshardMode // vshard mode in call
174176
PoolMode pool.Mode
175177
Timeout time.Duration
176178
}
177179

178-
// revive warns us: time-naming: var CallTimeoutMin is of type time.Duration; don't use unit-specific suffix "Min".
179-
// But the original lua vshard implementation uses this naming, so we use it too.
180-
//
181-
//nolint:revive
182-
const CallTimeoutMin = time.Second / 2
180+
// VshardRouterCallMode is a type to represent call mode for Router.Call method.
181+
type VshardRouterCallMode int
182+
183+
const (
184+
// VshardRouterCallModeRO sets a read-only mode for Router.Call.
185+
VshardRouterCallModeRO VshardRouterCallMode = iota
186+
// VshardRouterCallModeRW sets a read-write mode for Router.Call.
187+
VshardRouterCallModeRW
188+
// VshardRouterCallModeRE acts like VshardRouterCallModeRO
189+
// with preference for a replica rather than a master.
190+
// This mode is not supported yet.
191+
VshardRouterCallModeRE
192+
// VshardRouterCallModeBRO acts like VshardRouterCallModeRO with balancing.
193+
VshardRouterCallModeBRO
194+
// VshardRouterCallModeBRE acts like VshardRouterCallModeRO with balancing
195+
// and preference for a replica rather than a master.
196+
VshardRouterCallModeBRE
197+
)
198+
199+
// VshardRouterCallOptions represents options to Router.Call[XXX] methods.
200+
type VshardRouterCallOptions struct {
201+
Timeout time.Duration
202+
}
203+
204+
// VshardRouterCallResp represents a response from Router.Call[XXX] methods.
205+
type VshardRouterCallResp struct {
206+
rawMessages []msgpack.RawMessage
207+
}
208+
209+
// Get returns a response from user defined function as []interface{}.
210+
func (r VshardRouterCallResp) Get() ([]interface{}, error) {
211+
resp := make([]interface{}, len(r.rawMessages))
212+
return resp, r.GetTyped(resp)
213+
}
214+
215+
// GetTyped decodes a response from user defined function into custom values.
216+
func (r VshardRouterCallResp) GetTyped(result []interface{}) error {
217+
minLen := len(result)
218+
if dataLen := len(r.rawMessages); dataLen < minLen {
219+
minLen = dataLen
220+
}
221+
222+
for i := 0; i < minLen; i++ {
223+
if err := msgpack.Unmarshal(r.rawMessages[i], &result[i]); err != nil {
224+
return fmt.Errorf("failed to decode into result[%d] element #%d of response array: %w", i, i, err)
225+
}
226+
}
227+
228+
return nil
229+
}
183230

184231
// RouterCallImpl Perform shard operation function will restart operation
185232
// after wrong bucket response until timeout is reached
233+
// Deprecated: RouterCallImpl is deprecated.
234+
// See https://github.com/KaymeKaydex/go-vshard-router/issues/110.
235+
// Use Call method with RO, RW, RE, BRO, BRE modes instead.
186236
func (r *Router) RouterCallImpl(ctx context.Context,
187237
bucketID uint64,
188238
opts CallOpts,
189239
fnc string,
190240
args interface{}) (interface{}, StorageResultTypedFunc, error) {
191241

242+
var vshardCallOpts = VshardRouterCallOptions{
243+
Timeout: opts.Timeout,
244+
}
245+
246+
var vshardCallMode VshardRouterCallMode
247+
248+
switch opts.VshardMode {
249+
case WriteMode:
250+
vshardCallMode = VshardRouterCallModeRW
251+
case ReadMode:
252+
switch opts.PoolMode {
253+
case pool.ANY:
254+
vshardCallMode = VshardRouterCallModeBRO
255+
case pool.RO:
256+
vshardCallMode = VshardRouterCallModeRO
257+
case pool.RW:
258+
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
259+
case pool.PreferRO:
260+
vshardCallMode = VshardRouterCallModeBRE
261+
case pool.PreferRW:
262+
return nil, nil, fmt.Errorf("unexpected opts %+v", opts)
263+
default:
264+
return nil, nil, fmt.Errorf("unexpected opts.PoolMode %v", opts.PoolMode)
265+
}
266+
default:
267+
return nil, nil, fmt.Errorf("unexpected opts.VshardMode %v", opts.VshardMode)
268+
}
269+
270+
vshardCallResp, err := r.Call(ctx, bucketID, vshardCallMode, fnc, args, vshardCallOpts)
271+
if err != nil {
272+
return nil, nil, err
273+
}
274+
275+
data, err := vshardCallResp.Get()
276+
if err != nil {
277+
return nil, nil, err
278+
}
279+
280+
return data, func(result ...interface{}) error {
281+
return vshardCallResp.GetTyped(result)
282+
}, nil
283+
}
284+
285+
// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
286+
func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCallMode,
287+
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
192288
const vshardStorageClientCall = "vshard.storage.call"
193289

194290
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
195-
return nil, nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
291+
return VshardRouterCallResp{}, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
196292
}
197293

198-
if opts.Timeout == 0 {
199-
opts.Timeout = CallTimeoutMin
294+
var poolMode pool.Mode
295+
var vshardMode VshardMode
296+
297+
switch mode {
298+
case VshardRouterCallModeRO:
299+
poolMode, vshardMode = pool.RO, ReadMode
300+
case VshardRouterCallModeRW:
301+
poolMode, vshardMode = pool.RW, WriteMode
302+
case VshardRouterCallModeRE:
303+
// poolMode, vshardMode = pool.PreferRO, ReadMode
304+
// since go-tarantool always use balance=true politic,
305+
// we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
306+
return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
307+
case VshardRouterCallModeBRO:
308+
poolMode, vshardMode = pool.ANY, ReadMode
309+
case VshardRouterCallModeBRE:
310+
poolMode, vshardMode = pool.PreferRO, ReadMode
311+
default:
312+
return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
200313
}
201314

202-
timeout := opts.Timeout
203-
timeStart := time.Now()
315+
timeout := callTimeoutDefault
316+
if opts.Timeout > 0 {
317+
timeout = opts.Timeout
318+
}
319+
320+
ctx, cancel := context.WithTimeout(ctx, timeout)
321+
defer cancel()
322+
323+
tntReq := tarantool.NewCallRequest(vshardStorageClientCall).
324+
Context(ctx).
325+
Args([]interface{}{
326+
bucketID,
327+
vshardMode,
328+
fnc,
329+
args,
330+
})
204331

205-
req := tarantool.NewCallRequest(vshardStorageClientCall)
206-
req = req.Context(ctx)
207-
req = req.Args([]interface{}{
208-
bucketID,
209-
opts.VshardMode.String(),
210-
fnc,
211-
args,
212-
})
332+
requestStartTime := time.Now()
213333

214334
var err error
215335

216336
for {
217-
if since := time.Since(timeStart); since > timeout {
218-
r.metrics().RequestDuration(since, false, false)
337+
if spent := time.Since(requestStartTime); spent > timeout {
338+
r.metrics().RequestDuration(spent, false, false)
219339

220-
r.log().Debugf(ctx, "Return result on timeout; since %s of timeout %s", since, timeout)
340+
r.log().Debugf(ctx, "Return result on timeout; spent %s of timeout %s", spent, timeout)
221341
if err == nil {
222342
err = fmt.Errorf("cant get call cause call impl timeout")
223343
}
224344

225-
return nil, nil, err
345+
return VshardRouterCallResp{}, err
226346
}
227347

228348
var rs *Replicaset
@@ -242,18 +362,16 @@ func (r *Router) RouterCallImpl(ctx context.Context,
242362

243363
r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)
244364

245-
future := rs.conn.Do(req, opts.PoolMode)
246-
247365
var storageCallResponse vshardStorageCallResponseProto
248-
err = future.GetTyped(&storageCallResponse)
366+
err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
249367
if err != nil {
250-
return nil, nil, fmt.Errorf("got error on future.Get(): %w", err)
368+
return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
251369
}
252370

253371
r.log().Debugf(ctx, "Got call result response data %+v", storageCallResponse)
254372

255373
if storageCallResponse.AssertError != nil {
256-
return nil, nil, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
374+
return VshardRouterCallResp{}, fmt.Errorf("%s: %s failed: %+v", vshardStorageClientCall, fnc, storageCallResponse.AssertError)
257375
}
258376

259377
if storageCallResponse.VshardError != nil {
@@ -267,7 +385,7 @@ func (r *Router) RouterCallImpl(ctx context.Context,
267385
if vshardError.Destination != "" {
268386
destinationUUID, err := uuid.Parse(vshardError.Destination)
269387
if err != nil {
270-
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
388+
return VshardRouterCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
271389
vshardStorageClientCall, vshardError, err)
272390
}
273391

@@ -291,8 +409,8 @@ func (r *Router) RouterCallImpl(ctx context.Context,
291409
const defaultPoolingPause = 50 * time.Millisecond
292410
time.Sleep(defaultPoolingPause)
293411

294-
if time.Since(timeStart) > timeout {
295-
return nil, nil, vshardError
412+
if spent := time.Since(requestStartTime); spent > timeout {
413+
return VshardRouterCallResp{}, vshardError
296414
}
297415
}
298416
}
@@ -311,30 +429,52 @@ func (r *Router) RouterCallImpl(ctx context.Context,
311429
// There is a comment why lua vshard router doesn't retry:
312430
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
313431
r.BucketReset(bucketID)
314-
return nil, nil, vshardError
432+
return VshardRouterCallResp{}, vshardError
315433
case VShardErrNameNonMaster:
316434
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
317435
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
318436
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
319437
// we just return this error as is.
320-
return nil, nil, vshardError
438+
return VshardRouterCallResp{}, vshardError
321439
default:
322-
return nil, nil, vshardError
440+
return VshardRouterCallResp{}, vshardError
323441
}
324442
}
325443

326-
r.metrics().RequestDuration(time.Since(timeStart), true, false)
444+
r.metrics().RequestDuration(time.Since(requestStartTime), true, false)
327445

328-
return storageCallResponse.Data, func(result interface{}) error {
329-
if len(storageCallResponse.Data) == 0 {
330-
return nil
331-
}
446+
return storageCallResponse.CallResp, nil
447+
}
448+
}
449+
450+
// CallRO is an alias for Call with VshardRouterCallModeRO.
451+
func (r *Router) CallRO(ctx context.Context, bucketID uint64,
452+
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
453+
return r.Call(ctx, bucketID, VshardRouterCallModeRO, fnc, args, opts)
454+
}
332455

333-
var stub bool
456+
// CallRW is an alias for Call with VshardRouterCallModeRW.
457+
func (r *Router) CallRW(ctx context.Context, bucketID uint64,
458+
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
459+
return r.Call(ctx, bucketID, VshardRouterCallModeRW, fnc, args, opts)
460+
}
334461

335-
return future.GetTyped(&[]interface{}{&stub, result})
336-
}, nil
337-
}
462+
// CallRE is an alias for Call with VshardRouterCallModeRE.
463+
func (r *Router) CallRE(ctx context.Context, bucketID uint64,
464+
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
465+
return r.Call(ctx, bucketID, VshardRouterCallModeRE, fnc, args, opts)
466+
}
467+
468+
// CallBRO is an alias for Call with VshardRouterCallModeBRO.
469+
func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
470+
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
471+
return r.Call(ctx, bucketID, VshardRouterCallModeBRO, fnc, args, opts)
472+
}
473+
474+
// CallBRE is an alias for Call with VshardRouterCallModeBRE.
475+
func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
476+
fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) {
477+
return r.Call(ctx, bucketID, VshardRouterCallModeBRE, fnc, args, opts)
338478
}
339479

340480
// RouterMapCallRWOptions sets options for RouterMapCallRW.
@@ -487,7 +627,7 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context,
487627
) (map[uuid.UUID]T, error) {
488628
const vshardStorageServiceCall = "vshard.storage._call"
489629

490-
timeout := CallTimeoutMin
630+
timeout := callTimeoutDefault
491631
if opts.Timeout > 0 {
492632
timeout = opts.Timeout
493633
}

api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestRouter_RouterCallImpl(t *testing.T) {
6767
conn: mPool,
6868
})
6969

70-
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second}, "test", []byte("test"))
70+
_, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second, VshardMode: ReadMode}, "test", []byte("test"))
7171
require.ErrorIs(t, err, futureError)
7272
})
7373
}

0 commit comments

Comments
 (0)