@@ -52,6 +52,7 @@ type StorageCallVShardError struct {
52
52
MasterUUID string `msgpack:"master" mapstructure:"master"`
53
53
ReplicasetUUID string `msgpack:"replicaset" mapstructure:"replicaset"`
54
54
ReplicaUUID string `msgpack:"replica" mapstructure:"replica"`
55
+ Destination string `msgpack:"destination" mapstructure:"destination"`
55
56
}
56
57
57
58
func (s StorageCallVShardError ) Error () string {
@@ -171,17 +172,42 @@ func (r *Router) RouterCallImpl(ctx context.Context,
171
172
case VShardErrNameWrongBucket , VShardErrNameBucketIsLocked :
172
173
r .BucketReset (bucketID )
173
174
174
- // TODO we should inspect here err.destination like lua vshard router does,
175
- // but we don't support vshard error fully yet:
176
- // https://github.com/KaymeKaydex/go-vshard-router/issues/94
177
- // So we just retry here as a temporary solution.
178
- r .metrics ().RetryOnCall ("bucket_migrate" )
179
-
180
- r .log ().Debugf (ctx , "retrying fnc '%s' cause got vshard error: %v" , fnc , & vshardError )
181
-
182
- // this vshardError will be returned to a caller in case of timeout
183
- err = & vshardError
184
- continue
175
+ if vshardError .Destination == "" {
176
+ break // leads to retry
177
+ }
178
+
179
+ destinationUUID , err := uuid .Parse (vshardError .Destination )
180
+ if err != nil {
181
+ return nil , nil , fmt .Errorf ("protocol violation %s: malformed destination %w: %w" ,
182
+ vshardStorageClientCall , vshardError , err )
183
+ }
184
+
185
+ var loggedOnce bool
186
+ for {
187
+ idToReplicasetRef := r .getIDToReplicaset ()
188
+ if _ , ok := idToReplicasetRef [destinationUUID ]; ok {
189
+ _ , err := r .BucketSet (bucketID , destinationUUID )
190
+ if err == nil {
191
+ break // loop
192
+ }
193
+ r .log ().Warnf (ctx , "Failed set bucket %d to %v (possible race): %v" , bucketID , destinationUUID , err )
194
+ }
195
+
196
+ if ! loggedOnce {
197
+ r .log ().Warnf (ctx , "Replicaset '%v' was not found, but received from storage as destination - please " +
198
+ "update configuration" , destinationUUID )
199
+ loggedOnce = true
200
+ }
201
+
202
+ const defaultPoolingPause = 50 * time .Millisecond
203
+ time .Sleep (defaultPoolingPause )
204
+
205
+ if time .Since (timeStart ) > timeout {
206
+ return nil , nil , & vshardError
207
+ }
208
+ }
209
+
210
+ // leads to retry
185
211
case VShardErrNameTransferIsInProgress :
186
212
// Since the Lua vshard router doesn't retry here, we don't retry either.
187
213
// There is a comment explaining why the Lua vshard router doesn't retry:
@@ -197,6 +223,15 @@ func (r *Router) RouterCallImpl(ctx context.Context,
197
223
default :
198
224
return nil , nil , & vshardError
199
225
}
226
+
227
+ // retry for VShardErrNameWrongBucket, VShardErrNameBucketIsLocked
228
+
229
+ r .metrics ().RetryOnCall ("bucket_migrate" )
230
+
231
+ r .log ().Debugf (ctx , "retrying fnc '%s' cause got vshard error: %v" , fnc , & vshardError )
232
+
233
+ // this vshardError will be returned to a caller in case of timeout
234
+ err = & vshardError
200
235
}
201
236
202
237
var isVShardRespOk bool
0 commit comments