diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java index 3dc2da93803..083f73b2a93 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/ControlPlaneServicesExtension.java @@ -62,7 +62,6 @@ import org.eclipse.edc.connector.controlplane.services.transferprocess.TransferProcessProtocolServiceImpl; import org.eclipse.edc.connector.controlplane.services.transferprocess.TransferProcessServiceImpl; import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.TransferTypeParser; import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore; @@ -184,9 +183,6 @@ public class ControlPlaneServicesExtension implements ServiceExtension { @Inject private ProtocolVersionRegistry protocolVersionRegistry; - @Inject - private DataFlowManager dataFlowManager; - @Inject private TransferTypeParser transferTypeParser; @@ -273,7 +269,7 @@ public TransferProcessService transferProcessService() { public TransferProcessProtocolService transferProcessProtocolService() { return new TransferProcessProtocolServiceImpl(transferProcessStore, transactionContext, contractNegotiationStore, contractValidationService, protocolTokenValidator(), dataAddressValidator, transferProcessObservable, clock, - monitor, telemetry, dataFlowManager); + monitor, telemetry); } @Provider diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java index cae09ab2c52..38a2b5c5202 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImpl.java @@ -21,7 +21,6 @@ import org.eclipse.edc.connector.controlplane.contract.spi.validation.ContractValidationService; import org.eclipse.edc.connector.controlplane.services.spi.protocol.ProtocolTokenValidator; import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessProtocolService; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessStartedData; import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore; @@ -67,14 +66,13 @@ public class TransferProcessProtocolServiceImpl implements TransferProcessProtoc private final Clock clock; private final Monitor monitor; private final Telemetry telemetry; - private final DataFlowManager dataFlowManager; public TransferProcessProtocolServiceImpl(TransferProcessStore transferProcessStore, TransactionContext transactionContext, ContractNegotiationStore negotiationStore, ContractValidationService contractValidationService, ProtocolTokenValidator protocolTokenValidator, DataAddressValidatorRegistry dataAddressValidator, TransferProcessObservable observable, - Clock clock, Monitor monitor, Telemetry telemetry, DataFlowManager dataFlowManager) { + Clock clock, Monitor monitor, Telemetry telemetry) { this.transferProcessStore = transferProcessStore; this.transactionContext = transactionContext; this.negotiationStore = negotiationStore; @@ -85,7 +83,6 @@ public TransferProcessProtocolServiceImpl(TransferProcessStore transferProcessSt this.clock = clock; this.monitor = monitor; this.telemetry = telemetry; - this.dataFlowManager = dataFlowManager; } @Override @@ -212,18 +209,11 @@ private ServiceResult completedAction(TransferCompletionMessage @NotNull private ServiceResult suspendedAction(TransferSuspensionMessage message, TransferProcess transferProcess) { - if (transferProcess.getType() == PROVIDER) { - var suspension = dataFlowManager.suspend(transferProcess); - if (suspension.failed()) { - return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail())); - } - } - if (transferProcess.canBeTerminated()) { + if (transferProcess.canBeSuspended()) { var reason = message.getReason().stream().map(Object::toString).collect(joining(", ")); - transferProcess.transitionSuspended(reason); + transferProcess.transitionSuspendingRequested(reason); transferProcess.protocolMessageReceived(message.getId()); update(transferProcess); - observable.invokeForEach(l -> l.suspended(transferProcess)); return ServiceResult.success(transferProcess); } else { return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be suspended")); diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java index 54c3b5d21dc..95f819b84ad 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessProtocolServiceImplTest.java @@ -21,7 +21,6 @@ import org.eclipse.edc.connector.controlplane.services.spi.protocol.ProtocolTokenValidator; import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessProtocolService; import org.eclipse.edc.connector.controlplane.transfer.observe.TransferProcessObservableImpl; -import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessListener; import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessStartedData; import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore; @@ -37,7 +36,6 @@ import org.eclipse.edc.participant.spi.ParticipantAgent; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.iam.TokenRepresentation; -import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceFailure; import org.eclipse.edc.spi.result.ServiceResult; @@ -73,10 +71,10 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING_REQUESTED; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; -import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.BAD_REQUEST; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.CONFLICT; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.NOT_FOUND; @@ -104,7 +102,6 @@ class TransferProcessProtocolServiceImplTest { private final ContractValidationService validationService = mock(); private final DataAddressValidatorRegistry dataAddressValidator = mock(); private final TransferProcessListener listener = mock(); - private final DataFlowManager dataFlowManager = mock(); private final ProtocolTokenValidator protocolTokenValidator = mock(); private TransferProcessProtocolService service; @@ -114,7 +111,7 @@ void setUp() { var observable = new TransferProcessObservableImpl(); observable.registerListener(listener); service = new TransferProcessProtocolServiceImpl(store, transactionContext, negotiationStore, validationService, - protocolTokenValidator, dataAddressValidator, observable, mock(), mock(), mock(), dataFlowManager); + protocolTokenValidator, dataAddressValidator, observable, mock(), mock(), mock()); } @@ -405,167 +402,6 @@ void shouldReturnBadRequest_whenCounterPartyUnauthorized() { } } - @Test - void findById_shouldReturnTransferProcess_whenValidCounterParty() { - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - var processId = "transferProcessId"; - var transferProcess = transferProcess(INITIAL, processId); - var agreement = contractAgreement(); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), isNull())).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById(processId)).thenReturn(transferProcess); - when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); - - var result = service.findById(processId, tokenRepresentation); - - assertThat(result) - .isSucceeded() - .isEqualTo(transferProcess); - } - - @Test - void findById_shouldReturnNotFound_whenNegotiationNotFound() { - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any())).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById(any())).thenReturn(null); - - var result = service.findById("invalidId", tokenRepresentation); - - assertThat(result) - .isFailed() - .extracting(ServiceFailure::getReason) - .isEqualTo(NOT_FOUND); - } - - @Test - void findById_shouldReturnBadRequest_whenCounterPartyUnauthorized() { - var processId = "transferProcessId"; - var transferProcess = transferProcess(INITIAL, processId); - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - var agreement = contractAgreement(); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), isNull())).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById(processId)).thenReturn(transferProcess); - when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error")); - - var result = service.findById(processId, tokenRepresentation); - - assertThat(result) - .isFailed() - .extracting(ServiceFailure::getReason) - .isEqualTo(BAD_REQUEST); - } - - @ParameterizedTest - @ArgumentsSource(NotifyArguments.class) - void notify_shouldFail_whenTransferProcessNotFound(MethodCall methodCall, M message) { - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any())).thenReturn(ServiceResult.success(participantAgent)); - when(store.findByIdAndLease(any())).thenReturn(StoreResult.notFound("not found")); - - var result = methodCall.call(service, message, tokenRepresentation); - - assertThat(result).matches(ServiceResult::failed); - verify(store, never()).save(any()); - verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); - } - - @ParameterizedTest - @ArgumentsSource(NotifyArguments.class) - void notify_shouldFail_whenTokenValidationFails(MethodCall methodCall, M message) { - var tokenRepresentation = tokenRepresentation(); - - when(store.findById(any())).thenReturn(transferProcessBuilder().build()); - when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(transferProcessBuilder().build())); - when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.unauthorized("unauthorized")); - - var result = methodCall.call(service, message, tokenRepresentation); - - assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND); - verify(store, never()).save(any()); - verifyNoInteractions(listener); - } - - private TransferProcess transferProcess(TransferProcessStates state, String id) { - return transferProcessBuilder() - .id(id) - .state(state.code()) - .build(); - } - - private TransferProcess.Builder transferProcessBuilder() { - return TransferProcess.Builder.newInstance() - .contractId("contractId") - .dataDestination(DataAddress.Builder.newInstance().type("type").build()); - } - - private ParticipantAgent participantAgent() { - return new ParticipantAgent(emptyMap(), emptyMap()); - } - - private TokenRepresentation tokenRepresentation() { - return TokenRepresentation.Builder.newInstance() - .token(UUID.randomUUID().toString()) - .build(); - } - - private ContractAgreement contractAgreement() { - return ContractAgreement.Builder.newInstance() - .id("agreementId") - .providerId("provider") - .consumerId("consumer") - .assetId("assetId") - .policy(Policy.Builder.newInstance().build()) - .build(); - } - - @FunctionalInterface - private interface MethodCall { - ServiceResult call(TransferProcessProtocolService service, M message, TokenRepresentation token); - } - - private static class NotifyArguments implements ArgumentsProvider { - - @Override - public Stream provideArguments(ExtensionContext extensionContext) { - MethodCall started = TransferProcessProtocolService::notifyStarted; - MethodCall completed = TransferProcessProtocolService::notifyCompleted; - MethodCall suspended = TransferProcessProtocolService::notifySuspended; - MethodCall terminated = TransferProcessProtocolService::notifyTerminated; - return Stream.of( - arguments(started, - build(TransferStartMessage.Builder.newInstance()), - CONSUMER, REQUESTED - ), - arguments(completed, - build(TransferCompletionMessage.Builder.newInstance()), - CONSUMER, STARTED - ), - arguments(suspended, - build(TransferSuspensionMessage.Builder.newInstance().code("TestCode").reason("TestReason")), - PROVIDER, STARTED - ), - arguments(terminated, - build(TransferTerminationMessage.Builder.newInstance().code("TestCode").reason("TestReason")), - PROVIDER, STARTED - ) - ); - } - - private M build(TransferRemoteMessage.Builder builder) { - return builder.protocol("protocol").counterPartyAddress("http://any").processId("correlationId").build(); - } - } - @Nested class NotifyStarted { @Test @@ -714,7 +550,6 @@ void shouldReturnError_whenStatusIsNotSuspendedAndTypeProvider() { var transferProcess = transferProcessBuilder().id("transferProcessId") .state(REQUESTED.code()).type(PROVIDER).build(); var dataFlowResponse = DataFlowResponse.Builder.newInstance().dataPlaneId("dataPlaneId").build(); - when(dataFlowManager.start(any(), any())).thenReturn(StatusResult.success(dataFlowResponse)); when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); when(store.findById("correlationId")).thenReturn(transferProcess); @@ -730,37 +565,9 @@ void shouldReturnError_whenStatusIsNotSuspendedAndTypeProvider() { @Nested class NotifySuspended { - @Test - void consumer_shouldTransitionToSuspended() { - var participantAgent = participantAgent(); - var tokenRepresentation = tokenRepresentation(); - var message = TransferSuspensionMessage.Builder.newInstance() - .protocol("protocol") - .consumerPid("consumerPid") - .providerPid("providerPid") - .counterPartyAddress("http://any") - .processId("correlationId") - .code("TestCode") - .reason("TestReason") - .build(); - var agreement = contractAgreement(); - var transferProcess = transferProcessBuilder().state(STARTED.code()).type(CONSUMER).build(); - - when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); - when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); - var result = service.notifySuspended(message, tokenRepresentation); - - assertThat(result).isSucceeded(); - verify(store).save(argThat(t -> t.getState() == SUSPENDED.code())); - verify(listener).suspended(any()); - verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); - } @Test - void provider_shouldSuspendDataFlowAndTransitionToSuspended() { + void shouldTransitionToSuspendingRequested() { var participantAgent = participantAgent(); var tokenRepresentation = tokenRepresentation(); var message = TransferSuspensionMessage.Builder.newInstance() @@ -773,25 +580,23 @@ void provider_shouldSuspendDataFlowAndTransitionToSuspended() { .reason("TestReason") .build(); var agreement = contractAgreement(); - var transferProcess = transferProcessBuilder().state(STARTED.code()).type(PROVIDER).build(); - + var transferProcess = transferProcessBuilder().state(STARTED.code()).build(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(store.findById(any())).thenReturn(transferProcess); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); - when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); var result = service.notifySuspended(message, tokenRepresentation); assertThat(result).isSucceeded(); - verify(store).save(argThat(t -> t.getState() == SUSPENDED.code())); - verify(listener).suspended(any()); + verify(store).save(argThat(t -> t.getState() == SUSPENDING_REQUESTED.code())); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); + verifyNoInteractions(listener); } @Test - void provider_shouldReturnConflict_whenDataFlowCannotBeSuspended() { + void shouldReturnConflict_whenDataFlowCannotBeSuspended() { var participantAgent = participantAgent(); var tokenRepresentation = tokenRepresentation(); var message = TransferSuspensionMessage.Builder.newInstance() @@ -804,19 +609,19 @@ void provider_shouldReturnConflict_whenDataFlowCannotBeSuspended() { .reason("TestReason") .build(); var agreement = contractAgreement(); - var transferProcess = transferProcessBuilder().state(STARTED.code()).type(PROVIDER).build(); + var transferProcess = transferProcessBuilder().state(REQUESTED.code()).type(PROVIDER).build(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(store.findById(any())).thenReturn(transferProcess); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); - when(dataFlowManager.suspend(any())).thenReturn(StatusResult.failure(FATAL_ERROR)); var result = service.notifySuspended(message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT); - verify(store, times(1)).save(argThat(tp -> tp.getState() == STARTED.code())); + verify(store, times(1)).save(argThat(tp -> tp.getState() == REQUESTED.code())); + verifyNoInteractions(listener); } @Test @@ -836,8 +641,8 @@ void shouldReturnConflict_whenTransferProcessCannotBeSuspended() { .build(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(store.findById(any())).thenReturn(transferProcess); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); @@ -866,8 +671,8 @@ void shouldReturnBadRequest_whenCounterPartyUnauthorized() { .build(); when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent)); - when(store.findById("correlationId")).thenReturn(transferProcess); - when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + when(store.findById(any())).thenReturn(transferProcess); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error")); @@ -898,7 +703,6 @@ void notify_shouldStoreReceivedMessageId(Method when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); when(validationService.validateAgreement(any(ParticipantAgent.class), any())).thenAnswer(i -> Result.success(i.getArgument(1))); when(validationService.validateRequest(any(ParticipantAgent.class), isA(ContractAgreement.class))).thenReturn(Result.success()); - when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); var result = methodCall.call(service, message, tokenRepresentation()); @@ -949,4 +753,166 @@ void notify_shouldIgnoreMessage_whenFinalState( verifyNoInteractions(listener); } } + + @Test + void findById_shouldReturnTransferProcess_whenValidCounterParty() { + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + var processId = "transferProcessId"; + var transferProcess = transferProcess(INITIAL, processId); + var agreement = contractAgreement(); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), isNull())).thenReturn(ServiceResult.success(participantAgent)); + when(store.findById(processId)).thenReturn(transferProcess); + when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); + when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success()); + + var result = service.findById(processId, tokenRepresentation); + + assertThat(result) + .isSucceeded() + .isEqualTo(transferProcess); + } + + @Test + void findById_shouldReturnNotFound_whenNegotiationNotFound() { + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any())).thenReturn(ServiceResult.success(participantAgent)); + when(store.findById(any())).thenReturn(null); + + var result = service.findById("invalidId", tokenRepresentation); + + assertThat(result) + .isFailed() + .extracting(ServiceFailure::getReason) + .isEqualTo(NOT_FOUND); + } + + @Test + void findById_shouldReturnBadRequest_whenCounterPartyUnauthorized() { + var processId = "transferProcessId"; + var transferProcess = transferProcess(INITIAL, processId); + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + var agreement = contractAgreement(); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), isNull())).thenReturn(ServiceResult.success(participantAgent)); + when(store.findById(processId)).thenReturn(transferProcess); + when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); + when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error")); + + var result = service.findById(processId, tokenRepresentation); + + assertThat(result) + .isFailed() + .extracting(ServiceFailure::getReason) + .isEqualTo(BAD_REQUEST); + } + + @ParameterizedTest + @ArgumentsSource(NotifyArguments.class) + void notify_shouldFail_whenTransferProcessNotFound(MethodCall methodCall, M message) { + var participantAgent = participantAgent(); + var tokenRepresentation = tokenRepresentation(); + + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any())).thenReturn(ServiceResult.success(participantAgent)); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.notFound("not found")); + + var result = methodCall.call(service, message, tokenRepresentation); + + assertThat(result).matches(ServiceResult::failed); + verify(store, never()).save(any()); + verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); + } + + @ParameterizedTest + @ArgumentsSource(NotifyArguments.class) + void notify_shouldFail_whenTokenValidationFails(MethodCall methodCall, M message) { + var tokenRepresentation = tokenRepresentation(); + + when(store.findById(any())).thenReturn(transferProcessBuilder().build()); + when(store.findByIdAndLease(any())).thenReturn(StoreResult.success(transferProcessBuilder().build())); + when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); + when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.unauthorized("unauthorized")); + + var result = methodCall.call(service, message, tokenRepresentation); + + assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND); + verify(store, never()).save(any()); + verifyNoInteractions(listener); + } + + private TransferProcess transferProcess(TransferProcessStates state, String id) { + return transferProcessBuilder() + .id(id) + .state(state.code()) + .build(); + } + + private TransferProcess.Builder transferProcessBuilder() { + return TransferProcess.Builder.newInstance() + .contractId("contractId") + .dataDestination(DataAddress.Builder.newInstance().type("type").build()); + } + + private ParticipantAgent participantAgent() { + return new ParticipantAgent(emptyMap(), emptyMap()); + } + + private TokenRepresentation tokenRepresentation() { + return TokenRepresentation.Builder.newInstance() + .token(UUID.randomUUID().toString()) + .build(); + } + + private ContractAgreement contractAgreement() { + return ContractAgreement.Builder.newInstance() + .id("agreementId") + .providerId("provider") + .consumerId("consumer") + .assetId("assetId") + .policy(Policy.Builder.newInstance().build()) + .build(); + } + + @FunctionalInterface + private interface MethodCall { + ServiceResult call(TransferProcessProtocolService service, M message, TokenRepresentation token); + } + + private static class NotifyArguments implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext extensionContext) { + MethodCall started = TransferProcessProtocolService::notifyStarted; + MethodCall completed = TransferProcessProtocolService::notifyCompleted; + MethodCall suspended = TransferProcessProtocolService::notifySuspended; + MethodCall terminated = TransferProcessProtocolService::notifyTerminated; + return Stream.of( + arguments(started, + build(TransferStartMessage.Builder.newInstance()), + CONSUMER, REQUESTED + ), + arguments(completed, + build(TransferCompletionMessage.Builder.newInstance()), + CONSUMER, STARTED + ), + arguments(suspended, + build(TransferSuspensionMessage.Builder.newInstance().code("TestCode").reason("TestReason")), + PROVIDER, STARTED + ), + arguments(terminated, + build(TransferTerminationMessage.Builder.newInstance().code("TestCode").reason("TestReason")), + PROVIDER, STARTED + ) + ); + } + + private M build(TransferRemoteMessage.Builder builder) { + return builder.protocol("protocol").counterPartyAddress("http://any").processId("correlationId").build(); + } + } + } diff --git a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java index c7826485749..5b33b2db63c 100644 --- a/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/control-plane-transfer/src/main/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImpl.java @@ -79,6 +79,7 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.RESUMING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING_REQUESTED; import static org.eclipse.edc.spi.persistence.StateEntityStore.hasState; @@ -176,6 +177,7 @@ protected StateMachineManager.Builder configureStateMachineManager(StateMachineM .processor(processConsumerTransfersInState(REQUESTING, this::processRequesting)) .processor(processProviderTransfersInState(STARTING, this::processStarting)) .processor(processTransfersInState(SUSPENDING, this::processSuspending)) + .processor(processTransfersInState(SUSPENDING_REQUESTED, this::processSuspending)) .processor(processProviderTransfersInState(RESUMING, this::processProviderResuming)) .processor(processConsumerTransfersInState(RESUMING, this::processConsumerResuming)) .processor(processTransfersInState(COMPLETING, this::processCompleting)) @@ -421,10 +423,22 @@ private boolean processSuspending(TransferProcess process) { return entityRetryProcessFactory.retryProcessor(process) .doProcess(result("Suspend DataFlow", (t, c) -> suspendDataFlow(process))) .doProcess(futureResult("Dispatch TransferSuspensionMessage to " + process.getCounterPartyAddress(), - (t, dataFlowResponse) -> dispatch(builder, t, Object.class)) + (t, dataFlowResponse) -> { + if (t.suspensionWasRequestedByCounterParty()) { + return completedFuture(StatusResult.success(null)); + } else { + return dispatch(builder, t, Object.class); + } + }) ) .onSuccess((t, content) -> transitionToSuspended(t)) - .onFailure((t, throwable) -> transitionToSuspending(t, throwable.getMessage())) + .onFailure((t, throwable) -> { + if (t.suspensionWasRequestedByCounterParty()) { + transitionToSuspendingRequested(t, throwable.getMessage()); + } else { + transitionToSuspending(t, throwable.getMessage()); + } + }) .onFinalFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable)) .execute(); } @@ -637,6 +651,12 @@ private void transitionToSuspending(TransferProcess process, String message) { update(process); } + private void transitionToSuspendingRequested(TransferProcess process, String message, Throwable... errors) { + monitor.warning(message, errors); + process.transitionSuspendingRequested(message); + update(process); + } + private void transitionToSuspended(TransferProcess process) { process.transitionSuspended(); update(process); diff --git a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java index 2d32c3b3ca1..aa53d4fb0c6 100644 --- a/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/control-plane-transfer/src/test/java/org/eclipse/edc/connector/controlplane/transfer/process/TransferProcessManagerImplTest.java @@ -96,6 +96,7 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING_REQUESTED; @@ -584,111 +585,6 @@ void pendingGuard_shouldSetTheTransferPending_whenPendingGuardMatches() { }); } - @ParameterizedTest - @ArgumentsSource(DispatchFailureArguments.class) - void dispatchFailure(TransferProcessStates starting, TransferProcessStates ending, CompletableFuture> result, UnaryOperator builderEnricher) { - var transferProcess = builderEnricher.apply(createTransferProcessBuilder(starting).state(starting.code())).build(); - when(transferProcessStore.nextNotLeased(anyInt(), or(stateIs(starting.code()), or(consumerStateIs(starting.code()), providerStateIs(starting.code()))))) - .thenReturn(List.of(transferProcess)).thenReturn(emptyList()); - when(dispatcherRegistry.dispatch(any(), any())).thenReturn(result); - when(transferProcessStore.findById(transferProcess.getId())).thenReturn(transferProcess); - when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); - when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); - - manager.start(); - - await().untilAsserted(() -> { - var captor = ArgumentCaptor.forClass(TransferProcess.class); - verify(transferProcessStore).save(captor.capture()); - assertThat(captor.getAllValues()).hasSize(1).first().satisfies(n -> { - assertThat(n.getState()).isEqualTo(ending.code()); - }); - verify(dispatcherRegistry, only()).dispatch(any(), any()); - }); - } - - private Criterion[] consumerStateIs(int state) { - return aryEq(new Criterion[]{ hasState(state), isNotPending(), criterion("type", "=", CONSUMER.name()) }); - } - - private Criterion[] providerStateIs(int state) { - return aryEq(new Criterion[]{ hasState(state), isNotPending(), criterion("type", "=", PROVIDER.name()) }); - } - - private Criterion[] stateIs(int state) { - return aryEq(new Criterion[]{ hasState(state), isNotPending() }); - } - - private DataFlowResponse createDataFlowResponse() { - return DataFlowResponse.Builder.newInstance() - .dataAddress(DataAddress.Builder.newInstance() - .type("type") - .build()) - .build(); - } - - private TransferProcess createTransferProcess(TransferProcessStates inState) { - return createTransferProcessBuilder(inState).build(); - } - - private TransferProcess.Builder createTransferProcessBuilder(TransferProcessStates state) { - var processId = UUID.randomUUID().toString(); - - return TransferProcess.Builder.newInstance() - .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) - .type(CONSUMER) - .id("test-process-" + processId) - .state(state.code()) - .correlationId(UUID.randomUUID().toString()) - .counterPartyAddress("http://an/address") - .contractId(UUID.randomUUID().toString()) - .assetId(UUID.randomUUID().toString()) - .dataDestination(DataAddress.Builder.newInstance().type(DESTINATION_TYPE).build()) - .protocol("protocol"); - } - - private ProvisionedDataDestinationResource provisionedDataDestinationResource() { - return new TestProvisionedDataDestinationResource("test-resource", PROVISIONED_RESOURCE_ID); - } - - private static class DispatchFailureArguments implements ArgumentsProvider { - - private static final int RETRIES_NOT_EXHAUSTED = RETRY_LIMIT; - private static final int RETRIES_EXHAUSTED = RETRIES_NOT_EXHAUSTED + 1; - - @Override - public Stream provideArguments(ExtensionContext extensionContext) { - CompletableFuture> genericError = failedFuture(new EdcException("error")); - var fatalError = completedFuture(StatusResult.failure(FATAL_ERROR)); - return Stream.of( - // retries not exhausted - new DispatchFailure(REQUESTING, REQUESTING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(STARTING, STARTING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), - new DispatchFailure(COMPLETING, COMPLETING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(SUSPENDING, SUSPENDING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(RESUMING, RESUMING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), - new DispatchFailure(RESUMING, RESUMING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(CONSUMER)), - new DispatchFailure(TERMINATING, TERMINATING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - // retries exhausted - new DispatchFailure(REQUESTING, TERMINATED, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), - new DispatchFailure(STARTING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED).type(PROVIDER)), - new DispatchFailure(COMPLETING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), - new DispatchFailure(SUSPENDING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), - new DispatchFailure(RESUMING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED).type(CONSUMER)), - new DispatchFailure(RESUMING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED).type(PROVIDER)), - new DispatchFailure(TERMINATING, TERMINATED, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), - // fatal error, in this case retry should never be done - new DispatchFailure(REQUESTING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(STARTING, TERMINATING, fatalError, b -> b.type(PROVIDER).stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(COMPLETING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(SUSPENDING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), - new DispatchFailure(RESUMING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(CONSUMER)), - new DispatchFailure(RESUMING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), - new DispatchFailure(TERMINATING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)) - ); - } - } - @Nested class InitiateConsumerRequest { @Test @@ -1061,4 +957,170 @@ void consumer_shouldTransitionToSuspended_whenMessageSentCorrectly() { } + @Nested + class SuspendingRequestedProvider { + + @Test + void shouldSuspendDataFlowAndTransitionToSuspendedAndNotSendMessage_whenMessageWasSentByCounterPart() { + var process = createTransferProcessBuilder(SUSPENDING_REQUESTED).type(PROVIDER).correlationId("counterPartyId").build(); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING_REQUESTED.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING_REQUESTED.code()).build()); + when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); + + manager.start(); + + await().untilAsserted(() -> { + verify(dataFlowManager).suspend(process); + verifyNoInteractions(dispatcherRegistry); + verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == SUSPENDED.code())); + verify(listener).suspended(process); + }); + } + + @Test + void shouldTransitionToSuspendingRequested_whenDataFlowSuspensionFails() { + var process = createTransferProcessBuilder(SUSPENDING_REQUESTED).type(PROVIDER).correlationId("counterPartyId").build(); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING_REQUESTED.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING_REQUESTED.code()).build()); + when(dataFlowManager.suspend(any())).thenReturn(StatusResult.failure(ERROR_RETRY)); + + manager.start(); + + await().untilAsserted(() -> { + verify(dataFlowManager).suspend(process); + verifyNoInteractions(dispatcherRegistry); + verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == SUSPENDING_REQUESTED.code())); + }); + } + + } + + @Nested + class SuspendingRequestedConsumer { + + @Test + void shouldSuspendDataFlowAndTransitionToSuspendedAndNotSendMessage_whenMessageWasSentByCounterPart() { + var process = createTransferProcessBuilder(SUSPENDING_REQUESTED).type(PROVIDER).correlationId("counterPartyId").build(); + when(transferProcessStore.nextNotLeased(anyInt(), stateIs(SUSPENDING_REQUESTED.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); + when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(SUSPENDING_REQUESTED.code()).build()); + when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); + + manager.start(); + + await().untilAsserted(() -> { + verify(dataFlowManager).suspend(process); + verifyNoInteractions(dispatcherRegistry); + verify(transferProcessStore, atLeastOnce()).save(argThat(p -> p.getState() == SUSPENDED.code())); + verify(listener).suspended(process); + }); + } + + } + + @ParameterizedTest + @ArgumentsSource(DispatchFailureArguments.class) + void dispatchFailure(TransferProcessStates starting, TransferProcessStates ending, CompletableFuture> result, UnaryOperator builderEnricher) { + var transferProcess = builderEnricher.apply(createTransferProcessBuilder(starting).state(starting.code())).build(); + when(transferProcessStore.nextNotLeased(anyInt(), or(stateIs(starting.code()), or(consumerStateIs(starting.code()), providerStateIs(starting.code()))))) + .thenReturn(List.of(transferProcess)).thenReturn(emptyList()); + when(dispatcherRegistry.dispatch(any(), any())).thenReturn(result); + when(transferProcessStore.findById(transferProcess.getId())).thenReturn(transferProcess); + when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success()); + when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success()); + + manager.start(); + + await().untilAsserted(() -> { + var captor = ArgumentCaptor.forClass(TransferProcess.class); + verify(transferProcessStore).save(captor.capture()); + assertThat(captor.getAllValues()).hasSize(1).first().satisfies(n -> { + assertThat(n.getState()).isEqualTo(ending.code()); + }); + verify(dispatcherRegistry, only()).dispatch(any(), any()); + }); + } + + private Criterion[] consumerStateIs(int state) { + return aryEq(new Criterion[]{ hasState(state), isNotPending(), criterion("type", "=", CONSUMER.name()) }); + } + + private Criterion[] providerStateIs(int state) { + return aryEq(new Criterion[]{ hasState(state), isNotPending(), criterion("type", "=", PROVIDER.name()) }); + } + + private Criterion[] stateIs(int state) { + return aryEq(new Criterion[]{ hasState(state), isNotPending() }); + } + + private DataFlowResponse createDataFlowResponse() { + return DataFlowResponse.Builder.newInstance() + .dataAddress(DataAddress.Builder.newInstance() + .type("type") + .build()) + .build(); + } + + private TransferProcess createTransferProcess(TransferProcessStates inState) { + return createTransferProcessBuilder(inState).build(); + } + + private TransferProcess.Builder createTransferProcessBuilder(TransferProcessStates state) { + var processId = UUID.randomUUID().toString(); + + return TransferProcess.Builder.newInstance() + .provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build()) + .type(CONSUMER) + .id("test-process-" + processId) + .state(state.code()) + .correlationId(UUID.randomUUID().toString()) + .counterPartyAddress("http://an/address") + .contractId(UUID.randomUUID().toString()) + .assetId(UUID.randomUUID().toString()) + .dataDestination(DataAddress.Builder.newInstance().type(DESTINATION_TYPE).build()) + .protocol("protocol"); + } + + private ProvisionedDataDestinationResource provisionedDataDestinationResource() { + return new TestProvisionedDataDestinationResource("test-resource", PROVISIONED_RESOURCE_ID); + } + + private static class DispatchFailureArguments implements ArgumentsProvider { + + private static final int RETRIES_NOT_EXHAUSTED = RETRY_LIMIT; + private static final int RETRIES_EXHAUSTED = RETRIES_NOT_EXHAUSTED + 1; + + @Override + public Stream provideArguments(ExtensionContext extensionContext) { + CompletableFuture> genericError = failedFuture(new EdcException("error")); + var fatalError = completedFuture(StatusResult.failure(FATAL_ERROR)); + return Stream.of( + // retries not exhausted + new DispatchFailure(REQUESTING, REQUESTING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(STARTING, STARTING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), + new DispatchFailure(COMPLETING, COMPLETING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(SUSPENDING, SUSPENDING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(RESUMING, RESUMING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), + new DispatchFailure(RESUMING, RESUMING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(CONSUMER)), + new DispatchFailure(TERMINATING, TERMINATING, genericError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + // retries exhausted + new DispatchFailure(REQUESTING, TERMINATED, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), + new DispatchFailure(STARTING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED).type(PROVIDER)), + new DispatchFailure(COMPLETING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), + new DispatchFailure(SUSPENDING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), + new DispatchFailure(RESUMING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED).type(CONSUMER)), + new DispatchFailure(RESUMING, TERMINATING, genericError, b -> b.stateCount(RETRIES_EXHAUSTED).type(PROVIDER)), + new DispatchFailure(TERMINATING, TERMINATED, genericError, b -> b.stateCount(RETRIES_EXHAUSTED)), + // fatal error, in this case retry should never be done + new DispatchFailure(REQUESTING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(STARTING, TERMINATING, fatalError, b -> b.type(PROVIDER).stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(COMPLETING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(SUSPENDING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)), + new DispatchFailure(RESUMING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(CONSUMER)), + new DispatchFailure(RESUMING, TERMINATING, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED).type(PROVIDER)), + new DispatchFailure(TERMINATING, TERMINATED, fatalError, b -> b.stateCount(RETRIES_NOT_EXHAUSTED)) + ); + } + } + + } diff --git a/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/v2024/from/JsonObjectFromTransferProcessV2024Transformer.java b/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/v2024/from/JsonObjectFromTransferProcessV2024Transformer.java index cd55cd40631..30327f0b983 100644 --- a/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/v2024/from/JsonObjectFromTransferProcessV2024Transformer.java +++ b/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/main/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/type/v2024/from/JsonObjectFromTransferProcessV2024Transformer.java @@ -82,7 +82,8 @@ private String state(Integer state, TransformerContext context) { return switch (transferProcessState) { case INITIAL, REQUESTING, REQUESTED -> forNamespace(DSPACE_VALUE_TRANSFER_STATE_REQUESTED_TERM); case STARTING, STARTED -> forNamespace(DSPACE_VALUE_TRANSFER_STATE_STARTED_TERM); - case SUSPENDING, SUSPENDED, RESUMING, RESUMED -> forNamespace(DSPACE_VALUE_TRANSFER_STATE_SUSPENDED_TERM); + case SUSPENDING, SUSPENDED, SUSPENDING_REQUESTED, RESUMING, RESUMED -> + forNamespace(DSPACE_VALUE_TRANSFER_STATE_SUSPENDED_TERM); case COMPLETING, COMPLETED, DEPROVISIONING, DEPROVISIONING_REQUESTED, DEPROVISIONED -> forNamespace(DSPACE_VALUE_TRANSFER_STATE_COMPLETED_TERM); case TERMINATING, TERMINATING_REQUESTED, TERMINATED -> forNamespace(DSPACE_VALUE_TRANSFER_STATE_TERMINATED_TERM); diff --git a/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/v2024/from/JsonObjectFromTransferProcessV2024TransformerTest.java b/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/v2024/from/JsonObjectFromTransferProcessV2024TransformerTest.java index 54d71f6a2f8..775e4a80f36 100644 --- a/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/v2024/from/JsonObjectFromTransferProcessV2024TransformerTest.java +++ b/data-protocols/dsp/dsp-transfer-process/lib/dsp-transfer-process-transform-lib/src/test/java/org/eclipse/edc/protocol/dsp/transferprocess/transform/v2024/from/JsonObjectFromTransferProcessV2024TransformerTest.java @@ -46,6 +46,7 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING_REQUESTED; @@ -169,6 +170,7 @@ public Stream provideArguments(ExtensionContext context) { arguments(STARTING, toIri(DSPACE_VALUE_TRANSFER_STATE_STARTED_TERM)), arguments(STARTED, toIri(DSPACE_VALUE_TRANSFER_STATE_STARTED_TERM)), arguments(SUSPENDING, toIri(DSPACE_VALUE_TRANSFER_STATE_SUSPENDED_TERM)), + arguments(SUSPENDING_REQUESTED, toIri(DSPACE_VALUE_TRANSFER_STATE_SUSPENDED_TERM)), arguments(SUSPENDED, toIri(DSPACE_VALUE_TRANSFER_STATE_SUSPENDED_TERM)), arguments(RESUMING, toIri(DSPACE_VALUE_TRANSFER_STATE_SUSPENDED_TERM)), arguments(RESUMED, toIri(DSPACE_VALUE_TRANSFER_STATE_SUSPENDED_TERM)), diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java index cd38d81899a..24e48e87d51 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcess.java @@ -63,6 +63,7 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDING_REQUESTED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATING_REQUESTED; @@ -336,7 +337,8 @@ public void transitionDeprovisioned() { public boolean canBeTerminated() { return currentStateIsOneOf(INITIAL, PROVISIONING, PROVISIONING_REQUESTED, PROVISIONED, REQUESTING, REQUESTED, - STARTING, STARTED, COMPLETING, SUSPENDING, SUSPENDED, RESUMING, TERMINATING, TERMINATING_REQUESTED); + STARTING, STARTED, COMPLETING, SUSPENDING, SUSPENDING_REQUESTED, SUSPENDED, RESUMING, TERMINATING, + TERMINATING_REQUESTED); } public void transitionTerminating(@Nullable String errorDetail) { @@ -367,7 +369,7 @@ public void transitionTerminated() { } public boolean canBeSuspended() { - return currentStateIsOneOf(STARTED, SUSPENDING); + return currentStateIsOneOf(STARTED, SUSPENDING, SUSPENDING_REQUESTED); } public void transitionSuspending(String reason) { @@ -375,6 +377,15 @@ public void transitionSuspending(String reason) { transition(SUSPENDING, state -> canBeSuspended()); } + public void transitionSuspendingRequested(@Nullable String errorDetail) { + this.errorDetail = errorDetail; + transitionSuspendingRequested(); + } + + public void transitionSuspendingRequested() { + transition(SUSPENDING_REQUESTED, state -> canBeSuspended()); + } + public void transitionSuspended(String reason) { this.errorDetail = reason; transitionSuspended(); @@ -490,6 +501,10 @@ public String stateAsString() { return TransferProcessStates.from(state).name(); } + public boolean suspensionWasRequestedByCounterParty() { + return getState() == SUSPENDING_REQUESTED.code(); + } + public boolean terminationWasRequestedByCounterParty() { return getState() == TERMINATING_REQUESTED.code(); } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java index 5023af4fefd..011b1e73837 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/controlplane/transfer/spi/types/TransferProcessStates.java @@ -31,6 +31,7 @@ public enum TransferProcessStates { STARTING(550), STARTED(600), SUSPENDING(650), + SUSPENDING_REQUESTED(675), SUSPENDED(700), RESUMING(720), RESUMED(725),