Skip to content

Commit 00106de

Browse files
authored
Ordered pipeline behaviour implementation fixes #270 (#368)
1 parent a9efad9 commit 00106de

File tree

9 files changed

+222
-10
lines changed

9 files changed

+222
-10
lines changed

projects/kediatr-core/src/main/kotlin/com/trendyol/kediatr/MediatorImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class MediatorImpl(
4646
handler: RequestHandlerDelegate<TRequest, TResponse>
4747
): TResponse =
4848
pipelineBehaviors
49-
.reversed()
49+
.sortedByDescending { it.order }
5050
.fold(handler) { next, pipeline ->
5151
{ pipeline.handle(request) { next(it) } }
5252
}(request)

projects/kediatr-core/src/main/kotlin/com/trendyol/kediatr/PipelineBehavior.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,37 @@ package com.trendyol.kediatr
33
/**
44
* Interface to be implemented for a non-blocking pipeline behavior
55
*
6-
* @since 1.0.12
76
*/
87
interface PipelineBehavior {
8+
companion object {
9+
/**
10+
* Useful constant for the highest precedence value.
11+
* @see java.lang.Integer.MIN_VALUE
12+
*/
13+
const val HIGHEST_PRECEDENCE = Int.MIN_VALUE
14+
15+
/**
16+
* Useful constant for the lowest precedence value.
17+
* @see java.lang.Integer.MAX_VALUE
18+
*/
19+
const val LOWEST_PRECEDENCE = Int.MAX_VALUE
20+
}
21+
22+
/**
23+
* Get the order value of this object.
24+
*
25+
* Higher values are interpreted as lower priority. As a consequence,
26+
* the object with the lowest value has the highest priority.
27+
*
28+
* Same order values will result in arbitrary sort positions for the
29+
* affected objects.
30+
* @return the order value
31+
* @see .HIGHEST_PRECEDENCE
32+
*
33+
* @see .LOWEST_PRECEDENCE
34+
*/
35+
val order: Int get() = HIGHEST_PRECEDENCE
36+
937
/**
1038
* Process to invoke before handling any query, command or notification
1139
*

projects/kediatr-core/src/test/kotlin/com/trendyol/kediatr/MediatorTests.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,13 @@ class MediatorTests : MediatorUseCases() {
3131
ExceptionPipelineBehavior(),
3232
LoggingPipelineBehavior(),
3333
InheritedPipelineBehaviour(),
34-
ParameterizedQueryHandler<Long, String>()
34+
ParameterizedQueryHandler<Long, String>(),
35+
FirstPipelineBehaviour(),
36+
SecondPipelineBehaviour(),
37+
ThirdPipelineBehaviour(),
38+
CommandHandlerThatPassesThroughOrderedPipelineBehaviours(),
39+
QueryHandlerThatPassesThroughOrderedPipelineBehaviours(),
40+
NotificationHandlerThatPassesThroughOrderedPipelineBehaviours()
3541
)
3642
)
3743

projects/kediatr-core/src/testFixtures/kotlin/com/trendyol/kediatr/testing/MediatorUseCases.kt

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,4 +208,37 @@ abstract class MediatorUseCases : MediatorTestConvention() {
208208
result shouldBe "60"
209209
query.invocationCount() shouldBe 1
210210
}
211+
212+
@Test
213+
fun ordered_pipeline_behaviours_should_be_executed_in_order_for_command() = runTest {
214+
val command = CommandThatPassesThroughOrderedPipelineBehaviours()
215+
testMediator.send(command)
216+
command.visitedPipelines() shouldBe listOf(
217+
FirstPipelineBehaviour::class.simpleName,
218+
SecondPipelineBehaviour::class.simpleName,
219+
ThirdPipelineBehaviour::class.simpleName
220+
)
221+
}
222+
223+
@Test
224+
fun ordered_pipeline_behaviours_should_be_executed_in_order_for_query() = runTest {
225+
val query = QueryThatPassesThroughOrderedPipelineBehaviours()
226+
testMediator.send(query)
227+
query.visitedPipelines() shouldBe listOf(
228+
FirstPipelineBehaviour::class.simpleName,
229+
SecondPipelineBehaviour::class.simpleName,
230+
ThirdPipelineBehaviour::class.simpleName
231+
)
232+
}
233+
234+
@Test
235+
fun ordered_pipeline_behaviours_should_be_executed_in_order_for_notification() = runTest {
236+
val notification = NotificationThatPassesThroughOrderedPipelineBehaviours()
237+
testMediator.publish(notification)
238+
notification.visitedPipelines() shouldBe listOf(
239+
FirstPipelineBehaviour::class.simpleName,
240+
SecondPipelineBehaviour::class.simpleName,
241+
ThirdPipelineBehaviour::class.simpleName
242+
)
243+
}
211244
}

projects/kediatr-core/src/testFixtures/kotlin/com/trendyol/kediatr/testing/models.kt

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,12 @@ class TestCommandWithResultCommandHandler(val mediator: MediatorAccessor) : Comm
152152
}
153153
}
154154

155-
class CommandThatPassesThroughPipelineBehaviours : Command, EnrichedWithMetadata()
155+
class CommandThatPassesThroughPipelineBehaviours :
156+
Command,
157+
EnrichedWithMetadata(),
158+
CanPassLoggingPipelineBehaviour,
159+
CanPassExceptionPipelineBehaviour,
160+
CanPassInheritedPipelineBehaviour
156161

157162
class TestPipelineCommandHandler(
158163
private val mediator: MediatorAccessor
@@ -163,7 +168,12 @@ class TestPipelineCommandHandler(
163168
}
164169
}
165170

166-
class CommandForWithoutInjectionThatPassesThroughPipelineBehaviours : Command, EnrichedWithMetadata()
171+
class CommandForWithoutInjectionThatPassesThroughPipelineBehaviours :
172+
Command,
173+
EnrichedWithMetadata(),
174+
CanPassLoggingPipelineBehaviour,
175+
CanPassExceptionPipelineBehaviour,
176+
CanPassInheritedPipelineBehaviour
167177

168178
class TestPipelineCommandHandlerWithoutInjection : CommandHandler<CommandForWithoutInjectionThatPassesThroughPipelineBehaviours> {
169179
override suspend fun handle(command: CommandForWithoutInjectionThatPassesThroughPipelineBehaviours) {
@@ -293,42 +303,135 @@ class ParameterizedQueryHandler<TParam, TResponse> : QueryHandler<ParameterizedQ
293303
/**
294304
* Pipeline Behaviors
295305
*/
306+
interface CanPassExceptionPipelineBehaviour
296307

297308
class ExceptionPipelineBehavior : PipelineBehavior {
298309
override suspend fun <TRequest, TResponse> handle(
299310
request: TRequest,
300311
next: RequestHandlerDelegate<TRequest, TResponse>
301312
): TResponse = try {
302313
when (request) {
303-
is EnrichedWithMetadata -> request.visitedPipeline(this::class.java.simpleName)
314+
is CanPassExceptionPipelineBehaviour -> {
315+
request as EnrichedWithMetadata
316+
request.visitedPipeline(this::class.java.simpleName)
317+
}
304318
}
305319
next(request)
306320
} catch (ex: Exception) {
307321
throw ex
308322
}
309323
}
310324

325+
interface CanPassLoggingPipelineBehaviour
326+
311327
class LoggingPipelineBehavior : PipelineBehavior {
312328
override suspend fun <TRequest, TResponse> handle(
313329
request: TRequest,
314330
next: RequestHandlerDelegate<TRequest, TResponse>
315331
): TResponse {
316332
when (request) {
317-
is EnrichedWithMetadata -> request.visitedPipeline(this::class.java.simpleName)
333+
is CanPassLoggingPipelineBehaviour -> {
334+
request as EnrichedWithMetadata
335+
request.visitedPipeline(this::class.java.simpleName)
336+
}
318337
}
319338
return next(request)
320339
}
321340
}
322341

323342
abstract class MyBasePipelineBehaviour : PipelineBehavior
324343

344+
interface CanPassInheritedPipelineBehaviour
345+
325346
class InheritedPipelineBehaviour : MyBasePipelineBehaviour() {
326347
override suspend fun <TRequest, TResponse> handle(
327348
request: TRequest,
328349
next: RequestHandlerDelegate<TRequest, TResponse>
329350
): TResponse {
330351
when (request) {
331-
is EnrichedWithMetadata -> request.visitedPipeline(this::class.java.simpleName)
352+
is CanPassInheritedPipelineBehaviour -> {
353+
request as EnrichedWithMetadata
354+
request.visitedPipeline(this::class.java.simpleName)
355+
}
356+
}
357+
return next(request)
358+
}
359+
}
360+
361+
interface OrderedPipelineUseCase
362+
363+
class CommandThatPassesThroughOrderedPipelineBehaviours : Command, EnrichedWithMetadata(), OrderedPipelineUseCase
364+
365+
class QueryThatPassesThroughOrderedPipelineBehaviours : Query<String>, EnrichedWithMetadata(), OrderedPipelineUseCase
366+
367+
class NotificationThatPassesThroughOrderedPipelineBehaviours : Notification, EnrichedWithMetadata(), OrderedPipelineUseCase
368+
369+
class CommandHandlerThatPassesThroughOrderedPipelineBehaviours : CommandHandler<CommandThatPassesThroughOrderedPipelineBehaviours> {
370+
override suspend fun handle(command: CommandThatPassesThroughOrderedPipelineBehaviours) {
371+
command.incrementInvocationCount()
372+
}
373+
}
374+
375+
class QueryHandlerThatPassesThroughOrderedPipelineBehaviours : QueryHandler<QueryThatPassesThroughOrderedPipelineBehaviours, String> {
376+
override suspend fun handle(query: QueryThatPassesThroughOrderedPipelineBehaviours): String {
377+
query.incrementInvocationCount()
378+
return "hello"
379+
}
380+
}
381+
382+
class NotificationHandlerThatPassesThroughOrderedPipelineBehaviours :
383+
NotificationHandler<NotificationThatPassesThroughOrderedPipelineBehaviours> {
384+
override suspend fun handle(notification: NotificationThatPassesThroughOrderedPipelineBehaviours) {
385+
notification.incrementInvocationCount()
386+
}
387+
}
388+
389+
class FirstPipelineBehaviour : PipelineBehavior {
390+
override val order: Int = 1
391+
392+
override suspend fun <TRequest, TResponse> handle(
393+
request: TRequest,
394+
next: RequestHandlerDelegate<TRequest, TResponse>
395+
): TResponse {
396+
when (request) {
397+
is OrderedPipelineUseCase -> {
398+
request as EnrichedWithMetadata
399+
request.visitedPipeline(this::class.java.simpleName)
400+
}
401+
}
402+
return next(request)
403+
}
404+
}
405+
406+
class SecondPipelineBehaviour : PipelineBehavior {
407+
override val order: Int = 2
408+
409+
override suspend fun <TRequest, TResponse> handle(
410+
request: TRequest,
411+
next: RequestHandlerDelegate<TRequest, TResponse>
412+
): TResponse {
413+
when (request) {
414+
is OrderedPipelineUseCase -> {
415+
request as EnrichedWithMetadata
416+
request.visitedPipeline(this::class.java.simpleName)
417+
}
418+
}
419+
return next(request)
420+
}
421+
}
422+
423+
class ThirdPipelineBehaviour : PipelineBehavior {
424+
override val order: Int = 3
425+
426+
override suspend fun <TRequest, TResponse> handle(
427+
request: TRequest,
428+
next: RequestHandlerDelegate<TRequest, TResponse>
429+
): TResponse {
430+
when (request) {
431+
is OrderedPipelineUseCase -> {
432+
request as EnrichedWithMetadata
433+
request.visitedPipeline(this::class.java.simpleName)
434+
}
332435
}
333436
return next(request)
334437
}

projects/kediatr-koin-starter/src/test/kotlin/com/trendyol/kediatr/koin/MediatorTests.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,16 @@ class MediatorTests : KoinTest, MediatorUseCases() {
1414
modules(
1515
module {
1616
single { KediatRKoin.getMediator() }
17+
18+
// Pipeline behaviours
1719
single { InheritedPipelineBehaviour() }
1820
single { ExceptionPipelineBehavior() }
1921
single { LoggingPipelineBehavior() }
22+
single { FirstPipelineBehaviour() }
23+
single { SecondPipelineBehaviour() }
24+
single { ThirdPipelineBehaviour() }
25+
26+
// Handlers
2027
single { TestCommandHandler(get()) }
2128
single { TestCommandWithResultCommandHandler(get()) } bind CommandWithResultHandler::class
2229
single { TestQueryHandler(get()) } bind QueryHandler::class
@@ -40,6 +47,11 @@ class MediatorTests : KoinTest, MediatorUseCases() {
4047
single { TestPipelineCommandHandlerWithoutInjection() } bind CommandHandler::class
4148
single { TestPipelineCommandHandlerThatFails() } bind CommandHandler::class
4249
single { ParameterizedQueryHandler<Long, String>() } bind QueryHandler::class
50+
single { CommandHandlerThatPassesThroughOrderedPipelineBehaviours() } bind CommandHandler::class
51+
single { QueryHandlerThatPassesThroughOrderedPipelineBehaviours() } bind QueryHandler::class
52+
single { NotificationHandlerThatPassesThroughOrderedPipelineBehaviours() } bind NotificationHandler::class
53+
54+
// Extra
4355
single<MediatorAccessor> { { get<Mediator>() } }
4456
}
4557
)

projects/kediatr-quarkus-starter/src/test/kotlin/com/trendyol/kediatr/quarkus/MediatorTests.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,22 @@ class MediatorTests : MediatorUseCases() {
9090

9191
@Produces
9292
fun <T, R> handler22() = ParameterizedQueryHandler<T, R>()
93+
94+
@Produces
95+
fun pipeline4() = FirstPipelineBehaviour()
96+
97+
@Produces
98+
fun pipeline5() = SecondPipelineBehaviour()
99+
100+
@Produces
101+
fun pipeline6() = ThirdPipelineBehaviour()
102+
103+
@Produces
104+
fun handler23() = CommandHandlerThatPassesThroughOrderedPipelineBehaviours()
105+
106+
@Produces
107+
fun handler24() = QueryHandlerThatPassesThroughOrderedPipelineBehaviours()
108+
109+
@Produces
110+
fun handler25() = NotificationHandlerThatPassesThroughOrderedPipelineBehaviours()
93111
}

projects/kediatr-spring-boot-2x-starter/src/test/kotlin/com/trendyol/kediatr/spring/MediatorTests.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,13 @@ import org.springframework.context.annotation.*
3535
TestPipelineCommandHandlerWithoutInjection::class,
3636
TestPipelineCommandHandlerThatFails::class,
3737
InheritedPipelineBehaviour::class,
38-
ParameterizedQueryHandler::class
38+
ParameterizedQueryHandler::class,
39+
FirstPipelineBehaviour::class,
40+
SecondPipelineBehaviour::class,
41+
ThirdPipelineBehaviour::class,
42+
CommandHandlerThatPassesThroughOrderedPipelineBehaviours::class,
43+
QueryHandlerThatPassesThroughOrderedPipelineBehaviours::class,
44+
NotificationHandlerThatPassesThroughOrderedPipelineBehaviours::class
3945
]
4046
)
4147
class MediatorTests : MediatorUseCases() {

projects/kediatr-spring-boot-3x-starter/src/test/kotlin/com/trendyol/kediatr/spring/MediatorTests.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,13 @@ import org.springframework.context.annotation.*
3535
TestPipelineCommandHandlerWithoutInjection::class,
3636
TestPipelineCommandHandlerThatFails::class,
3737
InheritedPipelineBehaviour::class,
38-
ParameterizedQueryHandler::class
38+
ParameterizedQueryHandler::class,
39+
FirstPipelineBehaviour::class,
40+
SecondPipelineBehaviour::class,
41+
ThirdPipelineBehaviour::class,
42+
CommandHandlerThatPassesThroughOrderedPipelineBehaviours::class,
43+
QueryHandlerThatPassesThroughOrderedPipelineBehaviours::class,
44+
NotificationHandlerThatPassesThroughOrderedPipelineBehaviours::class
3945
]
4046
)
4147
class MediatorTests : MediatorUseCases() {

0 commit comments

Comments
 (0)