Skip to content

Commit 2597a05

Browse files
committed
adding ScatterGather extensions queries
1 parent 14812f9 commit 2597a05

File tree

2 files changed

+217
-8
lines changed

2 files changed

+217
-8
lines changed

kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import org.axonframework.messaging.responsetypes.ResponseTypes
1919
import org.axonframework.queryhandling.QueryGateway
2020
import java.util.*
2121
import java.util.concurrent.CompletableFuture
22+
import java.util.stream.Stream
23+
import kotlin.time.DurationUnit
24+
import kotlin.time.ExperimentalTime
2225

2326
/**
2427
* Reified version of [QueryGateway.query]
@@ -106,3 +109,114 @@ inline fun <reified R, reified Q> QueryGateway.queryForOptional(query: Q): Compl
106109
inline fun <reified R, reified Q> QueryGateway.queryForOptional(queryName: String, query: Q): CompletableFuture<Optional<R>> {
107110
return this.query(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java))
108111
}
112+
113+
/**
114+
* Reified version of [QueryGateway.scatterGather]
115+
* which expects an Stream object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
116+
* @param query Query to send
117+
* @param timeout a timeout for the query
118+
* @param durationUnit the selected DurationUnit for the given timeout
119+
* @param [Q] the type of payload of the query
120+
* @param [R] the response class contained in the given responseType
121+
* @return [Stream] a stream of results
122+
* @see QueryGateway.scatterGather
123+
* @see ResponseTypes
124+
*/
125+
@ExperimentalTime
126+
inline fun <reified R, reified Q> QueryGateway.scatterGatherForSingle(query: Q, timeout: Long,
127+
durationUnit: DurationUnit): Stream<R> {
128+
return this.scatterGather(query, ResponseTypes.instanceOf(R::class.java), timeout, durationUnit)
129+
}
130+
131+
/**
132+
* Reified version of [QueryGateway.scatterGather] with explicit query name
133+
* which expects an Stream object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
134+
* @param query Query to send
135+
* @param queryName Name of the query
136+
* @param timeout a timeout for the query
137+
* @param durationUnit the selected DurationUnit for the given timeout
138+
* @param [Q] the type of payload of the query
139+
* @param [R] the response class contained in the given responseType
140+
* @return [Stream] a stream of results
141+
* @see QueryGateway.scatterGather
142+
* @see ResponseTypes
143+
*/
144+
@ExperimentalTime
145+
inline fun <reified R, reified Q> QueryGateway.scatterGatherForSingle(queryName: String, query: Q, timeout: Long,
146+
durationUnit: DurationUnit): Stream<R> {
147+
return this.scatterGather(queryName, query, ResponseTypes.instanceOf(R::class.java), timeout, durationUnit)
148+
}
149+
150+
/**
151+
* Reified version of [QueryGateway.scatterGather]
152+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.MultipleInstancesResponseType]
153+
* @param query Query to send
154+
* @param timeout a timeout for the query
155+
* @param durationUnit the selected DurationUnit for the given timeout
156+
* @param [Q] the type of payload of the query
157+
* @param [R] the response class contained in the given responseType
158+
* @return [Stream] a stream of results
159+
* @see QueryGateway.scatterGather
160+
* @see ResponseTypes
161+
*/
162+
@ExperimentalTime
163+
inline fun <reified R, reified Q> QueryGateway.scatterGatherForMultiple(query: Q, timeout: Long,
164+
durationUnit: DurationUnit): Stream<List<R>> {
165+
return this.scatterGather(query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, durationUnit)
166+
}
167+
168+
/**
169+
* Reified version of [QueryGateway.scatterGather] with explicit query name
170+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.MultipleInstancesResponseType]
171+
* @param query Query to send
172+
* @param queryName Name of the query
173+
* @param timeout a timeout for the query
174+
* @param durationUnit the selected DurationUnit for the given timeout
175+
* @param [Q] the type of payload of the query
176+
* @param [R] the response class contained in the given responseType
177+
* @return [Stream] a stream of results
178+
* @see QueryGateway.scatterGather
179+
* @see ResponseTypes
180+
*/
181+
@ExperimentalTime
182+
inline fun <reified R, reified Q> QueryGateway.scatterGatherForMultiple(queryName: String, query: Q, timeout: Long,
183+
durationUnit: DurationUnit): Stream<List<R>> {
184+
return this.scatterGather(queryName, query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, durationUnit)
185+
}
186+
187+
/**
188+
* Reified version of [QueryGateway.scatterGather]
189+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.OptionalResponseType]
190+
* @param query Query to send
191+
* @param timeout a timeout for the query
192+
* @param durationUnit the selected DurationUnit for the given timeout
193+
* @param [Q] the type of payload of the query
194+
* @param [R] the response class contained in the given responseType
195+
* @return [Stream] a stream of results
196+
* @see QueryGateway.scatterGather
197+
* @see ResponseTypes
198+
*/
199+
@ExperimentalTime
200+
inline fun <reified R, reified Q> QueryGateway.scatterGatherForOptional(query: Q, timeout: Long,
201+
durationUnit: DurationUnit): Stream<Optional<R>> {
202+
return this.scatterGather(query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, durationUnit)
203+
}
204+
205+
/**
206+
* Reified version of [QueryGateway.scatterGather] with explicit query name
207+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.OptionalResponseType]
208+
* @param query Query to send
209+
* @param queryName Name of the query
210+
* @param timeout a timeout for the query
211+
* @param durationUnit the selected DurationUnit for the given timeout
212+
* @param [Q] the type of payload of the query
213+
* @param [R] the response class contained in the given responseType
214+
* @return [Stream] a stream of results
215+
* @see QueryGateway.scatterGather
216+
* @see ResponseTypes
217+
*/
218+
@ExperimentalTime
219+
inline fun <reified R, reified Q> QueryGateway.scatterGatherForOptional(queryName: String, query: Q, timeout: Long,
220+
durationUnit: DurationUnit): Stream<Optional<R>> {
221+
return this.scatterGather(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, durationUnit)
222+
}

kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,17 @@ import org.axonframework.messaging.responsetypes.InstanceResponseType
2424
import org.axonframework.queryhandling.QueryGateway
2525
import java.util.*
2626
import java.util.concurrent.CompletableFuture
27-
import kotlin.test.AfterTest
28-
import kotlin.test.BeforeTest
29-
import kotlin.test.Test
30-
import kotlin.test.assertSame
31-
import kotlin.test.assertTrue
27+
import java.util.concurrent.TimeUnit
28+
import java.util.stream.Stream
29+
import kotlin.test.*
30+
import kotlin.time.DurationUnit
31+
import kotlin.time.ExperimentalTime
3232

3333
/**
3434
* Tests Query Gateway extensions.
3535
*
3636
* @author Stefan Andjelkovic
37+
* @author Henrique Sena
3738
*/
3839
class QueryGatewayExtensionsTest {
3940

@@ -43,7 +44,14 @@ class QueryGatewayExtensionsTest {
4344
private val optionalReturnValue: CompletableFuture<Optional<String>> = CompletableFuture.completedFuture(Optional.of("Value"))
4445
private val listReturnValue: CompletableFuture<List<String>> = CompletableFuture.completedFuture(listOf("Value", "Second value"))
4546
private val subjectGateway = mockk<QueryGateway>()
46-
47+
private val timeout: Long = 1000
48+
@ExperimentalTime
49+
private val durationUnit = DurationUnit.valueOf(TimeUnit.SECONDS.name)
50+
private val streamInstanceReturnValue = Stream.of("Value")
51+
private val streamMultipleReturnValue = Stream.of(listOf("Value", "Second Value"))
52+
private val streamOptionalReturnValue = Stream.of(Optional.of("Value"))
53+
54+
@ExperimentalTime
4755
@BeforeTest
4856
fun before() {
4957
every { subjectGateway.query(exampleQuery, matchInstanceResponseType<String>()) } returns instanceReturnValue
@@ -52,6 +60,12 @@ class QueryGatewayExtensionsTest {
5260
every { subjectGateway.query(queryName, exampleQuery, matchInstanceResponseType<String>()) } returns instanceReturnValue
5361
every { subjectGateway.query(queryName, exampleQuery, matchOptionalResponseType<String>()) } returns optionalReturnValue
5462
every { subjectGateway.query(queryName, exampleQuery, matchMultipleInstancesResponseType<String>()) } returns listReturnValue
63+
every { subjectGateway.scatterGather(exampleQuery, matchInstanceResponseType<String>(), timeout, durationUnit) } returns streamInstanceReturnValue
64+
every { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, durationUnit) } returns streamMultipleReturnValue
65+
every { subjectGateway.scatterGather(exampleQuery, matchOptionalResponseType<String>(), timeout, durationUnit) } returns streamOptionalReturnValue
66+
every { subjectGateway.scatterGather(queryName, exampleQuery, matchInstanceResponseType<String>(), timeout, durationUnit) } returns streamInstanceReturnValue
67+
every { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, durationUnit) } returns streamMultipleReturnValue
68+
every { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, durationUnit) } returns streamOptionalReturnValue
5569
}
5670

5771
@AfterTest
@@ -119,7 +133,7 @@ class QueryGatewayExtensionsTest {
119133
val queryResult = nullableQueryGateway.queryForSingle<String?, ExampleQuery>(query = exampleQuery)
120134

121135
assertSame(queryResult, nullInstanceReturnValue)
122-
assertTrue(nullInstanceReturnValue.get() == null)
136+
assertEquals(nullInstanceReturnValue.get(), null)
123137
verify(exactly = 1) { nullableQueryGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) }
124138
}
125139

@@ -182,8 +196,89 @@ class QueryGatewayExtensionsTest {
182196
val queryResult = nullableQueryGateway.queryForSingle<String?, ExampleQuery>(queryName = queryName, query = exampleQuery)
183197

184198
assertSame(queryResult, nullInstanceReturnValue)
185-
assertTrue(nullInstanceReturnValue.get() == null)
199+
assertEquals(nullInstanceReturnValue.get(), null)
186200
verify(exactly = 1) { nullableQueryGateway.query(queryName, exampleQuery, matchExpectedResponseType(String::class.java)) }
187201
}
188202

203+
@ExperimentalTime
204+
@Test
205+
fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() {
206+
val result = subjectGateway.scatterGatherForSingle<String, ExampleQuery>(
207+
query = exampleQuery,
208+
timeout = timeout,
209+
durationUnit = durationUnit
210+
)
211+
212+
assertSame(result, streamInstanceReturnValue)
213+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, durationUnit) }
214+
}
215+
216+
@ExperimentalTime
217+
@Test
218+
fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() {
219+
val result = subjectGateway.scatterGatherForMultiple<String, ExampleQuery>(
220+
query = exampleQuery,
221+
timeout = timeout,
222+
durationUnit = durationUnit
223+
)
224+
225+
assertSame(result, streamMultipleReturnValue)
226+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, durationUnit) }
227+
}
228+
229+
@ExperimentalTime
230+
@Test
231+
fun `ScatterGather for Optional should invoke scatterGather method with correct generic parameters`() {
232+
val result = subjectGateway.scatterGatherForOptional<String, ExampleQuery>(
233+
query = exampleQuery,
234+
timeout = timeout,
235+
durationUnit = durationUnit
236+
)
237+
238+
assertSame(result, streamOptionalReturnValue)
239+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchOptionalResponseType<String>(), timeout, durationUnit) }
240+
}
241+
242+
@ExperimentalTime
243+
@Test
244+
fun `ScatterGather for Single should invoke scatterGather method with explicit query name`() {
245+
val result = subjectGateway.scatterGatherForSingle<String, ExampleQuery>(
246+
queryName = queryName,
247+
query = exampleQuery,
248+
timeout = timeout,
249+
durationUnit = durationUnit
250+
)
251+
252+
assertSame(result, streamInstanceReturnValue)
253+
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchExpectedResponseType(String::class.java), timeout, durationUnit) }
254+
}
255+
256+
@ExperimentalTime
257+
@Test
258+
fun `ScatterGather for Multiple should invoke scatterGather method with explicit query name`() {
259+
val result = subjectGateway.scatterGatherForMultiple<String, ExampleQuery>(
260+
queryName = queryName,
261+
query = exampleQuery,
262+
timeout = timeout,
263+
durationUnit = durationUnit
264+
)
265+
266+
assertSame(result, streamMultipleReturnValue)
267+
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, durationUnit) }
268+
}
269+
270+
@ExperimentalTime
271+
@Test
272+
fun `ScatterGather for Optional should invoke scatterGather method with explicit query name`() {
273+
val result = subjectGateway.scatterGatherForOptional<String, ExampleQuery>(
274+
queryName = queryName,
275+
query = exampleQuery,
276+
timeout = timeout,
277+
durationUnit = durationUnit
278+
)
279+
280+
assertSame(result, streamOptionalReturnValue)
281+
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, durationUnit) }
282+
}
283+
189284
}

0 commit comments

Comments
 (0)