@@ -52,6 +52,7 @@ type StorageCallVShardError struct {
52
52
MasterUUID string `msgpack:"master" mapstructure:"master"`
53
53
ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"`
54
54
ReplicaUUID string `msgpack:"replica" mapstructure:"replica"`
55
+ Destination string `msgpack:"destination" mapstructure:"destination"`
55
56
}
56
57
57
58
func (s StorageCallVShardError ) Error () string {
@@ -169,19 +170,45 @@ func (r *Router) RouterCallImpl(ctx context.Context,
169
170
170
171
switch vshardError .Name {
171
172
case VShardErrNameWrongBucket , VShardErrNameBucketIsLocked :
173
+ // Here we reproduce the behaviour of the lua vshard router, see https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
172
174
r .BucketReset (bucketID )
173
175
174
- // TODO we should inspect here err.destination like lua vshard router does,
175
- // but we don't support vshard error fully yet:
176
- // https://github.com/KaymeKaydex/go-vshard-router/issues/94
177
- // So we just retry here as a temporary solution.
178
- r .metrics ().RetryOnCall ("bucket_migrate" )
179
-
180
- r .log ().Debugf (ctx , "retrying fnc '%s' cause got vshard error: %v" , fnc , & vshardError )
181
-
182
- // this vshardError will be returned to a caller in case of timeout
183
- err = & vshardError
184
- continue
176
+ if vshardError .Destination == "" {
177
+ break // leads to retry
178
+ }
179
+
180
+ destinationUUID , err := uuid .Parse (vshardError .Destination )
181
+ if err != nil {
182
+ return nil , nil , fmt .Errorf ("protocol violation %s: malformed destination %w: %w" ,
183
+ vshardStorageClientCall , vshardError , err )
184
+ }
185
+
186
+ var loggedOnce bool
187
+ for {
188
+ idToReplicasetRef := r .getIDToReplicaset ()
189
+ if _ , ok := idToReplicasetRef [destinationUUID ]; ok {
190
+ _ , err := r .BucketSet (bucketID , destinationUUID )
191
+ if err == nil {
192
+ break // loop
193
+ }
194
+ r .log ().Warnf (ctx , "Failed set bucket %d to %v (possible race): %v" , bucketID , destinationUUID , err )
195
+ }
196
+
197
+ if ! loggedOnce {
198
+ r .log ().Warnf (ctx , "Replicaset '%v' was not found, but received from storage as destination - please " +
199
+ "update configuration" , destinationUUID )
200
+ loggedOnce = true
201
+ }
202
+
203
+ const defaultPoolingPause = 50 * time .Millisecond
204
+ time .Sleep (defaultPoolingPause )
205
+
206
+ if time .Since (timeStart ) > timeout {
207
+ return nil , nil , & vshardError
208
+ }
209
+ }
210
+
211
+ // leads to retry
185
212
case VShardErrNameTransferIsInProgress :
186
213
// Since lua vshard router doesn't retry here, we don't retry either.
187
214
// There is a comment why lua vshard router doesn't retry:
@@ -197,6 +224,16 @@ func (r *Router) RouterCallImpl(ctx context.Context,
197
224
default :
198
225
return nil , nil , & vshardError
199
226
}
227
+
228
+ // retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked
229
+
230
+ r .metrics ().RetryOnCall ("bucket_migrate" )
231
+
232
+ r .log ().Debugf (ctx , "retrying fnc '%s' cause got vshard error: %v" , fnc , & vshardError )
233
+
234
+ // this vshardError will be returned to a caller in case of timeout
235
+ err = & vshardError
236
+ continue
200
237
}
201
238
202
239
var isVShardRespOk bool
0 commit comments