Skip to content

Commit 41d9d82

Browse files
authored
Merge pull request #76 from hsenasilva/feature/scatter-gather-queries-extensions
[#16] Adding Scatter Gather extension queries
2 parents 6f84e14 + 292b9db commit 41d9d82

File tree

2 files changed

+212
-8
lines changed

2 files changed

+212
-8
lines changed

kotlin/src/main/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensions.kt

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ import org.axonframework.messaging.responsetypes.ResponseTypes
2020
import org.axonframework.queryhandling.QueryGateway
2121
import java.util.*
2222
import java.util.concurrent.CompletableFuture
23+
import java.util.concurrent.TimeUnit
24+
import java.util.stream.Stream
25+
26+
/**
27+
* Query Gateway extensions.
28+
*
29+
* @author Henrique Sena
30+
*/
2331

2432
/**
2533
* Reified version of [QueryGateway.query]
@@ -113,3 +121,114 @@ inline fun <reified R, reified Q> QueryGateway.queryOptional(query: Q): Completa
113121
inline fun <reified R, reified Q> QueryGateway.queryOptional(queryName: String, query: Q): CompletableFuture<Optional<R>> {
114122
return this.query(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java))
115123
}
124+
125+
/**
126+
* Reified version of [QueryGateway.scatterGather]
127+
* which expects an Stream object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
128+
* @param query Query to send
129+
* @param timeout a timeout for the query
130+
* @param timeUnit the selected TimeUnit for the given timeout
131+
* @param [Q] the type of payload of the query
132+
* @param [R] the response class contained in the given responseType
133+
* @return [Stream] a stream of results
134+
* @see QueryGateway.scatterGather
135+
* @see ResponseTypes
136+
* @since 0.2.0
137+
*/
138+
inline fun <reified R, reified Q> QueryGateway.scatterGather(query: Q, timeout: Long,
139+
timeUnit: TimeUnit): Stream<R> {
140+
return this.scatterGather(query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
141+
}
142+
143+
/**
144+
* Reified version of [QueryGateway.scatterGather] with explicit query name
145+
* which expects an Stream object as a response using [org.axonframework.messaging.responsetypes.InstanceResponseType]
146+
* @param query Query to send
147+
* @param queryName Name of the query
148+
* @param timeout a timeout for the query
149+
* @param timeUnit the selected TimeUnit for the given timeout
150+
* @param [Q] the type of payload of the query
151+
* @param [R] the response class contained in the given responseType
152+
* @return [Stream] a stream of results
153+
* @see QueryGateway.scatterGather
154+
* @see ResponseTypes
155+
* @since 0.2.0
156+
*/
157+
inline fun <reified R, reified Q> QueryGateway.scatterGather(queryName: String, query: Q, timeout: Long,
158+
timeUnit: TimeUnit): Stream<R> {
159+
return this.scatterGather(queryName, query, ResponseTypes.instanceOf(R::class.java), timeout, timeUnit)
160+
}
161+
162+
/**
163+
* Reified version of [QueryGateway.scatterGather]
164+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.MultipleInstancesResponseType]
165+
* @param query Query to send
166+
* @param timeout a timeout for the query
167+
* @param timeUnit the selected TimeUnit for the given timeout
168+
* @param [Q] the type of payload of the query
169+
* @param [R] the response class contained in the given responseType
170+
* @return [Stream] a stream of results
171+
* @see QueryGateway.scatterGather
172+
* @see ResponseTypes
173+
* @since 0.2.0
174+
*/
175+
inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(query: Q, timeout: Long,
176+
timeUnit: TimeUnit): Stream<List<R>> {
177+
return this.scatterGather(query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
178+
}
179+
180+
/**
181+
* Reified version of [QueryGateway.scatterGather] with explicit query name
182+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.MultipleInstancesResponseType]
183+
* @param query Query to send
184+
* @param queryName Name of the query
185+
* @param timeout a timeout for the query
186+
* @param timeUnit the selected TimeUnit for the given timeout
187+
* @param [Q] the type of payload of the query
188+
* @param [R] the response class contained in the given responseType
189+
* @return [Stream] a stream of results
190+
* @see QueryGateway.scatterGather
191+
* @see ResponseTypes
192+
* @since 0.2.0
193+
*/
194+
inline fun <reified R, reified Q> QueryGateway.scatterGatherMany(queryName: String, query: Q, timeout: Long,
195+
timeUnit: TimeUnit): Stream<List<R>> {
196+
return this.scatterGather(queryName, query, ResponseTypes.multipleInstancesOf(R::class.java), timeout, timeUnit)
197+
}
198+
199+
/**
200+
* Reified version of [QueryGateway.scatterGather]
201+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.OptionalResponseType]
202+
* @param query Query to send
203+
* @param timeout a timeout for the query
204+
* @param timeUnit the selected TimeUnit for the given timeout
205+
* @param [Q] the type of payload of the query
206+
* @param [R] the response class contained in the given responseType
207+
* @return [Stream] a stream of results
208+
* @see QueryGateway.scatterGather
209+
* @see ResponseTypes
210+
* @since 0.2.0
211+
*/
212+
inline fun <reified R, reified Q> QueryGateway.scatterGatherOptional(query: Q, timeout: Long,
213+
timeUnit: TimeUnit): Stream<Optional<R>> {
214+
return this.scatterGather(query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
215+
}
216+
217+
/**
218+
* Reified version of [QueryGateway.scatterGather] with explicit query name
219+
* which expects a collection as a response using [org.axonframework.messaging.responsetypes.OptionalResponseType]
220+
* @param query Query to send
221+
* @param queryName Name of the query
222+
* @param timeout a timeout for the query
223+
* @param timeUnit the selected TimeUnit for the given timeout
224+
* @param [Q] the type of payload of the query
225+
* @param [R] the response class contained in the given responseType
226+
* @return [Stream] a stream of results
227+
* @see QueryGateway.scatterGather
228+
* @see ResponseTypes
229+
* @since 0.2.0
230+
*/
231+
inline fun <reified R, reified Q> QueryGateway.scatterGatherOptional(queryName: String, query: Q, timeout: Long,
232+
timeUnit: TimeUnit): Stream<Optional<R>> {
233+
return this.scatterGather(queryName, query, ResponseTypes.optionalInstanceOf(R::class.java), timeout, timeUnit)
234+
}

kotlin/src/test/kotlin/org/axonframework/extensions/kotlin/QueryGatewayExtensionsTest.kt

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,15 @@ import org.axonframework.messaging.responsetypes.InstanceResponseType
2424
import org.axonframework.queryhandling.QueryGateway
2525
import java.util.*
2626
import java.util.concurrent.CompletableFuture
27-
import kotlin.test.AfterTest
28-
import kotlin.test.BeforeTest
29-
import kotlin.test.Test
30-
import kotlin.test.assertSame
31-
import kotlin.test.assertTrue
27+
import java.util.concurrent.TimeUnit
28+
import java.util.stream.Stream
29+
import kotlin.test.*
3230

3331
/**
3432
* Tests Query Gateway extensions.
3533
*
3634
* @author Stefan Andjelkovic
35+
* @author Henrique Sena
3736
*/
3837
internal class QueryGatewayExtensionsTest {
3938

@@ -43,6 +42,11 @@ internal class QueryGatewayExtensionsTest {
4342
private val optionalReturnValue: CompletableFuture<Optional<String>> = CompletableFuture.completedFuture(Optional.of("Value"))
4443
private val listReturnValue: CompletableFuture<List<String>> = CompletableFuture.completedFuture(listOf("Value", "Second value"))
4544
private val subjectGateway = mockk<QueryGateway>()
45+
private val timeout: Long = 1000
46+
private val timeUnit = TimeUnit.SECONDS
47+
private val streamInstanceReturnValue = Stream.of("Value")
48+
private val streamMultipleReturnValue = Stream.of(listOf("Value", "Second Value"))
49+
private val streamOptionalReturnValue = Stream.of(Optional.of("Value"))
4650

4751
@BeforeTest
4852
fun before() {
@@ -52,6 +56,12 @@ internal class QueryGatewayExtensionsTest {
5256
every { subjectGateway.query(queryName, exampleQuery, matchInstanceResponseType<String>()) } returns instanceReturnValue
5357
every { subjectGateway.query(queryName, exampleQuery, matchOptionalResponseType<String>()) } returns optionalReturnValue
5458
every { subjectGateway.query(queryName, exampleQuery, matchMultipleInstancesResponseType<String>()) } returns listReturnValue
59+
every { subjectGateway.scatterGather(exampleQuery, matchInstanceResponseType<String>(), timeout, timeUnit) } returns streamInstanceReturnValue
60+
every { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) } returns streamMultipleReturnValue
61+
every { subjectGateway.scatterGather(exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) } returns streamOptionalReturnValue
62+
every { subjectGateway.scatterGather(queryName, exampleQuery, matchInstanceResponseType<String>(), timeout, timeUnit) } returns streamInstanceReturnValue
63+
every { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) } returns streamMultipleReturnValue
64+
every { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) } returns streamOptionalReturnValue
5565
}
5666

5767
@AfterTest
@@ -117,9 +127,9 @@ internal class QueryGatewayExtensionsTest {
117127
}
118128

119129
val queryResult = nullableQueryGateway.query<String?, ExampleQuery>(query = exampleQuery)
120-
130+
121131
assertSame(queryResult, nullInstanceReturnValue)
122-
assertTrue(nullInstanceReturnValue.get() == null)
132+
assertEquals(nullInstanceReturnValue.get(), null)
123133
verify(exactly = 1) { nullableQueryGateway.query(exampleQuery, matchExpectedResponseType(String::class.java)) }
124134
}
125135

@@ -181,8 +191,83 @@ internal class QueryGatewayExtensionsTest {
181191
val queryResult = nullableQueryGateway.query<String?, ExampleQuery>(queryName = queryName, query = exampleQuery)
182192

183193
assertSame(queryResult, nullInstanceReturnValue)
184-
assertTrue(nullInstanceReturnValue.get() == null)
194+
assertEquals(nullInstanceReturnValue.get(), null)
185195
verify(exactly = 1) { nullableQueryGateway.query(queryName, exampleQuery, matchExpectedResponseType(String::class.java)) }
186196
}
187197

198+
@Test
199+
fun `ScatterGather for Single should invoke scatterGather method with correct generic parameters`() {
200+
val result = subjectGateway.scatterGather<String, ExampleQuery>(
201+
query = exampleQuery,
202+
timeout = timeout,
203+
timeUnit = timeUnit
204+
)
205+
206+
assertSame(result, streamInstanceReturnValue)
207+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
208+
}
209+
210+
@Test
211+
fun `ScatterGather for Multiple should invoke scatterGather method with correct generic parameters`() {
212+
val result = subjectGateway.scatterGatherMany<String, ExampleQuery>(
213+
query = exampleQuery,
214+
timeout = timeout,
215+
timeUnit = timeUnit
216+
)
217+
218+
assertSame(result, streamMultipleReturnValue)
219+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) }
220+
}
221+
222+
@Test
223+
fun `ScatterGather for Optional should invoke scatterGather method with correct generic parameters`() {
224+
val result = subjectGateway.scatterGatherOptional<String, ExampleQuery>(
225+
query = exampleQuery,
226+
timeout = timeout,
227+
timeUnit = timeUnit
228+
)
229+
230+
assertSame(result, streamOptionalReturnValue)
231+
verify(exactly = 1) { subjectGateway.scatterGather(exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) }
232+
}
233+
234+
@Test
235+
fun `ScatterGather for Single should invoke scatterGather method with explicit query name`() {
236+
val result = subjectGateway.scatterGather<String, ExampleQuery>(
237+
queryName = queryName,
238+
query = exampleQuery,
239+
timeout = timeout,
240+
timeUnit = timeUnit
241+
)
242+
243+
assertSame(result, streamInstanceReturnValue)
244+
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchExpectedResponseType(String::class.java), timeout, timeUnit) }
245+
}
246+
247+
@Test
248+
fun `ScatterGather for Multiple should invoke scatterGather method with explicit query name`() {
249+
val result = subjectGateway.scatterGatherMany<String, ExampleQuery>(
250+
queryName = queryName,
251+
query = exampleQuery,
252+
timeout = timeout,
253+
timeUnit = timeUnit
254+
)
255+
256+
assertSame(result, streamMultipleReturnValue)
257+
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchMultipleInstancesResponseType<String>(), timeout, timeUnit) }
258+
}
259+
260+
@Test
261+
fun `ScatterGather for Optional should invoke scatterGather method with explicit query name`() {
262+
val result = subjectGateway.scatterGatherOptional<String, ExampleQuery>(
263+
queryName = queryName,
264+
query = exampleQuery,
265+
timeout = timeout,
266+
timeUnit = timeUnit
267+
)
268+
269+
assertSame(result, streamOptionalReturnValue)
270+
verify(exactly = 1) { subjectGateway.scatterGather(queryName, exampleQuery, matchOptionalResponseType<String>(), timeout, timeUnit) }
271+
}
272+
188273
}

0 commit comments

Comments
 (0)