1818import com .google .common .annotations .VisibleForTesting ;
1919import com .google .common .base .Preconditions ;
2020import com .google .common .collect .ImmutableSet ;
21- import com .twitter .common .base .Command ;
2221import com .twitter .common .base .MorePreconditions ;
23- import com .twitter .common .zookeeper .Group ;
24- import com .twitter .common .zookeeper .ServerSet ;
25- import com .twitter .thrift .Endpoint ;
26- import com .twitter .thrift .ServiceInstance ;
27- import com .twitter .thrift .Status ;
2822import com .twitter .util .ExceptionalFunction ;
29- import org .apache .commons .lang .StringUtils ;
3023
3124import java .io .BufferedReader ;
3225import java .io .ByteArrayInputStream ;
3528import java .io .InputStream ;
3629import java .io .InputStreamReader ;
3730import java .net .InetSocketAddress ;
38- import java .util .Collections ;
39- import java .util .Map ;
4031
4132/**
4233 * Implementation of the ServerSet interface that uses a file on local disk instead of talking to ZooKeeper directly.
4738 * Note that this implementation only supports monitor() and not join(). Use the standard ZooKeeper implementation for
4839 * join().
4940 */
50- public class ConfigFileServerSet implements ServerSet {
41+ public class ConfigFileServerSet {
5142
5243 // made public for unit testing only
5344 public static String SERVERSET_DIR = "/var/serverset" ;
@@ -83,35 +74,7 @@ public ConfigFileServerSet(String serverSetFilePath) {
8374 }
8475 }
8576
86- @ Override
87- public EndpointStatus join (
88- InetSocketAddress endpoint , Map <String , InetSocketAddress > additionalEndpoints , Status status )
89- throws Group .JoinException , InterruptedException {
90- throw new UnsupportedOperationException ("ConfigFileServerSet does not support join()" );
91- }
92-
93- @ Override
94- public EndpointStatus join (
95- InetSocketAddress endpoint , Map <String , InetSocketAddress > additionalEndpoints )
96- throws Group .JoinException , InterruptedException {
97- throw new UnsupportedOperationException ("ConfigFileServerSet does not support join()" );
98- }
99-
100- @ Override
101- public EndpointStatus join (
102- InetSocketAddress endpoint , Map <String , InetSocketAddress > additionalEndpoints , int shardId )
103- throws Group .JoinException , InterruptedException {
104- throw new UnsupportedOperationException ("ConfigFileServerSet does not support join()" );
105- }
106-
107- @ Override
108- public Command watch (final HostChangeMonitor <ServiceInstance > monitor ) throws MonitorException {
109- monitor (monitor );
110- return null ;
111- }
112-
113- @ Override
114- public void monitor (final HostChangeMonitor <ServiceInstance > monitor ) throws MonitorException {
77+ public void monitor (final ServersetMonitor monitor ) throws Exception {
11578 Preconditions .checkNotNull (monitor );
11679 try {
11780 // Each call to monitor registers a new file watch. This is a bit inefficient if there
@@ -122,30 +85,30 @@ public void monitor(final HostChangeMonitor<ServiceInstance> monitor) throws Mon
12285 new ExceptionalFunction <byte [], Void >() {
12386 @ Override
12487 public Void applyE (byte [] newContents ) throws Exception {
125- ImmutableSet <ServiceInstance > newServerSet = readServerSet (newContents );
88+ ImmutableSet <InetSocketAddress > newServerSet = readServerSet (newContents );
12689 monitor .onChange (newServerSet );
12790 return null ;
12891 }
12992 });
13093 } catch (IOException e ) {
131- throw new MonitorException (
94+ throw new Exception (
13295 "Error setting up watch on dynamic server set file:" + serverSetFilePath , e );
13396 }
13497 }
13598
136- protected Endpoint getEndPointFromServerSetLine (String line ) {
99+ protected InetSocketAddress getEndPointFromServerSetLine (String line ) {
137100 // We expect each line to be of the form "hostname:port". Note that host names can
138101 // contain ':' themselves (e.g. ipv6 addresses).
139102 int index = line .lastIndexOf (':' );
140103 Preconditions .checkArgument (index > 0 && index < line .length () - 1 );
141104
142105 String host = line .substring (0 , index );
143106 int port = Integer .parseInt (line .substring (index + 1 ));
144- return new Endpoint (host , port );
107+ return new InetSocketAddress (host , port );
145108 }
146109
147- public ImmutableSet <ServiceInstance > readServerSet (byte [] fileContent ) throws IOException {
148- ImmutableSet .Builder <ServiceInstance > builder = new ImmutableSet .Builder <>();
110+ public ImmutableSet <InetSocketAddress > readServerSet (byte [] fileContent ) throws IOException {
111+ ImmutableSet .Builder <InetSocketAddress > builder = new ImmutableSet .Builder <>();
149112 InputStream stream = new ByteArrayInputStream (fileContent );
150113 BufferedReader reader = new BufferedReader (new InputStreamReader (stream ));
151114 while (true ) {
@@ -158,12 +121,9 @@ public ImmutableSet<ServiceInstance> readServerSet(byte[] fileContent) throws IO
158121 continue ;
159122 }
160123
161- Endpoint endpoint = getEndPointFromServerSetLine (line );
124+ InetSocketAddress endpoint = getEndPointFromServerSetLine (line );
162125 if (endpoint != null ) {
163- builder .add (new ServiceInstance (
164- endpoint , // endpoint
165- Collections .<String , Endpoint >emptyMap (), // additional endpoints
166- Status .ALIVE )); // status
126+ builder .add (endpoint );
167127 }
168128 }
169129 return builder .build ();
0 commit comments