57
57
import java .io .File ;
58
58
import java .io .IOException ;
59
59
import java .net .URI ;
60
+ import java .util .List ;
60
61
import java .util .Properties ;
61
62
import java .util .UUID ;
62
63
import java .util .concurrent .CountDownLatch ;
@@ -117,35 +118,40 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) thro
117
118
118
119
/*
119
120
1: event sent by the source to the Broker
120
- 2: event sent by the service in the response
121
+ 2: event sent by the trigger 1 in the response
122
+ 3: event sent by the trigger 2 in the response
121
123
2
122
124
+----------------------+
123
125
| |
124
126
| +-----+-----+
125
127
| 1 | |
126
128
| +---------->+ Trigger 1 |
127
- v | | |
129
+ v | 3 | |
128
130
+------------+ +-------------+ +-------+----+----+ +-----------+
129
131
| | 1 | | 2 | |
130
132
| HTTPClient +--------->+ Receiver | +--------+ Dispatcher |
131
133
| | | | | | |
132
134
+------------+ +------+------+ | +--------+---+----+ +-----------+
133
- | | ^ | | |
135
+ | | ^ | 3 | |
134
136
| v | +---------->+ Trigger 2 |
135
137
1 | +--------+--------+ | 2 | |
136
138
| | | 1 | +-----------+
137
139
+----->+ Kafka +--------+
138
140
| | 2 +-----------+
139
- +-----------------+ | |
141
+ +-----------------+ 3 | |
140
142
| Trigger 3 |
141
143
| |
142
144
+-----------+
145
+
146
+
147
+
148
+
143
149
*/
144
150
@ Test
145
151
@ Timeout (timeUnit = TimeUnit .MINUTES , value = 1 )
146
- public void execute (final Vertx vertx , final VertxTestContext context ) {
152
+ public void execute (final Vertx vertx , final VertxTestContext context ) throws InterruptedException {
147
153
148
- final var checkpoints = context .checkpoint (3 );
154
+ final var checkpoints = context .checkpoint (4 );
149
155
150
156
// event sent by the source to the Broker (see 1 in diagram)
151
157
final var expectedRequestEvent = CloudEventBuilder .v1 ()
@@ -158,7 +164,7 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
158
164
.build ();
159
165
160
166
// event sent in the response by the Callable service (see 2 in diagram)
161
- final var expectedResponseEvent = CloudEventBuilder .v03 ()
167
+ final var expectedResponseEventService2 = CloudEventBuilder .v03 ()
162
168
.withId (UUID .randomUUID ().toString ())
163
169
.withDataSchema (URI .create ("/api/data-schema-ce-2" ))
164
170
.withSubject ("subject-ce-2" )
@@ -167,6 +173,20 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
167
173
.withType (TYPE_CE_2 )
168
174
.build ();
169
175
176
+ // event sent in the response by the Callable service 2 (see 3 in diagram)
177
+ final var expectedResponseEventService1 = CloudEventBuilder .v1 ()
178
+ .withId (UUID .randomUUID ().toString ())
179
+ .withDataSchema (URI .create ("/api/data-schema-ce-3" ))
180
+ .withSource (URI .create ("/api/rossi" ))
181
+ .withSubject ("subject-ce-3" )
182
+ .withType (TYPE_CE_1 )
183
+ .build ();
184
+
185
+ final var service1ExpectedEventsIterator = List .of (
186
+ expectedRequestEvent ,
187
+ expectedResponseEventService1
188
+ ).iterator ();
189
+
170
190
final var resource = DataPlaneContract .Resource .newBuilder ()
171
191
.addTopics (TOPIC )
172
192
.setIngress (DataPlaneContract .Ingress .newBuilder ().setPath (format ("/%s/%s" , BROKER_NAMESPACE , BROKER_NAME )))
@@ -207,9 +227,13 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
207
227
new ContractPublisher (vertx .eventBus (), ResourcesReconcilerMessageHandler .ADDRESS )
208
228
.accept (DataPlaneContract .Contract .newBuilder ().addResources (resource ).build ());
209
229
210
- await ().atMost (10 , TimeUnit .SECONDS ).untilAsserted (() -> assertThat (vertx .deploymentIDs ())
230
+ await ()
231
+ .atMost (10 , TimeUnit .SECONDS )
232
+ .untilAsserted (() -> assertThat (vertx .deploymentIDs ())
211
233
.hasSize (resource .getEgressesCount () + NUM_RESOURCES + NUM_SYSTEM_VERTICLES ));
212
234
235
+ Thread .sleep (2000 ); // Give consumers time to start
236
+
213
237
// start service
214
238
vertx .createHttpServer ()
215
239
.exceptionHandler (context ::failNow )
@@ -221,22 +245,29 @@ public void execute(final Vertx vertx, final VertxTestContext context) {
221
245
222
246
// service 1 receives event sent by the HTTPClient
223
247
if (request .path ().equals (PATH_SERVICE_1 )) {
248
+ final var expectedEvent = service1ExpectedEventsIterator .next ();
224
249
context .verify (() -> {
225
- assertThat (event ).isEqualTo (expectedRequestEvent );
250
+ assertThat (event ).isEqualTo (expectedEvent );
226
251
checkpoints .flag (); // 2
227
252
});
228
253
229
- // write event to the response, the event will be handled by service 2
230
- VertxMessageFactory .createWriter (request .response ())
231
- .writeBinary (expectedResponseEvent );
254
+ if (service1ExpectedEventsIterator .hasNext ()) {
255
+ // write event to the response, the event will be handled by service 2
256
+ VertxMessageFactory .createWriter (request .response ())
257
+ .writeBinary (expectedResponseEventService2 );
258
+ }
232
259
}
233
260
234
261
// service 2 receives event in the response
235
262
if (request .path ().equals (PATH_SERVICE_2 )) {
236
263
context .verify (() -> {
237
- assertThat (event ).isEqualTo (expectedResponseEvent );
264
+ assertThat (event ).isEqualTo (expectedResponseEventService2 );
238
265
checkpoints .flag (); // 3
239
266
});
267
+
268
+ // write event to the response, the event will be handled by service 2
269
+ VertxMessageFactory .createWriter (request .response ())
270
+ .writeBinary (expectedResponseEventService1 );
240
271
}
241
272
242
273
if (request .path ().equals (PATH_SERVICE_3 )) {
0 commit comments