Skip to content

Commit d81b70c

Browse files
authored
feat: support for most of the TP tck test (#5031)
1 parent a8a3da7 commit d81b70c

File tree

9 files changed

+163
-140
lines changed

9 files changed

+163
-140
lines changed

core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.eclipse.edc.spi.query.Criterion;
3333
import org.eclipse.edc.spi.response.StatusResult;
3434
import org.eclipse.edc.spi.result.Result;
35+
import org.eclipse.edc.spi.result.ServiceFailure;
3536
import org.eclipse.edc.spi.types.domain.transfer.DataFlowProvisionMessage;
3637
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
3738
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
@@ -61,6 +62,7 @@
6162
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.PROVISIONING;
6263
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.RECEIVED;
6364
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.STARTED;
65+
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.SUSPENDED;
6466
import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState;
6567
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;
6668
import static org.eclipse.edc.spi.result.Result.success;
@@ -293,7 +295,11 @@ private StatusResult<DataFlow> stop(DataFlow dataFlow, String reason) {
293295
} else {
294296
var revokeResult = authorizationService.revokeEndpointDataReference(dataFlow.getId(), reason);
295297
if (revokeResult.failed()) {
296-
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlow.getId(), revokeResult.getFailureDetail()));
298+
if (dataFlow.getState() == SUSPENDED.code() && revokeResult.reason().equals(ServiceFailure.Reason.NOT_FOUND)) {
299+
monitor.warning("Revoking an EDR for DataFlow '%s' in state suspended returned not found error. This may indicate that the EDR was already revoked when it was suspended".formatted(dataFlow.getId()));
300+
} else {
301+
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlow.getId(), revokeResult.getFailureDetail()));
302+
}
297303
}
298304
}
299305

@@ -472,7 +478,7 @@ private boolean processFailed(DataFlow dataFlow) {
472478
private Processor processDataFlowInState(DataFlowStates state, Function<DataFlow, Boolean> function, Supplier<Criterion>... additionalCriteria) {
473479
Supplier<Collection<DataFlow>> entitiesSupplier = () -> {
474480
var additional = Arrays.stream(additionalCriteria).map(Supplier::get);
475-
var filter = Stream.concat(Stream.of(new Criterion[]{ hasState(state.code()) }), additional)
481+
var filter = Stream.concat(Stream.of(new Criterion[]{hasState(state.code())}), additional)
476482
.toArray(Criterion[]::new);
477483
return store.nextNotLeased(batchSize, filter);
478484
};

core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java

Lines changed: 137 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.eclipse.edc.spi.response.ResponseFailure;
3434
import org.eclipse.edc.spi.response.StatusResult;
3535
import org.eclipse.edc.spi.result.Result;
36+
import org.eclipse.edc.spi.result.ServiceResult;
3637
import org.eclipse.edc.spi.result.StoreResult;
3738
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
3839
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
@@ -131,6 +132,126 @@ public void setUp() {
131132
.build();
132133
}
133134

135+
@Test
136+
void completed_shouldNotifyResultToControlPlane() {
137+
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
138+
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
139+
when(transferProcessApiClient.completed(any())).thenReturn(StatusResult.success());
140+
141+
manager.start();
142+
143+
await().untilAsserted(() -> {
144+
verify(transferProcessApiClient).completed(any());
145+
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
146+
});
147+
}
148+
149+
@Test
150+
void completed_shouldTransitionToCompleted_whenRetryableError() {
151+
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
152+
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
153+
when(transferProcessApiClient.completed(any())).thenReturn(StatusResult.failure(ERROR_RETRY));
154+
155+
manager.start();
156+
157+
await().untilAsserted(() -> {
158+
verify(transferProcessApiClient).completed(any());
159+
verify(store).save(argThat(it -> it.getState() == COMPLETED.code()));
160+
});
161+
}
162+
163+
@Test
164+
void completed_shouldTransitionToTerminated_whenFatalError() {
165+
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
166+
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
167+
when(transferProcessApiClient.completed(any())).thenReturn(StatusResult.failure(FATAL_ERROR));
168+
169+
manager.start();
170+
171+
await().untilAsserted(() -> {
172+
verify(transferProcessApiClient).completed(any());
173+
verify(store).save(argThat(it -> it.getState() == TERMINATED.code()));
174+
});
175+
}
176+
177+
@Test
178+
void failed_shouldNotifyResultToControlPlane() {
179+
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
180+
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
181+
when(store.findById(any())).thenReturn(dataFlow);
182+
183+
when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(StatusResult.success());
184+
185+
manager.start();
186+
187+
await().untilAsserted(() -> {
188+
verify(transferProcessApiClient).failed(any(), eq("an error"));
189+
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
190+
});
191+
}
192+
193+
@Test
194+
void failed_shouldTransitionToFailed_whenRetryableError() {
195+
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
196+
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
197+
when(store.findById(any())).thenReturn(dataFlow);
198+
199+
when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(StatusResult.failure(ERROR_RETRY));
200+
201+
manager.start();
202+
203+
await().untilAsserted(() -> {
204+
verify(transferProcessApiClient).failed(any(), eq("an error"));
205+
verify(store).save(argThat(it -> it.getState() == FAILED.code()));
206+
});
207+
}
208+
209+
@Test
210+
void failed_shouldTransitionToTerminated_whenFatalError() {
211+
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
212+
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
213+
when(store.findById(any())).thenReturn(dataFlow);
214+
215+
when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(StatusResult.failure(FATAL_ERROR));
216+
217+
manager.start();
218+
219+
await().untilAsserted(() -> {
220+
verify(transferProcessApiClient).failed(any(), eq("an error"));
221+
verify(store).save(argThat(it -> it.getState() == TERMINATED.code()));
222+
});
223+
}
224+
225+
private DataFlow.Builder dataFlowBuilder() {
226+
return DataFlow.Builder.newInstance()
227+
.source(DataAddress.Builder.newInstance().type("source").build())
228+
.destination(DataAddress.Builder.newInstance().type("destination").build())
229+
.callbackAddress(URI.create("http://any"))
230+
.transferType(new TransferType("DestinationType", PUSH))
231+
.properties(Map.of("key", "value"));
232+
}
233+
234+
private Criterion[] stateIs(int state) {
235+
return aryEq(new Criterion[]{hasState(state)});
236+
}
237+
238+
private Criterion[] startedFlowOwnedByThisRuntime() {
239+
return arrayContains(new Criterion[]{hasState(STARTED.code()), new Criterion("runtimeId", "=", runtimeId)});
240+
}
241+
242+
private Criterion[] startedFlowOwnedByAnotherRuntime() {
243+
return arrayContains(new Criterion[]{hasState(STARTED.code()), new Criterion("runtimeId", "!=", runtimeId)});
244+
}
245+
246+
private DataFlowStartMessage createRequest() {
247+
return DataFlowStartMessage.Builder.newInstance()
248+
.id("1")
249+
.processId("1")
250+
.sourceDataAddress(DataAddress.Builder.newInstance().type("type").build())
251+
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
252+
.build();
253+
}
254+
134255
@Nested
135256
class Start {
136257

@@ -427,7 +548,20 @@ void shouldTerminateDataFlow() {
427548
void shouldTerminatePullDataFlow() {
428549
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").transferType(new TransferType("DestinationType", FlowType.PULL)).build();
429550
when(store.findByIdAndLease(dataFlow.getId())).thenReturn(StoreResult.success(dataFlow));
430-
when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(Result.success());
551+
when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(ServiceResult.success());
552+
553+
var result = manager.terminate(dataFlow.getId(), null);
554+
555+
assertThat(result).isSucceeded();
556+
verify(store).save(argThat(d -> d.getState() == TERMINATED.code()));
557+
verify(authorizationService).revokeEndpointDataReference(dataFlow.getId(), null);
558+
}
559+
560+
@Test
561+
void shouldTerminatePullDataFlow_whenSuspendedAndRevokeNotFound() {
562+
var dataFlow = dataFlowBuilder().state(SUSPENDED.code()).id("dataFlowId").transferType(new TransferType("DestinationType", FlowType.PULL)).build();
563+
when(store.findByIdAndLease(dataFlow.getId())).thenReturn(StoreResult.success(dataFlow));
564+
when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(ServiceResult.notFound("not found"));
431565

432566
var result = manager.terminate(dataFlow.getId(), null);
433567

@@ -440,7 +574,7 @@ void shouldTerminatePullDataFlow() {
440574
void shouldFailToTerminatePullDataFlow_whenRevocationFails() {
441575
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").transferType(new TransferType("DestinationType", FlowType.PULL)).build();
442576
when(store.findByIdAndLease(dataFlow.getId())).thenReturn(StoreResult.success(dataFlow));
443-
when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(Result.failure("failure"));
577+
when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(ServiceResult.notFound("failure"));
444578

445579
var result = manager.terminate(dataFlow.getId(), null);
446580

@@ -540,96 +674,6 @@ void shouldTransitionToDeprovisioning_whenFlowIsProvisionedOnConsumerSide() {
540674

541675
}
542676

543-
@Test
544-
void completed_shouldNotifyResultToControlPlane() {
545-
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
546-
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
547-
when(transferProcessApiClient.completed(any())).thenReturn(StatusResult.success());
548-
549-
manager.start();
550-
551-
await().untilAsserted(() -> {
552-
verify(transferProcessApiClient).completed(any());
553-
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
554-
});
555-
}
556-
557-
@Test
558-
void completed_shouldTransitionToCompleted_whenRetryableError() {
559-
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
560-
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
561-
when(transferProcessApiClient.completed(any())).thenReturn(StatusResult.failure(ERROR_RETRY));
562-
563-
manager.start();
564-
565-
await().untilAsserted(() -> {
566-
verify(transferProcessApiClient).completed(any());
567-
verify(store).save(argThat(it -> it.getState() == COMPLETED.code()));
568-
});
569-
}
570-
571-
@Test
572-
void completed_shouldTransitionToTerminated_whenFatalError() {
573-
var dataFlow = dataFlowBuilder().state(COMPLETED.code()).build();
574-
when(store.nextNotLeased(anyInt(), stateIs(COMPLETED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
575-
when(transferProcessApiClient.completed(any())).thenReturn(StatusResult.failure(FATAL_ERROR));
576-
577-
manager.start();
578-
579-
await().untilAsserted(() -> {
580-
verify(transferProcessApiClient).completed(any());
581-
verify(store).save(argThat(it -> it.getState() == TERMINATED.code()));
582-
});
583-
}
584-
585-
@Test
586-
void failed_shouldNotifyResultToControlPlane() {
587-
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
588-
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
589-
when(store.findById(any())).thenReturn(dataFlow);
590-
591-
when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(StatusResult.success());
592-
593-
manager.start();
594-
595-
await().untilAsserted(() -> {
596-
verify(transferProcessApiClient).failed(any(), eq("an error"));
597-
verify(store).save(argThat(it -> it.getState() == NOTIFIED.code()));
598-
});
599-
}
600-
601-
@Test
602-
void failed_shouldTransitionToFailed_whenRetryableError() {
603-
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
604-
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
605-
when(store.findById(any())).thenReturn(dataFlow);
606-
607-
when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(StatusResult.failure(ERROR_RETRY));
608-
609-
manager.start();
610-
611-
await().untilAsserted(() -> {
612-
verify(transferProcessApiClient).failed(any(), eq("an error"));
613-
verify(store).save(argThat(it -> it.getState() == FAILED.code()));
614-
});
615-
}
616-
617-
@Test
618-
void failed_shouldTransitionToTerminated_whenFatalError() {
619-
var dataFlow = dataFlowBuilder().state(FAILED.code()).errorDetail("an error").build();
620-
when(store.nextNotLeased(anyInt(), stateIs(FAILED.code()))).thenReturn(List.of(dataFlow)).thenReturn(emptyList());
621-
when(store.findById(any())).thenReturn(dataFlow);
622-
623-
when(transferProcessApiClient.failed(any(), eq("an error"))).thenReturn(StatusResult.failure(FATAL_ERROR));
624-
625-
manager.start();
626-
627-
await().untilAsserted(() -> {
628-
verify(transferProcessApiClient).failed(any(), eq("an error"));
629-
verify(store).save(argThat(it -> it.getState() == TERMINATED.code()));
630-
});
631-
}
632-
633677
@Nested
634678
class Received {
635679

@@ -671,7 +715,7 @@ void shouldTransitionToCompleted_whenTransferSucceeds() {
671715

672716
transferFuture.complete(StreamResult.success());
673717

674-
await().untilAsserted(() -> {
718+
await().untilAsserted(() -> {
675719
verify(store).save(argThat(it -> it.getState() == COMPLETED.code()));
676720
});
677721
}
@@ -1049,34 +1093,4 @@ void shouldRestartPullFlow_whenAnotherRuntimeAbandonedIt() {
10491093
}
10501094
}
10511095

1052-
private DataFlow.Builder dataFlowBuilder() {
1053-
return DataFlow.Builder.newInstance()
1054-
.source(DataAddress.Builder.newInstance().type("source").build())
1055-
.destination(DataAddress.Builder.newInstance().type("destination").build())
1056-
.callbackAddress(URI.create("http://any"))
1057-
.transferType(new TransferType("DestinationType", PUSH))
1058-
.properties(Map.of("key", "value"));
1059-
}
1060-
1061-
private Criterion[] stateIs(int state) {
1062-
return aryEq(new Criterion[]{hasState(state)});
1063-
}
1064-
1065-
private Criterion[] startedFlowOwnedByThisRuntime() {
1066-
return arrayContains(new Criterion[] { hasState(STARTED.code()), new Criterion("runtimeId", "=", runtimeId) });
1067-
}
1068-
1069-
private Criterion[] startedFlowOwnedByAnotherRuntime() {
1070-
return arrayContains(new Criterion[] { hasState(STARTED.code()), new Criterion("runtimeId", "!=", runtimeId) });
1071-
}
1072-
1073-
private DataFlowStartMessage createRequest() {
1074-
return DataFlowStartMessage.Builder.newInstance()
1075-
.id("1")
1076-
.processId("1")
1077-
.sourceDataAddress(DataAddress.Builder.newInstance().type("type").build())
1078-
.destinationDataAddress(DataAddress.Builder.newInstance().type("type").build())
1079-
.build();
1080-
}
1081-
10821096
}

extensions/data-plane/data-plane-iam/src/main/java/org/eclipse/edc/connector/dataplane/iam/service/DataPlaneAuthorizationServiceImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.eclipse.edc.spi.iam.TokenParameters;
2424
import org.eclipse.edc.spi.iam.TokenRepresentation;
2525
import org.eclipse.edc.spi.result.Result;
26+
import org.eclipse.edc.spi.result.ServiceResult;
2627
import org.eclipse.edc.spi.types.domain.DataAddress;
2728
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
2829

@@ -98,7 +99,7 @@ public Result<DataAddress> authorize(String token, Map<String, Object> requestDa
9899
}
99100

100101
@Override
101-
public Result<Void> revokeEndpointDataReference(String transferProcessId, String reason) {
102+
public ServiceResult<Void> revokeEndpointDataReference(String transferProcessId, String reason) {
102103
return accessTokenService.revoke(transferProcessId, reason);
103104
}
104105

extensions/data-plane/data-plane-iam/src/main/java/org/eclipse/edc/connector/dataplane/iam/service/DefaultDataPlaneAccessTokenServiceImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.eclipse.edc.spi.query.Criterion;
2626
import org.eclipse.edc.spi.query.QuerySpec;
2727
import org.eclipse.edc.spi.result.Result;
28+
import org.eclipse.edc.spi.result.ServiceResult;
2829
import org.eclipse.edc.spi.types.domain.DataAddress;
2930
import org.eclipse.edc.token.spi.KeyIdDecorator;
3031
import org.eclipse.edc.token.spi.TokenDecorator;
@@ -135,7 +136,7 @@ public Result<AccessTokenData> resolve(String token) {
135136
}
136137

137138
@Override
138-
public Result<Void> revoke(String transferProcessId, String reason) {
139+
public ServiceResult<Void> revoke(String transferProcessId, String reason) {
139140

140141
var query = QuerySpec.Builder.newInstance()
141142
.filter(new Criterion("additionalProperties.process_id", "=", transferProcessId))
@@ -144,7 +145,8 @@ public Result<Void> revoke(String transferProcessId, String reason) {
144145
var tokens = accessTokenDataStore.query(query);
145146
return tokens.stream().map(this::deleteTokenData)
146147
.reduce(Result::merge)
147-
.orElseGet(() -> Result.failure("AccessTokenData associated to the transfer with ID '%s' does not exist.".formatted(transferProcessId)));
148+
.map(r -> r.succeeded() ? ServiceResult.<Void>success() : ServiceResult.<Void>unexpected(r.getFailureDetail()))
149+
.orElseGet(() -> ServiceResult.notFound("AccessTokenData associated to the transfer with ID '%s' does not exist.".formatted(transferProcessId)));
148150

149151
}
150152

0 commit comments

Comments
 (0)