diff --git a/.gitignore b/.gitignore index 32858aa..96ce44d 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,11 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* + + +.gradle/ +build/ +!gradle/wrapper/gradle-wrapper.jar + +*.iml +.idea/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..878c554 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +language: java +jdk: + - oraclejdk8 +script: + - "./gradlew build" diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..8782285 --- /dev/null +++ b/build.gradle @@ -0,0 +1,169 @@ +group 'io.datanerds' +version '1.0-SNAPSHOT' + +def grpcVersion = '1.1.2' +def consulClientVersion = '0.11.3' +def slf4jVersion = '1.7.21' + +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath "com.google.protobuf:protobuf-gradle-plugin:0.8.1" + } +} + +subprojects { + apply plugin: 'java' + apply plugin: 'com.google.protobuf' + + repositories { + mavenCentral() + mavenLocal() + } + + dependencies { + compile "io.grpc:grpc-all:${grpcVersion}" + compile "io.grpc:grpc-core:${grpcVersion}" + compile "io.grpc:grpc-netty:${grpcVersion}" + compile "io.grpc:grpc-protobuf:${grpcVersion}" + compile "io.grpc:grpc-stub:${grpcVersion}" + compile "com.orbitz.consul:consul-client:${consulClientVersion}" + compile "org.slf4j:slf4j-api:${slf4jVersion}" + + testCompile "junit:junit:4.12" + testCompile 'org.hamcrest:hamcrest-library:1.3' + testCompile 'org.awaitility:awaitility:2.0.0' + testCompile 'com.pszymczyk.consul:embedded-consul:0.2.3' + } + + protobuf { + protoc { + artifact = 'com.google.protobuf:protoc:3.2.0' + } + plugins { + grpc { + artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" + } + } + generateProtoTasks { + all()*.plugins { + grpc { + option 'enable_deprecated=false' + } + } + } + } + + sourceSets { + main { + proto { + srcDirs += file("${projectDir}/build/generated/source/proto/main/java"); + srcDirs += file("${projectDir}/build/generated/source/proto/main/grpc"); + srcDirs += file("${projectDir}/build/generated/source/proto/test/java"); + srcDirs += file("${projectDir}/build/generated/source/proto/test/grpc"); + } + } + } +} + +project(":consulnameresolver") { + task javadocJar(type: Jar) { + classifier = 'javadoc' + from javadoc + } + + task sourcesJar(type: Jar) { + classifier = 'sources' + from sourceSets.main.allSource + } + + artifacts { + archives javadocJar, sourcesJar + } +} + +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.5.3" + } +} + + +apply plugin: 'io.codearte.nexus-staging' + +if (project.hasProperty('staging')) { + + nexusStaging { + username = ossrhUser + password = ossrhPassword + } + + project(":consulnameresolver") { + apply plugin: 'signing' + apply plugin: 'maven' + apply plugin: 'osgi' + + jar { + manifest { + version = project.version + symbolicName = "$project.group.$baseName" + } + } + + signing { + sign configurations.archives + } + + nexusStaging { + packageGroup = "io.datanerds" + stagingProfileId = "36a7a00c49b56" + } + + uploadArchives { + repositories { + mavenDeployer { + beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) } + repository(url: "https://oss.sonatype.org/service/local/staging/deploy/maven2/") { + authentication(userName: ossrhUser, password: ossrhPassword) + } + pom.project { + name project.name + groupId 'io.datanerds' + description project.description + packaging 'jar' + url 'https://github.com/datanerds-io/ConsulNameResolver' + + scm { + connection 'scm:git:https://github.com/datanerds-io/ConsulNameResolver.git' + developerConnection 'scm:git:git@github.com:datanerds-io/ConsulNameResolver.git' + url 'https://github.com/datanerds-io/ConsulNameResolver.git' + } + + licenses { + license { + name 'Eclipse Public License 1.0' + url 'https://opensource.org/licenses/EPL-1.0' + distribution 'repo' + } + } + + developers { + developer { + id = 'utobi' + name = 'Tobias Ullrich' + email = 'github-2017@ullrich.io' + } + } + } + } + } + } + + } +} + diff --git a/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulNameResolver.java b/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulNameResolver.java new file mode 100644 index 0000000..ace4e15 --- /dev/null +++ b/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulNameResolver.java @@ -0,0 +1,122 @@ +package io.datanerds.grpc; + +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; +import com.orbitz.consul.Consul; +import com.orbitz.consul.HealthClient; +import com.orbitz.consul.cache.ServiceHealthCache; +import com.orbitz.consul.cache.ServiceHealthKey; +import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.health.ServiceHealth; +import com.orbitz.consul.option.ImmutableCatalogOptions; +import io.grpc.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class ConsulNameResolver extends NameResolver { + public static final String DEFAULT_ADDRESS = "localhost"; + public static final Integer DEFAULT_PORT = 8383; + private static final Logger logger = LoggerFactory.getLogger(ConsulNameResolver.class); + private final String authority; + private final String service; + private Listener listener; + private Consul consul; + private HealthClient healthClient; + private Optional datacenter; + private Optional> tags; + + public ConsulNameResolver(ConsulQueryParameter parameter) { + this.authority = parameter.consulAddress.orElse( + String.format("%s:%s", DEFAULT_ADDRESS, DEFAULT_PORT)); //todo move to URI + this.service = parameter.service; + this.datacenter = parameter.datacenter; + this.tags = parameter.tags; + + HostAndPort consulHostAndPort = HostAndPort.fromString(this.authority); + this.consul = Consul.builder().withHostAndPort(consulHostAndPort).build(); + this.healthClient = this.consul.healthClient(); + } + + @Override + public String getServiceAuthority() { + return this.authority; + } + + @Override + public void shutdown() { + } + + @Override + public void start(Listener listener) { + Preconditions.checkState(this.listener == null, "already started"); + this.listener = Preconditions.checkNotNull(listener, "listener"); + logger.debug("Resolving service '{}' using consul at '%{}'", this.service, this.authority); + + ServiceHealthCache svHealth = + ServiceHealthCache.newCache(this.healthClient, this.service, true, buildCatalogOptions(), 5); + + svHealth.addListener((Map newValues) -> { + if (newValues.isEmpty()) { + updateListenerWithEmptyResult(); + return; + } + listener.onUpdate(generateResolvedServerList(newValues), Attributes.EMPTY); + }); + + try { + svHealth.start(); + runInitialResolve(); + } catch (Exception ex) { + throw new RuntimeException("Exception while trying to start consul client for name resolution", ex); + } + } + + private ImmutableCatalogOptions buildCatalogOptions() { + ImmutableCatalogOptions.Builder options = ImmutableCatalogOptions.builder(); + + if (this.datacenter.isPresent()) { + options.datacenter(com.google.common.base.Optional.of(this.datacenter.get())); + } + + if (this.tags.isPresent()) { + this.tags.get().forEach(options::tag); + } + return options.build(); + } + + private void updateListenerWithEmptyResult() { + logger.warn("No servers could be resolved for '{}'", ConsulNameResolver.this.service); + this.listener.onError(Status.UNAVAILABLE.augmentDescription(String.format( + "No servers could be resolved for service '%s' from authority '%s'", + ConsulNameResolver.this.service, ConsulNameResolver.this.authority))); + this.listener.onUpdate(new ArrayList<>(), Attributes.EMPTY); + } + + private List generateResolvedServerList(Map newValues) { + return newValues + .keySet() + .stream() + .map(key -> new InetSocketAddress(key.getHost(), key.getPort())) + .map(socketAddress -> ResolvedServerInfoGroup.builder() + .add(new ResolvedServerInfo(socketAddress, Attributes.EMPTY))) + .map(ResolvedServerInfoGroup.Builder::build).collect(Collectors.toList()); + } + + private void runInitialResolve() { + ConsulResponse> healthyServiceInstances = + this.healthClient.getHealthyServiceInstances(this.service, buildCatalogOptions()); + if (healthyServiceInstances.getResponse().isEmpty()) { + this.listener.onError(Status.UNAVAILABLE.augmentDescription(String.format( + "No servers could be resolved for service '%s' from authority '%s'", + this.service, this.authority))); + } + } + +} diff --git a/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulNameResolverProvider.java b/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulNameResolverProvider.java new file mode 100644 index 0000000..22cdf3a --- /dev/null +++ b/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulNameResolverProvider.java @@ -0,0 +1,87 @@ +package io.datanerds.grpc; + +import com.google.common.base.Preconditions; +import io.grpc.Attributes; +import io.grpc.NameResolver; +import io.grpc.NameResolverProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +public class ConsulNameResolverProvider extends NameResolverProvider { + private static final Logger logger = LoggerFactory.getLogger(ConsulNameResolverProvider.class); + + private static final String SCHEME = "consul"; + private static final String QUERY_PARAMETER_TAGS = "tag"; + private static final String QUERY_PARAMETER_DATACENTER = "dc"; + + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; + } + + @Override + public String getDefaultScheme() { + return SCHEME; + } + + @Override + public NameResolver newNameResolver(URI targetUri, Attributes params) { + //Contract is to return null if resolver is not able to handle uri + try { + return new ConsulNameResolver(validateInputURI(targetUri)); + } catch (IllegalArgumentException | NullPointerException ex) { + return null; + } + } + + private void validateScheme(URI uri) { + if (!SCHEME.equalsIgnoreCase(uri.getScheme())) { + throw new IllegalArgumentException("scheme for uri is not 'consul'"); + } + } + + protected ConsulQueryParameter validateInputURI(final URI uri) { + validateScheme(uri); + Preconditions.checkNotNull(uri.getHost(), "Host cannot be empty"); + Preconditions.checkNotNull(uri.getPath(), "Path cannot be empty"); + Preconditions.checkArgument(uri.getPath().startsWith("/"), + "the path component (%s) of the target (%s) must start with '/'", uri.getPath(), uri); + + Map> splitQuery = URIUtils.splitQuery(uri); + Map> queryParameters = URIUtils.splitQuery(uri); + Preconditions.checkArgument(!(queryParameters.containsKey(QUERY_PARAMETER_DATACENTER) + && queryParameters.get(QUERY_PARAMETER_DATACENTER).size() != 1), + "Only one datacenter can be defined"); + + String consul; + if (uri.getPort() == -1) { + consul = String.format("%s:%s", uri.getHost(), ConsulNameResolver.DEFAULT_PORT); + } else { + consul = String.format("%s:%s", uri.getHost(), uri.getPort()); + } + + String service = uri.getPath().substring(1); + + ConsulQueryParameter.Builder builder = new ConsulQueryParameter.Builder(service) + .withConsulAddress(consul); + + if (queryParameters.containsKey(QUERY_PARAMETER_DATACENTER)) { + builder.withDatacenter(queryParameters.get(QUERY_PARAMETER_DATACENTER).get(0)); + } + if (queryParameters.containsKey(QUERY_PARAMETER_TAGS)) { + builder.withTags(queryParameters.get(QUERY_PARAMETER_TAGS)); + } + + return builder.build(); + } + +} diff --git a/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulQueryParameter.java b/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulQueryParameter.java new file mode 100644 index 0000000..0976fa0 --- /dev/null +++ b/consulnameresolver/src/main/java/io/datanerds/grpc/ConsulQueryParameter.java @@ -0,0 +1,67 @@ +package io.datanerds.grpc; + +import java.util.*; + +public class ConsulQueryParameter { + public final Optional consulAddress; + public final String service; + public final Optional datacenter; + public final Optional> tags; + + private ConsulQueryParameter(Optional consulAddress, String service, + Optional datacenter, Optional> tags) { + this.consulAddress = consulAddress; + this.service = service; + this.datacenter = datacenter; + this.tags = tags; + } + + public static class Builder { + private String consulAddress; + private String service; + private String datacenter; + private Set tags; + + public Builder(String service) { + if (service == null) { + throw new RuntimeException("Service cannot be null"); + } + this.service = service; + this.tags = new HashSet<>(); + } + + public Builder withConsulAddress(String consulAddress) { + this.consulAddress = consulAddress; + return this; + } + + public Builder withDatacenter(String datacenter) { + this.datacenter = datacenter; + return this; + } + + public Builder withTag(String tag) { + this.tags.add(tag); + return this; + } + + public Builder withTags(List tags) { + if (tags != null) { + this.tags.addAll(tags); + } + return this; + } + + public ConsulQueryParameter build() { + Optional address = Optional.ofNullable(this.consulAddress); + Optional datacenter = Optional.ofNullable(this.datacenter); + Optional> tags; + if (this.tags.isEmpty()) { + tags = Optional.empty(); + } else { + tags = Optional.of(new ArrayList<>(this.tags)); + } + return new ConsulQueryParameter(address, this.service, datacenter, tags); + } + } +} \ No newline at end of file diff --git a/consulnameresolver/src/main/java/io/datanerds/grpc/URIUtils.java b/consulnameresolver/src/main/java/io/datanerds/grpc/URIUtils.java new file mode 100644 index 0000000..4852d80 --- /dev/null +++ b/consulnameresolver/src/main/java/io/datanerds/grpc/URIUtils.java @@ -0,0 +1,35 @@ +package io.datanerds.grpc; + +import com.google.common.base.Strings; + +import java.net.URI; +import java.util.*; +import java.util.stream.Collectors; + +import static java.util.Arrays.stream; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; + +public class URIUtils { + + public static Map> splitQuery(URI url) { + if (Strings.isNullOrEmpty(url.getQuery())) { + return Collections.emptyMap(); + } + + return stream(url.getQuery().split("&")) + .map(URIUtils::splitQueryParameter) + .filter(x -> !Strings.isNullOrEmpty(x.getValue())) + .filter(x -> "dc".equals(x.getKey()) || "tag".equals(x.getKey())) + .collect(Collectors.groupingBy(AbstractMap.SimpleImmutableEntry::getKey, + LinkedHashMap::new, mapping(Map.Entry::getValue, toList()) + )); + } + + private static AbstractMap.SimpleImmutableEntry splitQueryParameter(String it) { + int idx = it.indexOf("="); + String key = idx > 0 ? it.substring(0, idx) : it; + String value = idx > 0 && it.length() > idx + 1 ? it.substring(idx + 1) : null; + return new AbstractMap.SimpleImmutableEntry<>(key, value); + } +} diff --git a/consulnameresolver/src/test/java/io/datanerds/grpc/ConsulNameResolverProviderTest.java b/consulnameresolver/src/test/java/io/datanerds/grpc/ConsulNameResolverProviderTest.java new file mode 100644 index 0000000..0c3c36c --- /dev/null +++ b/consulnameresolver/src/test/java/io/datanerds/grpc/ConsulNameResolverProviderTest.java @@ -0,0 +1,104 @@ +package io.datanerds.grpc; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.net.URI; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +public class ConsulNameResolverProviderTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void basicChecks() throws Exception { + ConsulNameResolverProvider provider = new ConsulNameResolverProvider(); + assertTrue(provider.isAvailable()); + assertThat(5, is(equalTo(provider.priority()))); + assertThat("consul", is(equalTo(provider.getDefaultScheme()))); + } + + @Test + public void testSanitizeInputBadScheme() throws Exception { + URI input = new URI("test://localhost:1234/test_service"); + this.thrown.expect(IllegalArgumentException.class); + new ConsulNameResolverProvider().validateInputURI(input); + } + + @Test + public void testSanitizeInputBadHost() throws Exception { + URI input = new URI("test:///test_service"); + this.thrown.expect(IllegalArgumentException.class); + new ConsulNameResolverProvider().validateInputURI(input); + } + + @Test + public void testSanitizeInputBadPort() throws Exception { + URI input = new URI("test://localhost:100000/test_service"); + this.thrown.expect(IllegalArgumentException.class); + new ConsulNameResolverProvider().validateInputURI(input); + } + + @Test + public void testSanitizeInputBadService() throws Exception { + URI input = new URI("test://localhost:1234/"); + this.thrown.expect(IllegalArgumentException.class); + new ConsulNameResolverProvider().validateInputURI(input); + } + + @Test + public void testSanitizeInput() throws Exception { + URI input = new URI("consul://localhost:1234/test_service"); + ConsulQueryParameter params = new ConsulNameResolverProvider().validateInputURI(input); + + assertFalse(params.datacenter.isPresent()); + assertFalse(params.tags.isPresent()); + assertTrue(params.consulAddress.isPresent()); + assertThat("localhost:1234", is(equalTo(params.consulAddress.get()))); + assertThat("test_service", is(equalTo(params.service))); + } + + @Test + public void testSanitizeInputWithDC() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?dc=dc1"); + ConsulQueryParameter params = new ConsulNameResolverProvider().validateInputURI(input); + + assertTrue(params.datacenter.isPresent()); + assertThat("dc1", is(equalTo(params.datacenter.get()))); + assertFalse(params.tags.isPresent()); + assertTrue(params.consulAddress.isPresent()); + assertThat("localhost:1234", is(equalTo(params.consulAddress.get()))); + assertThat("test_service", is(equalTo(params.service))); + } + + @Test + public void testSanitizeInputWithTags() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?tag=foo&tag=bar&tag=baz"); + ConsulQueryParameter params = new ConsulNameResolverProvider().validateInputURI(input); + + assertFalse(params.datacenter.isPresent()); + assertTrue(params.tags.isPresent()); + assertThat(params.tags.get(), containsInAnyOrder("foo", "bar", "baz")); + assertTrue(params.consulAddress.isPresent()); + assertThat("localhost:1234", is(equalTo(params.consulAddress.get()))); + assertThat("test_service", is(equalTo(params.service))); + } + + @Test + public void testSanitizeInputWithDCandTags() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?dc=dc1&tag=foo&tag=bar&tag=baz"); + ConsulQueryParameter params = new ConsulNameResolverProvider().validateInputURI(input); + + assertTrue(params.datacenter.isPresent()); + assertThat("dc1", is(equalTo(params.datacenter.get()))); + assertTrue(params.tags.isPresent()); + assertThat(params.tags.get(), containsInAnyOrder("foo", "bar", "baz")); + assertTrue(params.consulAddress.isPresent()); + assertThat("localhost:1234", is(equalTo(params.consulAddress.get()))); + assertThat("test_service", is(equalTo(params.service))); + } +} \ No newline at end of file diff --git a/consulnameresolver/src/test/java/io/datanerds/grpc/URIUtilsTest.java b/consulnameresolver/src/test/java/io/datanerds/grpc/URIUtilsTest.java new file mode 100644 index 0000000..94db734 --- /dev/null +++ b/consulnameresolver/src/test/java/io/datanerds/grpc/URIUtilsTest.java @@ -0,0 +1,76 @@ +package io.datanerds.grpc; + +import org.junit.Test; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; + +public class URIUtilsTest { + + @Test + public void splitQuery() throws Exception { + URI input = new URI("consul://localhost:1234/test_service"); + Map> result = URIUtils.splitQuery(input); + + assertThat(result.keySet(), hasSize(0)); + + } + + @Test + public void splitQueryWithEmptyDCandTags() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?dc=&tag=&tag="); + Map> result = URIUtils.splitQuery(input); + + assertThat(result.keySet(), hasSize(0)); + } + + @Test + public void splitQueryWithDC() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?dc=dc1"); + Map> result = URIUtils.splitQuery(input); + + assertThat(result.keySet(), hasSize(1)); + assertThat(result.get("dc"), hasSize(1)); + } + + @Test + public void splitQueryWithJunkParams() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?foo=test&bar=baz"); + Map> result = URIUtils.splitQuery(input); + + assertThat(result.keySet(), hasSize(0)); + } + + @Test + public void splitQueryWithTag() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?tag=foo"); + Map> result = URIUtils.splitQuery(input); + + assertThat(result.keySet(), hasSize(1)); + assertThat(result.get("tag"), hasSize(1)); + } + + @Test + public void splitQueryWithTags() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?tag=foo&tag=bar&tag=baz"); + Map> result = URIUtils.splitQuery(input); + + assertThat(result.keySet(), hasSize(1)); + assertThat(result.get("tag"), hasSize(3)); + } + + @Test + public void splitQueryWithDCandTags() throws Exception { + URI input = new URI("consul://localhost:1234/test_service?dc=dc1&tag=foo&tag=bar&tag=baz"); + Map> result = URIUtils.splitQuery(input); + + assertThat(result.keySet(), hasSize(2)); + assertThat(result.get("dc"), hasSize(1)); + assertThat(result.get("tag"), hasSize(3)); + } + +} \ No newline at end of file diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..11adc63 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +echo "Deploying artifact to maven central" +read -p "Are you sure you want to deploy? (yes/NO)? " -r DO_DEPLOYMENT + +if ( [ "$DO_DEPLOYMENT" == "yes" ] ); then + read -p "Enter the version (e.g. 0.0.1) " -r VERSION + if [ -z "$VERSION" ]; then + echo "Version cannot be empty!" + exit + fi + echo "Setting version to $VERSION" + + gradle clean build javadocJar sourcesJar uploadArchives closeRepository -Pversion="$VERSION" -Pstaging + + STATUS=$? + if [ "$STATUS" -eq 0 ]; then + echo "Pushing Tag to github" + git tag "v$VERSION" + git push --tags + else + echo "#################" + echo "## BUILD ERROR ##" + echo "#################" + fi +else + echo 'Canceled...' +fi diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..0bee6db Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..00a7e1a --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Tue Mar 21 14:08:08 PDT 2017 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-3.4-bin.zip diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..4453cce --- /dev/null +++ b/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn ( ) { + echo "$*" +} + +die ( ) { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save ( ) { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..e95643d --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/integration-tests/build.gradle b/integration-tests/build.gradle new file mode 100644 index 0000000..0417b27 --- /dev/null +++ b/integration-tests/build.gradle @@ -0,0 +1,4 @@ +apply plugin: 'java' +dependencies { + testCompile([project(":consulnameresolver")]) +} \ No newline at end of file diff --git a/integration-tests/src/test/java/io/datanerds/grpc/ConsulNameResolverTest.java b/integration-tests/src/test/java/io/datanerds/grpc/ConsulNameResolverTest.java new file mode 100644 index 0000000..504ff40 --- /dev/null +++ b/integration-tests/src/test/java/io/datanerds/grpc/ConsulNameResolverTest.java @@ -0,0 +1,271 @@ +package io.datanerds.grpc; + +import com.google.common.net.HostAndPort; +import com.orbitz.consul.*; +import com.orbitz.consul.model.health.ServiceHealth; +import com.pszymczyk.consul.junit.ConsulResource; +import io.datanerds.grpc.ConsulQueryParameter.Builder; +import io.datanerds.grpc.integrationtests.PingPongGrpc; +import io.datanerds.grpc.integrationtests.PingPongProto; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +public class ConsulNameResolverTest { + private static final Logger logger = LoggerFactory.getLogger(ConsulNameResolverTest.class); + + @ClassRule + public static final ConsulResource consulResource = new ConsulResource(); + + @Test + public void checkIfConsulIsUpAndHealthy() throws Exception { + await().atMost(15, SECONDS).until(() -> { + try { + Consul consul = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", consulResource + .getHttpPort())).build(); + HealthClient healthClient = consul.healthClient(); + List nodes = healthClient.getHealthyServiceInstances("consul").getResponse(); + return !nodes.isEmpty(); + } catch (ConsulException ex) { + logger.debug("Can't connect to Consul", ex); + return false; + } + }); + } + + @Test + public void checkPingPongServer() throws Exception { + int port = 55555; + PingPongServer server = new PingPongServer(port); + server.start(); + + ManagedChannel channel = ManagedChannelBuilder + .forTarget("localhost:" + port) + .usePlaintext(true) + .build(); + + PingPongGrpc.PingPongBlockingStub blockingStub = PingPongGrpc.newBlockingStub(channel); + PingPongProto.Ping ping = PingPongProto.Ping.newBuilder().setPing("PING").build(); + PingPongProto.Pong pong = blockingStub.pingPong(ping); + assertThat(String.format("PONG-%d: %s", port, ping.getPing()), is(equalTo(pong.getPong()))); + server.stop(); + } + + @Test + public void simpleConsulLookup() throws Exception { + consulResource.reset(); + int port = 55556; + PingPongServer server = new PingPongServer(port); + server.start(); + + registerConsulService(consulResource.getHttpPort(), "test_v1", port, "test_v1"); + ManagedChannel channel = ManagedChannelBuilder + .forTarget(String.format("consul://localhost:%d/%s", consulResource.getHttpPort(), "test_v1")) + .nameResolverFactory( + new ConsulNameResolverProvider() + ) + .usePlaintext(true) + .build(); + + PingPongGrpc.PingPongBlockingStub blockingStub = PingPongGrpc.newBlockingStub(channel); + PingPongProto.Ping ping = PingPongProto.Ping.newBuilder().setPing("PING").build(); + PingPongProto.Pong pong = blockingStub.pingPong(ping); + assertThat(String.format("PONG-%d: %s", port, ping.getPing()), is(equalTo(pong.getPong()))); + server.stop(); + } + + @Test + public void consulLookupWithTags() throws Exception { + consulResource.reset(); + + int port1 = 55555; + int port2 = 55556; + String serviceName = "test_v1"; + String serviceId = UUID.randomUUID().toString(); + + String[] tags = {"foo", "bar", "baz"}; + + ConsulQueryParameter params = new Builder(serviceName) + .withConsulAddress("localhost:" + consulResource.getHttpPort()) + .withTag("foo") + .withTag("bar") + .withTag("baz") + .build(); + + registerConsulService(consulResource.getHttpPort(), serviceName, port1, serviceId, tags); + + MockListener listener = new MockListener(); + ConsulNameResolver resolver = new ConsulNameResolver(params); + resolver.start(listener); + await().atMost(5, SECONDS).until(listener.getServers()::size, is(1)); + + registerConsulService(consulResource.getHttpPort(), serviceName, port2, UUID.randomUUID().toString(), tags); + await().atMost(5, SECONDS).until(listener.getServers()::size, is(2)); + + resolver.shutdown(); + } + + @Test + public void consulLookupWithWrongTags() throws Exception { + consulResource.reset(); + + int port1 = 55555; + int port2 = 55556; + String serviceName = "test_v1"; + + String[] tags = {"foo", "bar", "baz"}; + + ConsulQueryParameter params = new Builder(serviceName) + .withConsulAddress("localhost:" + consulResource.getHttpPort()) + .withTag("OTHER TAG") + .withTag("ANOTHER TAG") + .withTag("SOMETHING ELSE") + .build(); + + registerConsulService(consulResource.getHttpPort(), serviceName, port1, UUID.randomUUID().toString(), tags); + + MockListener listener = new MockListener(); + ConsulNameResolver resolver = new ConsulNameResolver(params); + resolver.start(listener); + + await().atMost(5, SECONDS).until(listener.getErrors()::size, greaterThanOrEqualTo(1)); + resolver.shutdown(); + } + + @Test + public void consulLookupWithPartiallyMatchingTags() throws Exception { + consulResource.reset(); + + int port1 = 55555; + int port2 = 55556; + String serviceName = "test_v1"; + + String[] tags = {"foo", "bar", "baz"}; + + ConsulQueryParameter params = new Builder(serviceName) + .withConsulAddress("localhost:" + consulResource.getHttpPort()) + .withTag("foo") + .build(); + + registerConsulService(consulResource.getHttpPort(), serviceName, port1, UUID.randomUUID().toString(), tags); + + MockListener listener = new MockListener(); + ConsulNameResolver resolver = new ConsulNameResolver(params); + resolver.start(listener); + + await().atMost(5, SECONDS).until(listener.getServers()::size, is(1)); + resolver.shutdown(); + } + + @Test + public void multiConsulLookup() throws Exception { + consulResource.reset(); + + int port1 = 55555; + int port2 = 55556; + String serviceName = "test_v1"; + + ConsulQueryParameter params = new Builder(serviceName) + .withConsulAddress("localhost:" + consulResource.getHttpPort()) + .build(); + + registerConsulService(consulResource.getHttpPort(), serviceName, port1, UUID.randomUUID().toString()); + + MockListener listener = new MockListener(); + ConsulNameResolver resolver = new ConsulNameResolver(params); + resolver.start(listener); + await().atMost(5, SECONDS).until(listener.getServers()::size, is(1)); + + registerConsulService(consulResource.getHttpPort(), serviceName, port2, UUID.randomUUID().toString()); + await().atMost(5, SECONDS).until(listener.getServers()::size, is(2)); + + resolver.shutdown(); + } + + @Test + public void removeNodesFromConsul() throws Exception { + consulResource.reset(); + + int port1 = 55555; + int port2 = 55556; + int port3 = 55557; + + String id1 = UUID.randomUUID().toString(); + String id2 = UUID.randomUUID().toString(); + String id3 = UUID.randomUUID().toString(); + + String serviceName = "test_v1"; + + ConsulQueryParameter params = new Builder(serviceName) + .withConsulAddress("localhost:" + consulResource.getHttpPort()) + .build(); + + String url = String.format("%s://%s:%d/%s?dc=dc1", "consul", "localhost", consulResource + .getHttpPort(), serviceName); + + registerConsulService(consulResource.getHttpPort(), serviceName, port1, id1); + registerConsulService(consulResource.getHttpPort(), serviceName, port2, id2); + registerConsulService(consulResource.getHttpPort(), serviceName, port3, id3); + + MockListener listener = new MockListener(); + + ConsulNameResolver resolver = new ConsulNameResolver(params); + resolver.start(listener); + + assertThat(listener.getErrors(), hasSize(0)); + + await().atMost(5, SECONDS).until(listener.getServers()::size, is(3)); + + unregisterConsulService(consulResource.getHttpPort(), id3); + await().atMost(5, SECONDS).until(listener.getServers()::size, is(2)); + + unregisterConsulService(consulResource.getHttpPort(), id2); + await().atMost(5, SECONDS).until(listener.getServers()::size, is(1)); + + unregisterConsulService(consulResource.getHttpPort(), id1); + await().atMost(5, SECONDS).until(listener.getServers()::size, is(0)); + + assertThat(listener.getErrors(), not(hasSize(0))); + resolver.shutdown(); + } + + private void registerConsulService(int consulPort, String serviceName, int servicePort, String serviceId) + throws NotRegisteredException { + this.registerConsulService(consulPort, serviceName, servicePort, serviceId, new String[0]); + } + + private void registerConsulService(int consulPort, String serviceName, int servicePort, String serviceId, + String... tags) + throws NotRegisteredException { + Consul consul = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", consulPort)).build(); + AgentClient agentClient = consul.agentClient(); + agentClient.register(servicePort, 60L, serviceName, serviceId, tags); + agentClient.pass(serviceId); + } + + private boolean isConsulServiceRegistered(int consulPort, String serviceId) { + Consul consul = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", consulPort)).build(); + AgentClient agentClient = consul.agentClient(); + boolean isRegistered = agentClient.isRegistered(serviceId); + return isRegistered; + } + + private void unregisterConsulService(int consulPort, String serviceId) + throws NotRegisteredException { + Consul consul = Consul.builder().withHostAndPort(HostAndPort.fromParts("localhost", consulPort)).build(); + AgentClient agentClient = consul.agentClient(); + agentClient.deregister(serviceId); + } + +} diff --git a/integration-tests/src/test/java/io/datanerds/grpc/MockListener.java b/integration-tests/src/test/java/io/datanerds/grpc/MockListener.java new file mode 100644 index 0000000..4375a47 --- /dev/null +++ b/integration-tests/src/test/java/io/datanerds/grpc/MockListener.java @@ -0,0 +1,40 @@ +package io.datanerds.grpc; + +import io.grpc.Attributes; +import io.grpc.NameResolver; +import io.grpc.ResolvedServerInfoGroup; +import io.grpc.Status; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class MockListener implements NameResolver.Listener { + private List servers = Collections.synchronizedList(new ArrayList<>()); + private Attributes attributes; + private List errors = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void onUpdate(List servers, Attributes attributes) { + this.servers.clear(); + this.servers.addAll(servers); + this.attributes = attributes; + } + + @Override + public void onError(Status error) { + this.errors.add(error); + } + + public List getServers() { + return this.servers; + } + + public Attributes getAttributes() { + return this.attributes; + } + + public List getErrors() { + return this.errors; + } +} diff --git a/integration-tests/src/test/java/io/datanerds/grpc/PingPongServer.java b/integration-tests/src/test/java/io/datanerds/grpc/PingPongServer.java new file mode 100644 index 0000000..ed1aa54 --- /dev/null +++ b/integration-tests/src/test/java/io/datanerds/grpc/PingPongServer.java @@ -0,0 +1,63 @@ +package io.datanerds.grpc; + +import io.datanerds.grpc.integrationtests.PingPongGrpc; +import io.datanerds.grpc.integrationtests.PingPongProto; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class PingPongServer { + private static final Logger logger = LoggerFactory.getLogger(PingPongServer.class); + private Server server; + private final int port; + + public PingPongServer() { + this(55551); + } + + public PingPongServer(int port) { + this.port = port; + } + + public void start() throws IOException { + this.server = ServerBuilder.forPort(this.port) + .addService(new PingPongGrpc.PingPongImplBase() { + @Override + public void pingPong(PingPongProto.Ping request, + StreamObserver responseObserver) { + String pong = String.format("PONG-%d: %s", PingPongServer.this.port, request.getPing()); + PingPongProto.Pong response = PingPongProto.Pong.newBuilder().setPong(pong).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }) + .build() + .start(); + logger.info("Server started, listening on " + this.port); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + PingPongServer.this.stop(); + System.err.println("*** server shut down"); + } + }); + } + + public void stop() { + if (this.server != null) { + this.server.shutdown(); + } + } + + private void blockUntilShutdown() throws InterruptedException { + if (this.server != null) { + this.server.awaitTermination(); + } + } +} diff --git a/integration-tests/src/test/proto/PingPong.proto b/integration-tests/src/test/proto/PingPong.proto new file mode 100644 index 0000000..27d8587 --- /dev/null +++ b/integration-tests/src/test/proto/PingPong.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package io; + +import "google/protobuf/descriptor.proto"; + +option java_package = "io.datanerds.grpc.integrationtests"; +option java_outer_classname = "PingPongProto"; + + +service PingPong { + rpc PingPong (Ping) returns (Pong) { + } +} + +message Ping { + string ping = 1; +} + +message Pong { + string pong = 1; +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 0000000..47c3b14 --- /dev/null +++ b/settings.gradle @@ -0,0 +1,5 @@ +rootProject.name = 'ConsulNameResolver' +include 'consulnameresolver' +include 'integration-tests' + +