Skip to content

Commit 0529b48

Browse files
authored
Merge pull request #196 from rsobies/master
[#17] Subscription queries extensions
2 parents 0cc4f4e + 6fdd32d commit 0529b48

File tree

4 files changed

+162
-33
lines changed

4 files changed

+162
-33
lines changed

kotlin/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@
3939
<artifactId>axon-configuration</artifactId>
4040
<scope>provided</scope>
4141
</dependency>
42+
<dependency>
43+
<groupId>io.projectreactor</groupId>
44+
<artifactId>reactor-core</artifactId>
45+
<version>3.4.5</version>
46+
<scope>test</scope>
47+
</dependency>
4248
</dependencies>
4349

4450
<build>

kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.axonframework.extensions.kotlin
1818

1919
import org.axonframework.messaging.responsetypes.ResponseTypes
2020
import org.axonframework.queryhandling.QueryGateway
21+
import org.axonframework.queryhandling.SubscriptionQueryResult
2122
import java.util.*
2223
import java.util.concurrent.CompletableFuture
2324
import java.util.concurrent.TimeUnit
@@ -136,7 +137,7 @@ inline fun <reified R, reified Q> QueryGateway.queryOptional(queryName: String,
136137
* @since 0.2.0
137138
*/
138139
inline fun <reified R, reified Q> QueryGateway.scatterGather(query: Q, timeout: Long,
139-
timeUnit: TimeUnit): Stream<R> {
140+
timeUnit: TimeUnit): Stream<R> {
140141
return this.scatterGather(query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
141142
}
142143

@@ -155,7 +156,7 @@ inline fun <reified R, reified Q> QueryGateway.scatterGather(query: Q, timeout:
155156
* @since 0.2.0
156157
*/
157158
inline fun <reified R, reified Q> QueryGateway.scatterGather(queryName: String, query: Q, timeout: Long,
158-
timeUnit: TimeUnit): Stream<R> {
159+
timeUnit: TimeUnit): Stream<R> {
159160
return this.scatterGather(queryName, query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
160161
}
161162

@@ -173,7 +174,7 @@ inline fun <reified R, reified Q> QueryGateway.scatterGather(queryName: String,
173174
* @since 0.2.0
174175
*/
175176
inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(query: Q, timeout: Long,
176-
timeUnit: TimeUnit): Stream<List<R>> {
177+
timeUnit: TimeUnit): Stream<List<R>> {
177178
return this.scatterGather(query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
178179
}
179180

@@ -192,7 +193,7 @@ inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(query: Q, timeo
192193
* @since 0.2.0
193194
*/
194195
inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(queryName: String, query: Q, timeout: Long,
195-
timeUnit: TimeUnit): Stream<List<R>> {
196+
timeUnit: TimeUnit): Stream<List<R>> {
196197
return this.scatterGather(queryName, query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
197198
}
198199

@@ -210,7 +211,7 @@ inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(queryName: Stri
210211
* @since 0.2.0
211212
*/
212213
inline fun <reified R, reified Q> QueryGateway.scatterGatherOptional(query: Q, timeout: Long,
213-
timeUnit: TimeUnit): Stream<Optional<R>> {
214+
timeUnit: TimeUnit): Stream<Optional<R>> {
214215
return this.scatterGather(query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
215216
}
216217

@@ -229,6 +230,37 @@ inline fun <reified R, reified Q> QueryGateway.scatterGatherOptional(query: Q, t
229230
* @since 0.2.0
230231
*/
231232
inline fun <reified R, reified Q> QueryGateway.scatterGatherOptional(queryName: String, query: Q, timeout: Long,
232-
timeUnit: TimeUnit): Stream<Optional<R>> {
233+
timeUnit: TimeUnit): Stream<Optional<R>> {
233234
return this.scatterGather(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
234-
}
235+
}
236+
237+
/**
238+
* Reified version of [QueryGateway.subscriptionQuery]
239+
* which expects a single object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
240+
* @param query Query to send
241+
* @param Q the type of payload of the query
242+
* @param I the type of initial response
243+
* @param U the type of update response
244+
* @return [SubscriptionQueryResult] wrapping the result of the query
245+
* @see QueryGateway.subscriptionQuery
246+
* @see ResponseTypes
247+
* @since 0.3.0
248+
*/
249+
inline fun <reified Q, reified I, reified U> QueryGateway.subscriptionQuery(query: Q): SubscriptionQueryResult<I, U> =
250+
this.subscriptionQuery(query, ResponseTypes.instanceOf(I::class.java), ResponseTypes.instanceOf(U::class.java))
251+
252+
/**
253+
* Reified version of [QueryGateway.subscriptionQuery]
254+
* which expects a single object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
255+
* @param queryName Name of the query
256+
* @param query Query to send
257+
* @param Q the type of payload of the query
258+
* @param I the type of initial response
259+
* @param U the type of update response
260+
* @return [SubscriptionQueryResult] wrapping the result of the query
261+
* @see QueryGateway.subscriptionQuery
262+
* @see ResponseTypes
263+
* @since 0.3.0
264+
*/
265+
inline fun <reified Q, reified I, reified U> QueryGateway.subscriptionQuery(queryName: String, query: Q): SubscriptionQueryResult<I, U> =
266+
this.subscriptionQuery(queryName, query, ResponseTypes.instanceOf(I::class.java), ResponseTypes.instanceOf(U::class.java))

kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ import io.mockk.clearMocks
1919
import io.mockk.every
2020
import io.mockk.mockk
2121
import io.mockk.verify
22-
import org.axonframework.messaging.responsetypes.AbstractResponseType
23-
import org.axonframework.messaging.responsetypes.InstanceResponseType
2422
import org.axonframework.queryhandling.QueryGateway
23+
import org.axonframework.queryhandling.SubscriptionQueryResult
2524
import java.util.*
2625
import java.util.concurrent.CompletableFuture
2726
import java.util.concurrent.TimeUnit
@@ -47,6 +46,7 @@ internal class QueryGatewayExtensionsTest {
4746
private val streamInstanceReturnValue = Stream.of("Value")
4847
private val streamMultipleReturnValue = Stream.of(listOf("Value", "Second Value"))
4948
private val streamOptionalReturnValue = Stream.of(Optional.of("Value"))
49+
private val subscriptionQueryResult = ExampleSubscriptionQueryResult()
5050

5151
@BeforeTest
5252
fun before() {
@@ -62,6 +62,15 @@ internal class QueryGatewayExtensionsTest {
6262
every { subjectGateway.scatterGather(queryName, exampleQuery, matchInstanceResponseType<String>(), timeout, timeUnit) } returns streamInstanceReturnValue
6363
every { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) } returns streamMultipleReturnValue
6464
every { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) } returns streamOptionalReturnValue
65+
every { subjectGateway.subscriptionQuery(exampleQuery, matchInstanceResponseType<InitialResponseType>(), matchInstanceResponseType<UpdateResponseType>()) } returns subscriptionQueryResult
66+
every {
67+
subjectGateway.subscriptionQuery(
68+
queryName,
69+
exampleQuery,
70+
matchInstanceResponseType<InitialResponseType>(),
71+
matchInstanceResponseType<UpdateResponseType>()
72+
)
73+
} returns subscriptionQueryResult
6574
}
6675

6776
@AfterTest
@@ -80,7 +89,7 @@ internal class QueryGatewayExtensionsTest {
8089

8190
@Test
8291
fun `Query without queryName should invoke query method and not require explicit generic types`() {
83-
val queryResult:CompletableFuture<String> = subjectGateway.query(query = exampleQuery)
92+
val queryResult: CompletableFuture<String> = subjectGateway.query(query = exampleQuery)
8493
assertSame(queryResult, instanceReturnValue)
8594
verify(exactly = 1) {
8695
subjectGateway.query(exampleQuery, matchExpectedResponseType(String::class.java))
@@ -90,7 +99,6 @@ internal class QueryGatewayExtensionsTest {
9099
@Test
91100
fun `Query without queryName Optional should invoke query method with correct generic parameters`() {
92101
val queryResult = subjectGateway.queryOptional<String, ExampleQuery>(query = exampleQuery)
93-
94102
assertSame(queryResult, optionalReturnValue)
95103
verify(exactly = 1) { subjectGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) }
96104
}
@@ -127,7 +135,7 @@ internal class QueryGatewayExtensionsTest {
127135
}
128136

129137
val queryResult = nullableQueryGateway.query<String?, ExampleQuery>(query = exampleQuery)
130-
138+
131139
assertSame(queryResult, nullInstanceReturnValue)
132140
assertEquals(nullInstanceReturnValue.get(), null)
133141
verify(exactly = 1) { nullableQueryGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) }
@@ -185,7 +193,7 @@ internal class QueryGatewayExtensionsTest {
185193
fun `Query should handle nullable responses`() {
186194
val nullInstanceReturnValue: CompletableFuture<String?> = CompletableFuture.completedFuture(null)
187195
val nullableQueryGateway = mockk<QueryGateway> {
188-
every { query(queryName, exampleQuery, matchInstanceResponseType<String?>() ) } returns nullInstanceReturnValue
196+
every { query(queryName, exampleQuery, matchInstanceResponseType<String?>()) } returns nullInstanceReturnValue
189197
}
190198

191199
val queryResult = nullableQueryGateway.query<String?, ExampleQuery>(queryName = queryName, query = exampleQuery)
@@ -195,29 +203,29 @@ internal class QueryGatewayExtensionsTest {
195203
verify(exactly = 1) { nullableQueryGateway.query(queryName, exampleQuery, matchExpectedResponseType(String::class.java)) }
196204
}
197205

198-
@Test
199-
fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() {
200-
val result = subjectGateway.scatterGather<String, ExampleQuery>(
201-
query = exampleQuery,
202-
timeout = timeout,
203-
timeUnit = timeUnit
204-
)
206+
@Test
207+
fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() {
208+
val result = subjectGateway.scatterGather<String, ExampleQuery>(
209+
query = exampleQuery,
210+
timeout = timeout,
211+
timeUnit = timeUnit
212+
)
205213

206-
assertSame(result, streamInstanceReturnValue)
207-
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
208-
}
214+
assertSame(result, streamInstanceReturnValue)
215+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
216+
}
209217

210-
@Test
211-
fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() {
212-
val result = subjectGateway.scatterGatherMany<String, ExampleQuery>(
213-
query = exampleQuery,
214-
timeout = timeout,
215-
timeUnit = timeUnit
216-
)
218+
@Test
219+
fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() {
220+
val result = subjectGateway.scatterGatherMany<String, ExampleQuery>(
221+
query = exampleQuery,
222+
timeout = timeout,
223+
timeUnit = timeUnit
224+
)
217225

218-
assertSame(result, streamMultipleReturnValue)
219-
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) }
220-
}
226+
assertSame(result, streamMultipleReturnValue)
227+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) }
228+
}
221229

222230
@Test
223231
fun `ScatterGather for Optional should invoke scatterGather method with correct generic parameters`() {
@@ -270,4 +278,57 @@ internal class QueryGatewayExtensionsTest {
270278
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) }
271279
}
272280

281+
@Test
282+
fun `Query without queryName should invoke subscription query method with correct generic parameters`() {
283+
val queryResult = subjectGateway.subscriptionQuery<ExampleQuery, InitialResponseType, UpdateResponseType>(query = exampleQuery)
284+
assertSame(queryResult, subscriptionQueryResult)
285+
verify(exactly = 1) {
286+
subjectGateway.subscriptionQuery(
287+
exampleQuery,
288+
matchExpectedResponseType(InitialResponseType::class.java),
289+
matchExpectedResponseType(UpdateResponseType::class.java)
290+
)
291+
}
292+
}
293+
294+
@Test
295+
fun `Query without queryName should invoke subscriptionQuery method and not require explicit generic types`() {
296+
val queryResult: SubscriptionQueryResult<InitialResponseType, UpdateResponseType> = subjectGateway.subscriptionQuery(query = exampleQuery)
297+
assertSame(queryResult, subscriptionQueryResult)
298+
verify(exactly = 1) {
299+
subjectGateway.subscriptionQuery(
300+
exampleQuery,
301+
matchExpectedResponseType(InitialResponseType::class.java),
302+
matchExpectedResponseType(UpdateResponseType::class.java)
303+
)
304+
}
305+
}
306+
307+
@Test
308+
fun `Query should invoke subscriptionQuery method with correct generic parameters`() {
309+
val queryResult = subjectGateway.subscriptionQuery<ExampleQuery, InitialResponseType, UpdateResponseType>(queryName = queryName, query = exampleQuery)
310+
assertSame(queryResult, subscriptionQueryResult)
311+
verify(exactly = 1) {
312+
subjectGateway.subscriptionQuery(
313+
queryName,
314+
exampleQuery,
315+
matchExpectedResponseType(InitialResponseType::class.java),
316+
matchExpectedResponseType(UpdateResponseType::class.java)
317+
)
318+
}
319+
}
320+
321+
@Test
322+
fun `Query should invoke subscriptionQuery method and not require explicit generic types`() {
323+
val queryResult: SubscriptionQueryResult<InitialResponseType, UpdateResponseType> = subjectGateway.subscriptionQuery(queryName = queryName, query = exampleQuery)
324+
assertSame(queryResult, subscriptionQueryResult)
325+
verify(exactly = 1) {
326+
subjectGateway.subscriptionQuery(
327+
queryName,
328+
exampleQuery,
329+
matchExpectedResponseType(InitialResponseType::class.java),
330+
matchExpectedResponseType(UpdateResponseType::class.java)
331+
)
332+
}
333+
}
273334
}

kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/testObjects.kt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package org.axonframework.extensions.kotlin
22

33
import org.axonframework.modelling.command.TargetAggregateIdentifier
4+
import org.axonframework.queryhandling.SubscriptionQueryResult
5+
import reactor.core.publisher.Flux
6+
import reactor.core.publisher.Mono
47

58
/**
69
* Simple Query class to be used in tests.
@@ -11,3 +14,30 @@ internal data class ExampleQuery(val value: Number)
1114
* Simple Command class to be used in tests.
1215
*/
1316
internal data class ExampleCommand(@TargetAggregateIdentifier val id: String)
17+
18+
/**
19+
* Class used as update response type in subscriptionQuery method.
20+
*/
21+
internal data class UpdateResponseType(val dummy: String)
22+
23+
/**
24+
* Class used as initial response type in subscriptionQuery method.
25+
*/
26+
internal data class InitialResponseType(val dummy: String)
27+
28+
/**
29+
* Dummy class used as return object from subscriptionQuery method in the mock.
30+
*/
31+
internal class ExampleSubscriptionQueryResult : SubscriptionQueryResult<InitialResponseType, UpdateResponseType> {
32+
override fun cancel(): Boolean {
33+
TODO("Not yet implemented")
34+
}
35+
36+
override fun initialResult(): Mono<InitialResponseType> {
37+
TODO("Not yet implemented")
38+
}
39+
40+
override fun updates(): Flux<UpdateResponseType> {
41+
TODO("Not yet implemented")
42+
}
43+
}

0 commit comments

Comments
 (0)