@@ -591,7 +591,6 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
591
591
592
592
List <String > tp = Arrays .asList (site .host , String .valueOf (site .port ), tableName , actionName );
593
593
List <String > usr = Arrays .asList (userName , passWord );
594
- users .put (tp , usr );
595
594
596
595
params .add (new BasicString (localIP ));
597
596
params .add (new BasicInt (this .listeningPort ));
@@ -609,6 +608,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
609
608
610
609
re = dbConn .run ("publishTable" , params );
611
610
connList .add (dbConn );
611
+ users .put (tp , usr );
612
612
} catch (IOException e ) {
613
613
log .error ("Connect to site " + site .host + ":" + site .port + " failed: " + e .getMessage ());
614
614
}
@@ -655,7 +655,6 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
655
655
checkServerVersion (host , port );
656
656
List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
657
657
List <String > usr = Arrays .asList (userName , passWord );
658
- users .put (tp , usr );
659
658
660
659
dbConn = createSubscribeInternalDBConnection ();
661
660
subscribeInternalConnect (dbConn , host , port , userName , passWord );
@@ -700,6 +699,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
700
699
701
700
re = dbConn .run ("publishTable" , params );
702
701
connList .add (dbConn );
702
+ users .put (tp , usr );
703
703
if (ifUseBackupSite ) {
704
704
synchronized (subInfos_ ){
705
705
subInfos_ .put (topic , deserializer );
0 commit comments