diff --git a/kotlin/pom.xml b/kotlin/pom.xml
index e310c5c6..ca7272a2 100644
--- a/kotlin/pom.xml
+++ b/kotlin/pom.xml
@@ -39,6 +39,12 @@
axon-configuration
provided
+
+ io.projectreactor
+ reactor-core
+ 3.4.5
+ test
+
diff --git a/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt b/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt
index e9053e14..fdfa12d3 100644
--- a/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt
+++ b/kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt
@@ -18,6 +18,7 @@ package org.axonframework.extensions.kotlin
import org.axonframework.messaging.responsetypes.ResponseTypes
import org.axonframework.queryhandling.QueryGateway
+import org.axonframework.queryhandling.SubscriptionQueryResult
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
@@ -136,7 +137,7 @@ inline fun QueryGateway.queryOptional(queryName: String,
* @since 0.2.0
*/
inline fun QueryGateway.scatterGather(query: Q, timeout: Long,
- timeUnit: TimeUnit): Stream {
+ timeUnit: TimeUnit): Stream {
return this.scatterGather(query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
}
@@ -155,7 +156,7 @@ inline fun QueryGateway.scatterGather(query: Q, timeout:
* @since 0.2.0
*/
inline fun QueryGateway.scatterGather(queryName: String, query: Q, timeout: Long,
- timeUnit: TimeUnit): Stream {
+ timeUnit: TimeUnit): Stream {
return this.scatterGather(queryName, query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
}
@@ -173,7 +174,7 @@ inline fun QueryGateway.scatterGather(queryName: String,
* @since 0.2.0
*/
inline fun QueryGateway.scatterGatherMany(query: Q, timeout: Long,
- timeUnit: TimeUnit): Stream> {
+ timeUnit: TimeUnit): Stream> {
return this.scatterGather(query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
}
@@ -192,7 +193,7 @@ inline fun QueryGateway.scatterGatherMany(query: Q, timeo
* @since 0.2.0
*/
inline fun QueryGateway.scatterGatherMany(queryName: String, query: Q, timeout: Long,
- timeUnit: TimeUnit): Stream> {
+ timeUnit: TimeUnit): Stream> {
return this.scatterGather(queryName, query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
}
@@ -210,7 +211,7 @@ inline fun QueryGateway.scatterGatherMany(queryName: Stri
* @since 0.2.0
*/
inline fun QueryGateway.scatterGatherOptional(query: Q, timeout: Long,
- timeUnit: TimeUnit): Stream> {
+ timeUnit: TimeUnit): Stream> {
return this.scatterGather(query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
}
@@ -229,6 +230,37 @@ inline fun QueryGateway.scatterGatherOptional(query: Q, t
* @since 0.2.0
*/
inline fun QueryGateway.scatterGatherOptional(queryName: String, query: Q, timeout: Long,
- timeUnit: TimeUnit): Stream> {
+ timeUnit: TimeUnit): Stream> {
return this.scatterGather(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
-}
\ No newline at end of file
+}
+
+/**
+ * Reified version of [QueryGateway.subscriptionQuery]
+ * which expects a single object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
+ * @param query Query to send
+ * @param Q the type of payload of the query
+ * @param I the type of initial response
+ * @param U the type of update response
+ * @return [SubscriptionQueryResult] wrapping the result of the query
+ * @see QueryGateway.subscriptionQuery
+ * @see ResponseTypes
+ * @since 0.3.0
+ */
+inline fun QueryGateway.subscriptionQuery(query: Q): SubscriptionQueryResult =
+ this.subscriptionQuery(query, ResponseTypes.instanceOf(I::class.java), ResponseTypes.instanceOf(U::class.java))
+
+/**
+ * Reified version of [QueryGateway.subscriptionQuery]
+ * which expects a single object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
+ * @param queryName Name of the query
+ * @param query Query to send
+ * @param Q the type of payload of the query
+ * @param I the type of initial response
+ * @param U the type of update response
+ * @return [SubscriptionQueryResult] wrapping the result of the query
+ * @see QueryGateway.subscriptionQuery
+ * @see ResponseTypes
+ * @since 0.3.0
+ */
+inline fun QueryGateway.subscriptionQuery(queryName: String, query: Q): SubscriptionQueryResult =
+ this.subscriptionQuery(queryName, query, ResponseTypes.instanceOf(I::class.java), ResponseTypes.instanceOf(U::class.java))
\ No newline at end of file
diff --git a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt
index 90a64f53..5e95f89e 100644
--- a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt
+++ b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt
@@ -19,9 +19,8 @@ import io.mockk.clearMocks
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
-import org.axonframework.messaging.responsetypes.AbstractResponseType
-import org.axonframework.messaging.responsetypes.InstanceResponseType
import org.axonframework.queryhandling.QueryGateway
+import org.axonframework.queryhandling.SubscriptionQueryResult
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
@@ -47,6 +46,7 @@ internal class QueryGatewayExtensionsTest {
private val streamInstanceReturnValue = Stream.of("Value")
private val streamMultipleReturnValue = Stream.of(listOf("Value", "Second Value"))
private val streamOptionalReturnValue = Stream.of(Optional.of("Value"))
+ private val subscriptionQueryResult = ExampleSubscriptionQueryResult()
@BeforeTest
fun before() {
@@ -62,6 +62,15 @@ internal class QueryGatewayExtensionsTest {
every { subjectGateway.scatterGather(queryName, exampleQuery, matchInstanceResponseType(), timeout, timeUnit) } returns streamInstanceReturnValue
every { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType(), timeout, timeUnit) } returns streamMultipleReturnValue
every { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType(), timeout, timeUnit) } returns streamOptionalReturnValue
+ every { subjectGateway.subscriptionQuery(exampleQuery, matchInstanceResponseType(), matchInstanceResponseType()) } returns subscriptionQueryResult
+ every {
+ subjectGateway.subscriptionQuery(
+ queryName,
+ exampleQuery,
+ matchInstanceResponseType(),
+ matchInstanceResponseType()
+ )
+ } returns subscriptionQueryResult
}
@AfterTest
@@ -80,7 +89,7 @@ internal class QueryGatewayExtensionsTest {
@Test
fun `Query without queryName should invoke query method and not require explicit generic types`() {
- val queryResult:CompletableFuture = subjectGateway.query(query = exampleQuery)
+ val queryResult: CompletableFuture = subjectGateway.query(query = exampleQuery)
assertSame(queryResult, instanceReturnValue)
verify(exactly = 1) {
subjectGateway.query(exampleQuery, matchExpectedResponseType(String::class.java))
@@ -90,7 +99,6 @@ internal class QueryGatewayExtensionsTest {
@Test
fun `Query without queryName Optional should invoke query method with correct generic parameters`() {
val queryResult = subjectGateway.queryOptional(query = exampleQuery)
-
assertSame(queryResult, optionalReturnValue)
verify(exactly = 1) { subjectGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) }
}
@@ -127,7 +135,7 @@ internal class QueryGatewayExtensionsTest {
}
val queryResult = nullableQueryGateway.query(query = exampleQuery)
-
+
assertSame(queryResult, nullInstanceReturnValue)
assertEquals(nullInstanceReturnValue.get(), null)
verify(exactly = 1) { nullableQueryGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) }
@@ -185,7 +193,7 @@ internal class QueryGatewayExtensionsTest {
fun `Query should handle nullable responses`() {
val nullInstanceReturnValue: CompletableFuture = CompletableFuture.completedFuture(null)
val nullableQueryGateway = mockk {
- every { query(queryName, exampleQuery, matchInstanceResponseType() ) } returns nullInstanceReturnValue
+ every { query(queryName, exampleQuery, matchInstanceResponseType()) } returns nullInstanceReturnValue
}
val queryResult = nullableQueryGateway.query(queryName = queryName, query = exampleQuery)
@@ -195,29 +203,29 @@ internal class QueryGatewayExtensionsTest {
verify(exactly = 1) { nullableQueryGateway.query(queryName, exampleQuery, matchExpectedResponseType(String::class.java)) }
}
- @Test
- fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() {
- val result = subjectGateway.scatterGather(
- query = exampleQuery,
- timeout = timeout,
- timeUnit = timeUnit
- )
+ @Test
+ fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() {
+ val result = subjectGateway.scatterGather(
+ query = exampleQuery,
+ timeout = timeout,
+ timeUnit = timeUnit
+ )
- assertSame(result, streamInstanceReturnValue)
- verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
- }
+ assertSame(result, streamInstanceReturnValue)
+ verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
+ }
- @Test
- fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() {
- val result = subjectGateway.scatterGatherMany(
- query = exampleQuery,
- timeout = timeout,
- timeUnit = timeUnit
- )
+ @Test
+ fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() {
+ val result = subjectGateway.scatterGatherMany(
+ query = exampleQuery,
+ timeout = timeout,
+ timeUnit = timeUnit
+ )
- assertSame(result, streamMultipleReturnValue)
- verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType(), timeout, timeUnit) }
- }
+ assertSame(result, streamMultipleReturnValue)
+ verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType(), timeout, timeUnit) }
+ }
@Test
fun `ScatterGather for Optional should invoke scatterGather method with correct generic parameters`() {
@@ -270,4 +278,57 @@ internal class QueryGatewayExtensionsTest {
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType(), timeout, timeUnit) }
}
+ @Test
+ fun `Query without queryName should invoke subscription query method with correct generic parameters`() {
+ val queryResult = subjectGateway.subscriptionQuery(query = exampleQuery)
+ assertSame(queryResult, subscriptionQueryResult)
+ verify(exactly = 1) {
+ subjectGateway.subscriptionQuery(
+ exampleQuery,
+ matchExpectedResponseType(InitialResponseType::class.java),
+ matchExpectedResponseType(UpdateResponseType::class.java)
+ )
+ }
+ }
+
+ @Test
+ fun `Query without queryName should invoke subscriptionQuery method and not require explicit generic types`() {
+ val queryResult: SubscriptionQueryResult = subjectGateway.subscriptionQuery(query = exampleQuery)
+ assertSame(queryResult, subscriptionQueryResult)
+ verify(exactly = 1) {
+ subjectGateway.subscriptionQuery(
+ exampleQuery,
+ matchExpectedResponseType(InitialResponseType::class.java),
+ matchExpectedResponseType(UpdateResponseType::class.java)
+ )
+ }
+ }
+
+ @Test
+ fun `Query should invoke subscriptionQuery method with correct generic parameters`() {
+ val queryResult = subjectGateway.subscriptionQuery(queryName = queryName, query = exampleQuery)
+ assertSame(queryResult, subscriptionQueryResult)
+ verify(exactly = 1) {
+ subjectGateway.subscriptionQuery(
+ queryName,
+ exampleQuery,
+ matchExpectedResponseType(InitialResponseType::class.java),
+ matchExpectedResponseType(UpdateResponseType::class.java)
+ )
+ }
+ }
+
+ @Test
+ fun `Query should invoke subscriptionQuery method and not require explicit generic types`() {
+ val queryResult: SubscriptionQueryResult = subjectGateway.subscriptionQuery(queryName = queryName, query = exampleQuery)
+ assertSame(queryResult, subscriptionQueryResult)
+ verify(exactly = 1) {
+ subjectGateway.subscriptionQuery(
+ queryName,
+ exampleQuery,
+ matchExpectedResponseType(InitialResponseType::class.java),
+ matchExpectedResponseType(UpdateResponseType::class.java)
+ )
+ }
+ }
}
diff --git a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt
index 99c89613..b3fbc0ca 100644
--- a/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt
+++ b/kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt
@@ -1,6 +1,9 @@
package org.axonframework.extensions.kotlin
import org.axonframework.modelling.command.TargetAggregateIdentifier
+import org.axonframework.queryhandling.SubscriptionQueryResult
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
/**
* Simple Query class to be used in tests.
@@ -11,3 +14,30 @@ internal data class ExampleQuery(val value: Number)
* Simple Command class to be used in tests.
*/
internal data class ExampleCommand(@TargetAggregateIdentifier val id: String)
+
+/**
+ * Class used as update response type in subscriptionQuery method.
+ */
+internal data class UpdateResponseType(val dummy: String)
+
+/**
+ * Class used as initial response type in subscriptionQuery method.
+ */
+internal data class InitialResponseType(val dummy: String)
+
+/**
+ * Dummy class used as return object from subscriptionQuery method in the mock.
+ */
+internal class ExampleSubscriptionQueryResult : SubscriptionQueryResult {
+ override fun cancel(): Boolean {
+ TODO("Not yet implemented")
+ }
+
+ override fun initialResult(): Mono {
+ TODO("Not yet implemented")
+ }
+
+ override fun updates(): Flux {
+ TODO("Not yet implemented")
+ }
+}