6
6
"time"
7
7
8
8
"github.com/google/uuid"
9
- "github.com/mitchellh/mapstructure"
10
9
11
10
"github.com/tarantool/go-tarantool/v2"
12
11
"github.com/tarantool/go-tarantool/v2/pool"
@@ -338,15 +337,154 @@ func (r *Router) RouterCallImpl(ctx context.Context,
338
337
}
339
338
}
340
339
340
+ // RouterMapCallRWOptions sets options for RouterMapCallRW.
341
+ type RouterMapCallRWOptions struct {
342
+ // Timeout defines timeout for RouterMapCallRW.
343
+ Timeout time.Duration
344
+ }
345
+
346
+ type storageMapResponseProto [T any ] struct {
347
+ ok bool
348
+ value T
349
+ err StorageCallVShardError
350
+ }
351
+
352
+ func (r * storageMapResponseProto [T ]) DecodeMsgpack (d * msgpack.Decoder ) error {
353
+ // proto for 'storage_map' method
354
+ // https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158
355
+ respArrayLen , err := d .DecodeArrayLen ()
356
+ if err != nil {
357
+ return err
358
+ }
359
+
360
+ if respArrayLen == 0 {
361
+ return fmt .Errorf ("protocol violation: invalid array length: %d" , respArrayLen )
362
+ }
363
+
364
+ code , err := d .PeekCode ()
365
+ if err != nil {
366
+ return err
367
+ }
368
+
369
+ if code == msgpcode .Nil {
370
+ err = d .DecodeNil ()
371
+ if err != nil {
372
+ return err
373
+ }
374
+
375
+ if respArrayLen != 2 {
376
+ return fmt .Errorf ("protocol violation: length is %d on vshard error case" , respArrayLen )
377
+ }
378
+
379
+ err = d .Decode (& r .err )
380
+ if err != nil {
381
+ return fmt .Errorf ("failed to decode storage vshard error: %w" , err )
382
+ }
383
+
384
+ return nil
385
+ }
386
+
387
+ isOk , err := d .DecodeBool ()
388
+ if err != nil {
389
+ return err
390
+ }
391
+
392
+ if ! isOk {
393
+ return fmt .Errorf ("protocol violation: isOk=false" )
394
+ }
395
+
396
+ switch respArrayLen {
397
+ case 1 :
398
+ break
399
+ case 2 :
400
+ err = d .Decode (& r .value )
401
+ if err != nil {
402
+ return fmt .Errorf ("can't decode value %T: %w" , r .value , err )
403
+ }
404
+ default :
405
+ return fmt .Errorf ("protocol violation: invalid array length when no vshard error: %d" , respArrayLen )
406
+ }
407
+
408
+ r .ok = true
409
+
410
+ return nil
411
+ }
412
+
413
+ type storageRefResponseProto struct {
414
+ err error
415
+ bucketCount uint64
416
+ }
417
+
418
+ func (r * storageRefResponseProto ) DecodeMsgpack (d * msgpack.Decoder ) error {
419
+ respArrayLen , err := d .DecodeArrayLen ()
420
+ if err != nil {
421
+ return err
422
+ }
423
+
424
+ if respArrayLen == 0 {
425
+ return fmt .Errorf ("protocol violation: invalid array length: %d" , respArrayLen )
426
+ }
427
+
428
+ code , err := d .PeekCode ()
429
+ if err != nil {
430
+ return err
431
+ }
432
+
433
+ if code == msgpcode .Nil {
434
+ err = d .DecodeNil ()
435
+ if err != nil {
436
+ return err
437
+ }
438
+
439
+ if respArrayLen != 2 {
440
+ return fmt .Errorf ("protocol violation: length is %d on error case" , respArrayLen )
441
+ }
442
+
443
+ // The possible variations of error here are fully unknown yet for us, e.g:
444
+ // vshard error, assert error or some other type of error. So this question requires research.
445
+ // So we do not decode it to some known error format, because we don't use it anyway.
446
+ decodedError , err := d .DecodeInterface ()
447
+ if err != nil {
448
+ return err
449
+ }
450
+
451
+ // convert empty interface into error
452
+ r .err = fmt .Errorf ("%v" , decodedError )
453
+
454
+ return nil
455
+ }
456
+
457
+ r .bucketCount , err = d .DecodeUint64 ()
458
+ if err != nil {
459
+ return err
460
+ }
461
+
462
+ return nil
463
+ }
464
+
341
465
// RouterMapCallRWImpl perform call function on all masters in the cluster
342
466
// with a guarantee that in case of success it was executed with all
343
467
// buckets being accessible for reads and writes.
468
+ // Deprecated: RouterMapCallRWImpl is deprecated.
469
+ // Use more general RouterMapCallRW instead.
344
470
func (r * Router ) RouterMapCallRWImpl (
345
471
ctx context.Context ,
346
472
fnc string ,
347
473
args interface {},
348
474
opts CallOpts ,
349
475
) (map [uuid.UUID ]interface {}, error ) {
476
+ return RouterMapCallRW [interface {}](r , ctx , fnc , args , RouterMapCallRWOptions {Timeout : opts .Timeout })
477
+ }
478
+
479
+ // RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the
480
+ // cluster with a guarantee that in case of success it was executed with all
481
+ // buckets being accessible for reads and writes.
482
+ // T is a return type of user defined function 'fnc'.
483
+ // We define it as a distinct function, not a Router method, because golang limitations,
484
+ // see: https://github.com/golang/go/issues/49085.
485
+ func RouterMapCallRW [T any ](r * Router , ctx context.Context ,
486
+ fnc string , args interface {}, opts RouterMapCallRWOptions ,
487
+ ) (map [uuid.UUID ]T , error ) {
350
488
const vshardStorageServiceCall = "vshard.storage._call"
351
489
352
490
timeout := CallTimeoutMin
@@ -399,32 +537,17 @@ func (r *Router) RouterMapCallRWImpl(
399
537
// proto for 'storage_ref' method:
400
538
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3137
401
539
for _ , rsFuture := range rsFutures {
402
- respData , err := rsFuture .future .Get ()
403
- if err != nil {
404
- return nil , fmt .Errorf ("rs {%s} storage_ref err: %v" , rsFuture .uuid , err )
405
- }
406
-
407
- if len (respData ) < 1 {
408
- return nil , fmt .Errorf ("protocol violation: storage_ref: expected len(respData) 1 or 2, got: %d" , len (respData ))
409
- }
540
+ var storageRefResponse storageRefResponseProto
410
541
411
- if respData [0 ] == nil {
412
- if len (respData ) != 2 {
413
- return nil , fmt .Errorf ("protocol vioaltion: storage_ref: expected len(respData) = 2 when respData[0] == nil, got %d" , len ((respData )))
414
- }
415
-
416
- // The possible variations of error in respData[1] are fully unknown yet for us, this question requires research.
417
- // So we do not convert respData[1] to some known error format, because we don't use it anyway.
418
- return nil , fmt .Errorf ("storage_ref failed on %v: %v" , rsFuture .uuid , respData [1 ])
542
+ if err := rsFuture .future .GetTyped (& storageRefResponse ); err != nil {
543
+ return nil , fmt .Errorf ("rs {%s} storage_ref err: %v" , rsFuture .uuid , err )
419
544
}
420
545
421
- var bucketCount uint64
422
- err = rsFuture .future .GetTyped (& []interface {}{& bucketCount })
423
- if err != nil {
424
- return nil , err
546
+ if storageRefResponse .err != nil {
547
+ return nil , fmt .Errorf ("storage_ref failed on %v: %v" , rsFuture .uuid , storageRefResponse .err )
425
548
}
426
549
427
- totalBucketCount += bucketCount
550
+ totalBucketCount += storageRefResponse . bucketCount
428
551
}
429
552
430
553
if totalBucketCount != r .cfg .TotalBucketCount {
@@ -449,52 +572,20 @@ func (r *Router) RouterMapCallRWImpl(
449
572
}
450
573
451
574
// map stage: get their responses
452
- idToResult := make (map [uuid.UUID ]interface {})
453
- // proto for 'storage_map' method:
454
- // https://github.com/tarantool/vshard/blob/8d299bfecff8bc656056658350ad48c829f9ad3f/vshard/storage/init.lua#L3158
575
+ idToResult := make (map [uuid.UUID ]T )
455
576
for _ , rsFuture := range rsFutures {
456
- respData , err := rsFuture .future .Get ()
457
- if err != nil {
458
- return nil , fmt .Errorf ("rs {%s} storage_map err: %v" , rsFuture .uuid , err )
459
- }
460
-
461
- if len (respData ) < 1 {
462
- return nil , fmt .Errorf ("protocol violation: invalid respData length: must be >= 1, current: %d" , len (respData ))
463
- }
464
-
465
- if respData [0 ] == nil {
466
- if len (respData ) != 2 {
467
- return nil , fmt .Errorf ("protocol violation: invalid respData length when respData[0] == nil, must be = 2, current: %d" , len (respData ))
468
- }
469
-
470
- var assertError assertError
471
- err = mapstructure .Decode (respData [1 ], & assertError )
472
- if err != nil {
473
- // We could not decode respData[1] as assertError, so return respData[1] as is, add info why we could not decode.
474
- return nil , fmt .Errorf ("storage_map failed on %v: %+v (decoding to assertError failed %v)" , rsFuture .uuid , respData [1 ], err )
475
- }
577
+ var storageMapResponse storageMapResponseProto [T ]
476
578
477
- return nil , fmt .Errorf ("storage_map failed on %v: %+v" , rsFuture .uuid , assertError )
478
- }
479
-
480
- var isVShardRespOk bool
481
- err = rsFuture .future .GetTyped (& []interface {}{& isVShardRespOk })
579
+ err := rsFuture .future .GetTyped (& storageMapResponse )
482
580
if err != nil {
483
- return nil , fmt .Errorf ("can't decode isVShardRespOk for storage_map response : %v" , err )
581
+ return nil , fmt .Errorf ("rs {%s} storage_map err : %v" , rsFuture . uuid , err )
484
582
}
485
583
486
- if ! isVShardRespOk {
487
- return nil , fmt .Errorf ("protocol violation: isVShardRespOk = false from storage_map: replicaset % v" , rsFuture .uuid )
584
+ if ! storageMapResponse . ok {
585
+ return nil , fmt .Errorf ("storage_map failed on %v: %+ v" , rsFuture .uuid , storageMapResponse . err )
488
586
}
489
587
490
- switch l := len (respData ); l {
491
- case 1 :
492
- idToResult [rsFuture .uuid ] = nil
493
- case 2 :
494
- idToResult [rsFuture .uuid ] = respData [1 ]
495
- default :
496
- return nil , fmt .Errorf ("protocol vioaltion: invalid respData when respData[0] == true, expected 1 or 2, got %d" , l )
497
- }
588
+ idToResult [rsFuture .uuid ] = storageMapResponse .value
498
589
}
499
590
500
591
r .metrics ().RequestDuration (time .Since (timeStart ), true , true )
0 commit comments