diff --git a/kotlin/pom.xml b/kotlin/pom.xml index e310c5c6..ca7272a2 100644 --- a/kotlin/pom.xml +++ b/kotlin/pom.xml @@ -39,6 +39,12 @@ axon-configuration provided + + io.projectreactor + reactor-core + 3.4.5 + test + diff --git a/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt b/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt index e9053e14..fdfa12d3 100644 --- a/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt +++ b/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt @@ -18,6 +18,7 @@ package org.axonframework.extensions.kotlin import org.axonframework.messaging.responsetypes.ResponseTypes import org.axonframework.queryhandling.QueryGateway +import org.axonframework.queryhandling.SubscriptionQueryResult import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit @@ -136,7 +137,7 @@ inline fun QueryGateway.queryOptional(queryName: String, * @since 0.2.0 */ inline fun QueryGateway.scatterGather(query: Q, timeout: Long, - timeUnit: TimeUnit): Stream { + timeUnit: TimeUnit): Stream { return this.scatterGather(query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit) } @@ -155,7 +156,7 @@ inline fun QueryGateway.scatterGather(query: Q, timeout: * @since 0.2.0 */ inline fun QueryGateway.scatterGather(queryName: String, query: Q, timeout: Long, - timeUnit: TimeUnit): Stream { + timeUnit: TimeUnit): Stream { return this.scatterGather(queryName, query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit) } @@ -173,7 +174,7 @@ inline fun QueryGateway.scatterGather(queryName: String, * @since 0.2.0 */ inline fun QueryGateway.scatterGatherMany(query: Q, timeout: Long, - timeUnit: TimeUnit): Stream> { + timeUnit: TimeUnit): Stream> { return this.scatterGather(query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit) } @@ -192,7 +193,7 @@ inline fun QueryGateway.scatterGatherMany(query: Q, timeo * @since 0.2.0 */ inline fun QueryGateway.scatterGatherMany(queryName: String, query: Q, timeout: Long, - timeUnit: TimeUnit): Stream> { + timeUnit: TimeUnit): Stream> { return this.scatterGather(queryName, query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit) } @@ -210,7 +211,7 @@ inline fun QueryGateway.scatterGatherMany(queryName: Stri * @since 0.2.0 */ inline fun QueryGateway.scatterGatherOptional(query: Q, timeout: Long, - timeUnit: TimeUnit): Stream> { + timeUnit: TimeUnit): Stream> { return this.scatterGather(query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit) } @@ -229,6 +230,37 @@ inline fun QueryGateway.scatterGatherOptional(query: Q, t * @since 0.2.0 */ inline fun QueryGateway.scatterGatherOptional(queryName: String, query: Q, timeout: Long, - timeUnit: TimeUnit): Stream> { + timeUnit: TimeUnit): Stream> { return this.scatterGather(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit) -} \ No newline at end of file +} + +/** + * Reified version of [QueryGateway.subscriptionQuery] + * which expects a single object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType] + * @param query Query to send + * @param Q the type of payload of the query + * @param I the type of initial response + * @param U the type of update response + * @return [SubscriptionQueryResult] wrapping the result of the query + * @see QueryGateway.subscriptionQuery + * @see ResponseTypes + * @since 0.3.0 + */ +inline fun QueryGateway.subscriptionQuery(query: Q): SubscriptionQueryResult = + this.subscriptionQuery(query, ResponseTypes.instanceOf(I::class.java), ResponseTypes.instanceOf(U::class.java)) + +/** + * Reified version of [QueryGateway.subscriptionQuery] + * which expects a single object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType] + * @param queryName Name of the query + * @param query Query to send + * @param Q the type of payload of the query + * @param I the type of initial response + * @param U the type of update response + * @return [SubscriptionQueryResult] wrapping the result of the query + * @see QueryGateway.subscriptionQuery + * @see ResponseTypes + * @since 0.3.0 + */ +inline fun QueryGateway.subscriptionQuery(queryName: String, query: Q): SubscriptionQueryResult = + this.subscriptionQuery(queryName, query, ResponseTypes.instanceOf(I::class.java), ResponseTypes.instanceOf(U::class.java)) \ No newline at end of file diff --git a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt index 90a64f53..5e95f89e 100644 --- a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt +++ b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt @@ -19,9 +19,8 @@ import io.mockk.clearMocks import io.mockk.every import io.mockk.mockk import io.mockk.verify -import org.axonframework.messaging.responsetypes.AbstractResponseType -import org.axonframework.messaging.responsetypes.InstanceResponseType import org.axonframework.queryhandling.QueryGateway +import org.axonframework.queryhandling.SubscriptionQueryResult import java.util.* import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit @@ -47,6 +46,7 @@ internal class QueryGatewayExtensionsTest { private val streamInstanceReturnValue = Stream.of("Value") private val streamMultipleReturnValue = Stream.of(listOf("Value", "Second Value")) private val streamOptionalReturnValue = Stream.of(Optional.of("Value")) + private val subscriptionQueryResult = ExampleSubscriptionQueryResult() @BeforeTest fun before() { @@ -62,6 +62,15 @@ internal class QueryGatewayExtensionsTest { every { subjectGateway.scatterGather(queryName, exampleQuery, matchInstanceResponseType(), timeout, timeUnit) } returns streamInstanceReturnValue every { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType(), timeout, timeUnit) } returns streamMultipleReturnValue every { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType(), timeout, timeUnit) } returns streamOptionalReturnValue + every { subjectGateway.subscriptionQuery(exampleQuery, matchInstanceResponseType(), matchInstanceResponseType()) } returns subscriptionQueryResult + every { + subjectGateway.subscriptionQuery( + queryName, + exampleQuery, + matchInstanceResponseType(), + matchInstanceResponseType() + ) + } returns subscriptionQueryResult } @AfterTest @@ -80,7 +89,7 @@ internal class QueryGatewayExtensionsTest { @Test fun `Query without queryName should invoke query method and not require explicit generic types`() { - val queryResult:CompletableFuture = subjectGateway.query(query = exampleQuery) + val queryResult: CompletableFuture = subjectGateway.query(query = exampleQuery) assertSame(queryResult, instanceReturnValue) verify(exactly = 1) { subjectGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) @@ -90,7 +99,6 @@ internal class QueryGatewayExtensionsTest { @Test fun `Query without queryName Optional should invoke query method with correct generic parameters`() { val queryResult = subjectGateway.queryOptional(query = exampleQuery) - assertSame(queryResult, optionalReturnValue) verify(exactly = 1) { subjectGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) } } @@ -127,7 +135,7 @@ internal class QueryGatewayExtensionsTest { } val queryResult = nullableQueryGateway.query(query = exampleQuery) - + assertSame(queryResult, nullInstanceReturnValue) assertEquals(nullInstanceReturnValue.get(), null) verify(exactly = 1) { nullableQueryGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) } @@ -185,7 +193,7 @@ internal class QueryGatewayExtensionsTest { fun `Query should handle nullable responses`() { val nullInstanceReturnValue: CompletableFuture = CompletableFuture.completedFuture(null) val nullableQueryGateway = mockk { - every { query(queryName, exampleQuery, matchInstanceResponseType() ) } returns nullInstanceReturnValue + every { query(queryName, exampleQuery, matchInstanceResponseType()) } returns nullInstanceReturnValue } val queryResult = nullableQueryGateway.query(queryName = queryName, query = exampleQuery) @@ -195,29 +203,29 @@ internal class QueryGatewayExtensionsTest { verify(exactly = 1) { nullableQueryGateway.query(queryName, exampleQuery, matchExpectedResponseType(String::class.java)) } } - @Test - fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() { - val result = subjectGateway.scatterGather( - query = exampleQuery, - timeout = timeout, - timeUnit = timeUnit - ) + @Test + fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() { + val result = subjectGateway.scatterGather( + query = exampleQuery, + timeout = timeout, + timeUnit = timeUnit + ) - assertSame(result, streamInstanceReturnValue) - verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) } - } + assertSame(result, streamInstanceReturnValue) + verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) } + } - @Test - fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() { - val result = subjectGateway.scatterGatherMany( - query = exampleQuery, - timeout = timeout, - timeUnit = timeUnit - ) + @Test + fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() { + val result = subjectGateway.scatterGatherMany( + query = exampleQuery, + timeout = timeout, + timeUnit = timeUnit + ) - assertSame(result, streamMultipleReturnValue) - verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType(), timeout, timeUnit) } - } + assertSame(result, streamMultipleReturnValue) + verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType(), timeout, timeUnit) } + } @Test fun `ScatterGather for Optional should invoke scatterGather method with correct generic parameters`() { @@ -270,4 +278,57 @@ internal class QueryGatewayExtensionsTest { verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType(), timeout, timeUnit) } } + @Test + fun `Query without queryName should invoke subscription query method with correct generic parameters`() { + val queryResult = subjectGateway.subscriptionQuery(query = exampleQuery) + assertSame(queryResult, subscriptionQueryResult) + verify(exactly = 1) { + subjectGateway.subscriptionQuery( + exampleQuery, + matchExpectedResponseType(InitialResponseType::class.java), + matchExpectedResponseType(UpdateResponseType::class.java) + ) + } + } + + @Test + fun `Query without queryName should invoke subscriptionQuery method and not require explicit generic types`() { + val queryResult: SubscriptionQueryResult = subjectGateway.subscriptionQuery(query = exampleQuery) + assertSame(queryResult, subscriptionQueryResult) + verify(exactly = 1) { + subjectGateway.subscriptionQuery( + exampleQuery, + matchExpectedResponseType(InitialResponseType::class.java), + matchExpectedResponseType(UpdateResponseType::class.java) + ) + } + } + + @Test + fun `Query should invoke subscriptionQuery method with correct generic parameters`() { + val queryResult = subjectGateway.subscriptionQuery(queryName = queryName, query = exampleQuery) + assertSame(queryResult, subscriptionQueryResult) + verify(exactly = 1) { + subjectGateway.subscriptionQuery( + queryName, + exampleQuery, + matchExpectedResponseType(InitialResponseType::class.java), + matchExpectedResponseType(UpdateResponseType::class.java) + ) + } + } + + @Test + fun `Query should invoke subscriptionQuery method and not require explicit generic types`() { + val queryResult: SubscriptionQueryResult = subjectGateway.subscriptionQuery(queryName = queryName, query = exampleQuery) + assertSame(queryResult, subscriptionQueryResult) + verify(exactly = 1) { + subjectGateway.subscriptionQuery( + queryName, + exampleQuery, + matchExpectedResponseType(InitialResponseType::class.java), + matchExpectedResponseType(UpdateResponseType::class.java) + ) + } + } } diff --git a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt index 99c89613..b3fbc0ca 100644 --- a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt +++ b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt @@ -1,6 +1,9 @@ package org.axonframework.extensions.kotlin import org.axonframework.modelling.command.TargetAggregateIdentifier +import org.axonframework.queryhandling.SubscriptionQueryResult +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono /** * Simple Query class to be used in tests. @@ -11,3 +14,30 @@ internal data class ExampleQuery(val value: Number) * Simple Command class to be used in tests. */ internal data class ExampleCommand(@TargetAggregateIdentifier val id: String) + +/** + * Class used as update response type in subscriptionQuery method. + */ +internal data class UpdateResponseType(val dummy: String) + +/** + * Class used as initial response type in subscriptionQuery method. + */ +internal data class InitialResponseType(val dummy: String) + +/** + * Dummy class used as return object from subscriptionQuery method in the mock. + */ +internal class ExampleSubscriptionQueryResult : SubscriptionQueryResult { + override fun cancel(): Boolean { + TODO("Not yet implemented") + } + + override fun initialResult(): Mono { + TODO("Not yet implemented") + } + + override fun updates(): Flux { + TODO("Not yet implemented") + } +}