Skip to content

Commit 4849b02

Browse files
committed
Merge pull request #22 from klucar/ephemeral_topic
Adds URL encoding to DefaultNSQLookup to handle ephemeral topics
2 parents ab9841d + 83bbeca commit 4849b02

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

src/main/java/com/github/brainlag/nsq/lookup/DefaultNSQLookup.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import com.fasterxml.jackson.databind.JsonNode;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import com.github.brainlag.nsq.ServerAddress;
6+
import com.google.common.base.Charsets;
67
import com.google.common.collect.Sets;
78
import org.apache.logging.log4j.LogManager;
89

910
import java.io.IOException;
1011
import java.net.URL;
12+
import java.net.URLEncoder;
1113
import java.util.Set;
1214
public class DefaultNSQLookup implements NSQLookup {
1315
Set<String> addresses = Sets.newHashSet();
@@ -28,7 +30,8 @@ public Set<ServerAddress> lookup(String topic) {
2830
for (String addr : getLookupAddresses()) {
2931
try {
3032
ObjectMapper mapper = new ObjectMapper();
31-
JsonNode jsonNode = mapper.readTree(new URL(addr + "/lookup?topic=" + topic));
33+
String topicEncoded = URLEncoder.encode(topic, Charsets.UTF_8.name());
34+
JsonNode jsonNode = mapper.readTree(new URL(addr + "/lookup?topic=" + topicEncoded));
3235
LogManager.getLogger(this).debug("Server connection information: " + jsonNode.toString());
3336
JsonNode producers = jsonNode.get("data").get("producers");
3437
for (JsonNode node : producers) {

src/test/java/com/github/brainlag/nsq/NSQProducerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.File;
1616
import java.util.Date;
1717
import java.util.List;
18+
import java.util.Set;
1819
import java.util.concurrent.*;
1920
import java.util.concurrent.atomic.AtomicInteger;
2021

@@ -342,6 +343,23 @@ public void testScheduledCallback() throws NSQException, TimeoutException, Inter
342343
consumer.shutdown();
343344
}
344345

346+
@Test
347+
public void testEphemeralTopic() throws InterruptedException, NSQException, TimeoutException {
348+
NSQLookup lookup = new DefaultNSQLookup();
349+
lookup.addLookupAddress("localhost", 4161);
350+
351+
NSQProducer producer = new NSQProducer();
352+
producer.setConfig(getDeflateConfig());
353+
producer.addAddress("localhost", 4150);
354+
producer.start();
355+
String msg = randomString();
356+
producer.produce("testephem#ephemeral", msg.getBytes());
357+
producer.shutdown();
358+
359+
Set<ServerAddress> servers = lookup.lookup("testephem#ephemeral");
360+
assertEquals("Could not find servers for ephemeral topic", 1, servers.size());
361+
}
362+
345363
public static ExecutorService newBackoffThreadExecutor() {
346364
return new ThreadPoolExecutor(1, 1,
347365
0L, TimeUnit.MILLISECONDS,

0 commit comments

Comments
 (0)