Skip to content
This repository was archived by the owner on Mar 9, 2025. It is now read-only.

WIP: future/nsaktaganov #98

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
## Unreleased

BUG FIXES:
* Fix decoding fields for StorageCallVShardError (MasterUUID, ReplicasetUUID).

CHANGES:
* Add comment why and how we handle "NON_MASTER" vshard error.
* Don't support 'type Error struct' anymore.
* Handle vshard error the same way as lua vshard router (resolve issue #77).

FEATURES:

* Add pause between requests in buckets discovering. Configured by config DiscoveryWorkStep, default is 10ms.
* Add ReplicaUUID to the StorageCallVShardError struct.

REFACTOR:

* Use constants for vshard error names and codes.

## v1.2.0

Expand Down
90 changes: 66 additions & 24 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ func (s storageCallAssertError) Error() string {
}

type StorageCallVShardError struct {
BucketID uint64 `msgpack:"bucket_id" mapstructure:"bucket_id"`
Reason string `msgpack:"reason"`
Code int `msgpack:"code"`
Type string `msgpack:"type"`
Message string `msgpack:"message"`
Name string `msgpack:"name"`
MasterUUID *string `msgpack:"master_uuid" mapstructure:"master_uuid"` // mapstructure cant decode to source uuid type
ReplicasetUUID *string `msgpack:"replicaset_uuid" mapstructure:"replicaset_uuid"` // mapstructure cant decode to source uuid type
BucketID uint64 `msgpack:"bucket_id" mapstructure:"bucket_id"`
Reason string `msgpack:"reason"`
Code int `msgpack:"code"`
Type string `msgpack:"type"`
Message string `msgpack:"message"`
Name string `msgpack:"name"`
// These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type
// Example: 00000000-0000-0002-0002-000000000000
MasterUUID string `msgpack:"master" mapstructure:"master"`
ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"`
ReplicaUUID string `msgpack:"replica" mapstructure:"replica"`
Destination string `msgpack:"destination" mapstructure:"destination"`
}

func (s StorageCallVShardError) Error() string {
Expand Down Expand Up @@ -165,33 +169,71 @@ func (r *Router) RouterCallImpl(ctx context.Context,
}

switch vshardError.Name {
case "WRONG_BUCKET", "BUCKET_IS_LOCKED":
case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked:
// We reproduce here behaviour in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
r.BucketReset(bucketID)

// TODO we should inspect here err.destination like lua vshard router does,
// but we don't support vshard error fully yet:
// https://github.com/KaymeKaydex/go-vshard-router/issues/94
// So we just retry here as a temporary solution.
r.metrics().RetryOnCall("bucket_migrate")

r.log().Debugf(ctx, "retrying fnc '%s' cause got vshard error: %v", fnc, &vshardError)

// this vshardError will be returned to a caller in case of timeout
err = &vshardError
continue
case "TRANSFER_IS_IN_PROGRESS":
if vshardError.Destination == "" {
break // leads to retry
}

destinationUUID, err := uuid.Parse(vshardError.Destination)
if err != nil {
return nil, nil, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
vshardStorageClientCall, vshardError, err)
}

var loggedOnce bool
for {
idToReplicasetRef := r.getIDToReplicaset()
if _, ok := idToReplicasetRef[destinationUUID]; ok {
_, err := r.BucketSet(bucketID, destinationUUID)
if err == nil {
break // loop
}
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationUUID, err)
}

if !loggedOnce {
r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
"update configuration", destinationUUID)
loggedOnce = true
}

const defaultPoolingPause = 50 * time.Millisecond
time.Sleep(defaultPoolingPause)

if time.Since(timeStart) > timeout {
return nil, nil, &vshardError
}
}

// leads to retry
case VShardErrNameTransferIsInProgress:
// Since lua vshard router doesn't retry here, we don't retry too.
// There is a comment why lua vshard router doesn't retry:
// https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L697
r.BucketReset(bucketID)
return nil, nil, &vshardError
case "NON_MASTER":
// We don't know how to handle this case yet, so just return it for now.
// Here is issue for it: https://github.com/KaymeKaydex/go-vshard-router/issues/88
case VShardErrNameNonMaster:
// vshard.storage has returned NON_MASTER error, lua vshard router updates info about master in this case:
// See: https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L704.
// Since we use go-tarantool library, and go-tarantool library doesn't provide API to update info about current master,
// we just return this error as is.
return nil, nil, &vshardError
default:
return nil, nil, &vshardError
}

// retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked

r.metrics().RetryOnCall("bucket_migrate")

r.log().Debugf(ctx, "retrying fnc '%s' cause got vshard error: %v", fnc, &vshardError)

// this vshardError will be returned to a caller in case of timeout
err = &vshardError
continue
}

var isVShardRespOk bool
Expand Down
6 changes: 3 additions & 3 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
rs, err := r.BucketSet(bucketID, rsFuture.rsID)
if err != nil {
r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsID, bucketID, err)
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
return nil, newVShardErrorNoRouteToBucket(bucketID)
}

// TODO: should we release resources for unhandled futures?
Expand All @@ -116,7 +116,7 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
-- discovery).
*/

return nil, Errors[9] // NO_ROUTE_TO_BUCKET
return nil, newVShardErrorNoRouteToBucket(bucketID)
}

// The approach in bucketSearchLegacy is very ineffective because
Expand Down Expand Up @@ -177,7 +177,7 @@ func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64)
}

if rs == nil {
return nil, Errors[9] // NO_ROUTE_TO_BUCKET
return nil, newVShardErrorNoRouteToBucket(bucketIDToFind)
}

return rs, nil
Expand Down
Loading
Loading