File tree Expand file tree Collapse file tree 1 file changed +11
-1
lines changed
src/com/xxdb/streaming/client Expand file tree Collapse file tree 1 file changed +11
-1
lines changed Original file line number Diff line number Diff line change @@ -236,7 +236,7 @@ public boolean tryReconnect(String topic) {
236
236
}
237
237
} else {
238
238
// if set backupSites
239
- synchronized (reconnectTable ) {
239
+ synchronized (this ) {
240
240
topic = HATopicToTrueTopic .get (topic );
241
241
queueManager .removeQueue (topic );
242
242
Site [] sites ;
@@ -289,12 +289,22 @@ public boolean tryReconnect(String topic) {
289
289
siteList .remove ((int ) currentSiteIndex );
290
290
// update sites
291
291
sites = siteList .toArray (new Site [0 ]);
292
+ trueTopicToSites .put (topic , sites );
292
293
293
294
// Calculate the index of the newly successful connection node after a successful deletion.
294
295
if (successfulSiteIndex > currentSiteIndex ) {
295
296
// If the successfully connected node is after the deleted node, reduce the index by 1.
296
297
successfulSiteIndex -= 1 ;
297
298
}
299
+
300
+ // put sites to new sub topic.
301
+ for (String key : trueTopicToSites .keySet ()) {
302
+ // 重新为 key 赋值
303
+ if (key .contains (sites [successfulSiteIndex ].host +":" +sites [successfulSiteIndex ].port )) {
304
+ trueTopicToSites .put (key , sites ); // 更新操作是安全的
305
+ }
306
+ }
307
+
298
308
// update currentSiteIndexMap to new successfully connected site's index;
299
309
currentSiteIndexMap .put (topic , successfulSiteIndex );
300
310
} else if (reconnected ) {
You can’t perform that action at this time.
0 commit comments