22
22
import kafka .server .KafkaServerStartable ;
23
23
import org .apache .curator .test .InstanceSpec ;
24
24
import org .apache .curator .test .TestingServer ;
25
- import org .junit .rules .TestRule ;
26
- import org .junit .runner .Description ;
27
- import org .junit .runners .model .Statement ;
25
+ import org .junit .rules .ExternalResource ;
28
26
import org .slf4j .Logger ;
29
27
import org .slf4j .LoggerFactory ;
30
28
39
37
/**
40
38
* Starts up a local Zookeeper and a Kafka broker
41
39
*/
42
- public class KafkaJunitRule implements TestRule {
40
+ public class KafkaJunitRule extends ExternalResource {
43
41
44
42
private static final Logger LOGGER = LoggerFactory .getLogger (KafkaJunitRule .class );
45
43
46
44
private TestingServer zookeeper ;
47
45
private KafkaServerStartable kafkaServer ;
48
46
47
+ private int zookeeperPort ;
48
+ private String zookeeperConnectionString ;
49
49
private int kafkaPort = 9092 ;
50
50
private Path kafkaLogDir ;
51
51
52
- @ Override
53
- public Statement apply (final Statement statement , Description description ) {
54
- return new Statement () {
55
- @ Override
56
- public void evaluate () throws Throwable {
57
- try {
58
- startKafkaServer ();
59
- statement .evaluate ();
60
- } finally {
61
- stopKafkaServer ();
62
- }
63
- }
64
- };
65
- }
66
52
67
- private void startKafkaServer () throws Exception {
53
+ @ Override
54
+ protected void before () throws Throwable {
68
55
zookeeper = new TestingServer (true );
69
- String zkQuorumStr = zookeeper .getConnectString ();
70
- KafkaConfig kafkaConfig = buildKafkaConfig (zkQuorumStr );
56
+ zookeeperPort = zookeeper .getPort ();
57
+ zookeeperConnectionString = zookeeper .getConnectString ();
58
+ KafkaConfig kafkaConfig = buildKafkaConfig (zookeeperConnectionString );
71
59
72
60
LOGGER .info ("Starting Kafka server with config: {}" , kafkaConfig .props ().props ());
73
61
kafkaServer = new KafkaServerStartable (kafkaConfig );
74
62
kafkaServer .startup ();
75
63
}
76
64
65
+ @ Override
66
+ protected void after () {
67
+ try {
68
+ if (kafkaServer != null ) {
69
+ LOGGER .info ("Shutting down Kafka Server" );
70
+ kafkaServer .shutdown ();
71
+ }
72
+
73
+ if (zookeeper != null ) {
74
+ LOGGER .info ("Shutting down Zookeeper" );
75
+ zookeeper .close ();
76
+ }
77
+
78
+ if (Files .exists (kafkaLogDir )) {
79
+ LOGGER .info ("Deleting the log dir: {}" , kafkaLogDir );
80
+ Files .walkFileTree (kafkaLogDir , new SimpleFileVisitor <Path >() {
81
+ @ Override
82
+ public FileVisitResult visitFile (Path file , BasicFileAttributes attrs ) throws IOException {
83
+ Files .deleteIfExists (file );
84
+ return FileVisitResult .CONTINUE ;
85
+ }
86
+
87
+ @ Override
88
+ public FileVisitResult postVisitDirectory (Path dir , IOException exc ) throws IOException {
89
+ Files .deleteIfExists (dir );
90
+ return FileVisitResult .CONTINUE ;
91
+ }
92
+ });
93
+ }
94
+ }
95
+ catch (Exception e ){
96
+ LOGGER .error ("Failed to clean-up Kafka" ,e );
97
+ }
98
+ }
99
+
77
100
private KafkaConfig buildKafkaConfig (String zookeeperQuorum ) throws IOException {
78
101
kafkaLogDir = Files .createTempDirectory ("kafka_junit" );
79
102
kafkaPort = InstanceSpec .getRandomPort ();
@@ -88,34 +111,6 @@ private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException
88
111
return new KafkaConfig (props );
89
112
}
90
113
91
- private void stopKafkaServer () throws IOException {
92
- if (kafkaServer != null ) {
93
- LOGGER .info ("Shutting down Kafka Server" );
94
- kafkaServer .shutdown ();
95
- }
96
-
97
- if (zookeeper != null ) {
98
- LOGGER .info ("Shutting down Zookeeper" );
99
- zookeeper .close ();
100
- }
101
-
102
- if (Files .exists (kafkaLogDir )) {
103
- LOGGER .info ("Deleting the log dir: {}" , kafkaLogDir );
104
- Files .walkFileTree (kafkaLogDir , new SimpleFileVisitor <Path >() {
105
- @ Override
106
- public FileVisitResult visitFile (Path file , BasicFileAttributes attrs ) throws IOException {
107
- Files .deleteIfExists (file );
108
- return FileVisitResult .CONTINUE ;
109
- }
110
-
111
- @ Override
112
- public FileVisitResult postVisitDirectory (Path dir , IOException exc ) throws IOException {
113
- Files .deleteIfExists (dir );
114
- return FileVisitResult .CONTINUE ;
115
- }
116
- });
117
- }
118
- }
119
114
120
115
/**
121
116
* Create a producer configuration.
@@ -140,7 +135,7 @@ public ProducerConfig producerConfig() {
140
135
*/
141
136
public ConsumerConfig consumerConfig () {
142
137
Properties props = new Properties ();
143
- props .put ("zookeeper.connect" , zookeeper . getConnectString () );
138
+ props .put ("zookeeper.connect" , zookeeperConnectionString );
144
139
props .put ("group.id" , "kafka-junit-consumer" );
145
140
props .put ("zookeeper.session.timeout.ms" , "400" );
146
141
props .put ("zookeeper.sync.time.ms" , "200" );
@@ -170,14 +165,14 @@ public int kafkaBrokerPort(){
170
165
* @return zookeeper port
171
166
*/
172
167
public int zookeeperPort (){
173
- return zookeeper . getPort () ;
168
+ return zookeeperPort ;
174
169
}
175
170
176
171
/**
177
172
* Get the zookeeper connection string
178
173
* @return zookeeper connection string
179
174
*/
180
175
public String zookeeperConnectionString (){
181
- return zookeeper . getConnectString () ;
176
+ return zookeeperConnectionString ;
182
177
}
183
178
}
0 commit comments