Skip to content

Commit 13e6229

Browse files
author
chengyitian
committed
AJ-673: add new subscribe method for default resubTimeout and subOnce;
1 parent d73a992 commit 13e6229

File tree

3 files changed

+35
-0
lines changed

3 files changed

+35
-0
lines changed

src/com/xxdb/streaming/client/PollingClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,15 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
7676
return topicPoller;
7777
}
7878

79+
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException {
80+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, 100, false);
81+
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
82+
List<String> usr = Arrays.asList(userName, passWord);
83+
users.put(tp, usr);
84+
topicPoller = new TopicPoller(queue);
85+
return topicPoller;
86+
}
87+
7988
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, String userName, String passWord) throws IOException {
8089
return subscribe(host, port, tableName, actionName, offset, reconnect, filter, null, userName, passWord);
8190
}

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,16 @@ public void subscribe(String host, int port, String tableName, String actionName
161161
}
162162
}
163163

164+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException {
165+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, 100, false);
166+
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
167+
List<String> usr = Arrays.asList(userName, passWord);
168+
synchronized (queueHandlers) {
169+
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
170+
users.put(topicStr, usr);
171+
}
172+
}
173+
164174
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, boolean allowExistTopic, String userName, String passWord) throws IOException{
165175
subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, null, allowExistTopic, userName, passWord);
166176
}

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,22 @@ public void subscribe(String host, int port, String tableName, String actionName
230230
}
231231
}
232232

233+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password, List<String> backupSites) throws IOException {
234+
if(batchSize<=0)
235+
throw new IllegalArgumentException("BatchSize must be greater than zero");
236+
if(throttle<0)
237+
throw new IllegalArgumentException("Throttle must be greater than or equal to zero");
238+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, backupSites, 100, false);
239+
HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0 ? -1 : throttle);
240+
handlerLopper.start();
241+
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
242+
List<String> usr = Arrays.asList(userName, password);
243+
synchronized (handlerLoppers) {
244+
handlerLoppers.put(topicStr, handlerLopper);
245+
// users.put(topicStr, usr);
246+
}
247+
}
248+
233249
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, float throttle, String userName, String password) throws IOException {
234250
if(batchSize<=0)
235251
throw new IllegalArgumentException("BatchSize must be greater than zero");

0 commit comments

Comments
 (0)