@@ -26,17 +26,18 @@ class KinesisSourceGraphIntegrationSpec
    "process all messages of a stream with one worker" in new withKinesisConfForApp("1worker") {
      val result = Kinesis
        .source(consumerConf = consumerConf())
-        .take(TestStreamNumberOfShards * TestStreamNrOfMessagesPerShard)
+        .takeWhile(_.payload.payloadAsString().toLong < TestStreamNrOfMessagesPerShard,
+                   inclusive = true)
        .map { event =>
          event.commit()
-          event.payload.payload
+          event.payload.payloadAsString()
        }
        .runWith(Sink.seq)

      val grouped = result.futureValue.groupBy(identity)
      result.futureValue.distinct should have size TestStreamNrOfMessagesPerShard
      grouped should have size TestStreamNrOfMessagesPerShard
-      grouped.values.foreach(_ should have size TestStreamNumberOfShards)
+      grouped.values.foreach(_.size.toLong shouldBe >=(TestStreamNumberOfShards))
    }

    "process all messages of a stream with 2 workers" in new withKinesisConfForApp("2worker") {
@@ -48,44 +49,18 @@ class KinesisSourceGraphIntegrationSpec
      val source2 = Kinesis.source(consumerConf = consumerConf())
      val result = source1
        .merge(source2)
-        .take(TestStreamNrOfMessagesPerShard * TestStreamNumberOfShards)
+        .takeWhile(_.payload.payloadAsString().toLong < TestStreamNrOfMessagesPerShard,
+                   inclusive = true)
        .map { event =>
          event.commit()
-          event.payload.payload
+          event.payload.payloadAsString()
        }
        .runWith(Sink.seq)

      val grouped = result.futureValue.groupBy(identity)
      result.futureValue.distinct should have size TestStreamNrOfMessagesPerShard
      grouped should have size TestStreamNrOfMessagesPerShard
-      grouped.values.foreach(_ should have size TestStreamNumberOfShards)
-    }
-
-    "process all messages of a stream with 4 workers" in new withKinesisConfForApp("4worker") {
-      // Please note: since all sources are started simultaneously, all will assume there is no other worker.
-      // During register all except one will fail and not read any message until retry
-      // Depending on timing one or multiple sources will read all events
-      val batchSize = TestStreamNrOfMessagesPerShard
-      val source1 = Kinesis.source(consumerConf = consumerConf())
-      val source2 = Kinesis.source(consumerConf = consumerConf())
-      val source3 = Kinesis.source(consumerConf = consumerConf())
-      val source4 = Kinesis.source(consumerConf = consumerConf())
-      val result = source1
-        .merge(source2)
-        .merge(source3)
-        .merge(source4)
-        // Since only 2 clients can take batchSize messages, an overall take is needed here to end the stream
-        .take(TestStreamNrOfMessagesPerShard * TestStreamNumberOfShards)
-        .map { event =>
-          event.commit()
-          event.payload.payload
-        }
-        .runWith(Sink.seq)
-
-      val grouped = result.futureValue.groupBy(identity)
-      result.futureValue.distinct should have size TestStreamNrOfMessagesPerShard
-      grouped should have size TestStreamNrOfMessagesPerShard
-      grouped.values.foreach(_ should have size TestStreamNumberOfShards)
+      grouped.values.foreach(_.size.toLong shouldBe >=(TestStreamNumberOfShards))
    }

    "maintain the read position in the stream correctly" in new withKinesisConfForApp(
@@ -98,15 +73,16 @@ class KinesisSourceGraphIntegrationSpec
      // - dies after one batch
      // We expect to get all messages by n reads (which means, that the read position was stored correctly)
      val result =
-        for (_ <- 1
+        for (iteration <- 1
            .to((TestStreamNumberOfShards * TestStreamNrOfMessagesPerShard / batchSize).toInt))
          yield {
            Kinesis
-              .source(consumerConf = consumerConf())
-              .take(batchSize)
+              .source(consumerConf = consumerConf(batchSize = batchSize))
+              .takeWhile(_.payload.payloadAsString().toLong < batchSize * iteration,
+                         inclusive = true)
              .map { event =>
                event.commit()
-                event.payload.payload
+                event.payload
              }
              .runWith(Sink.seq)
              .futureValue
@@ -117,38 +93,39 @@ class KinesisSourceGraphIntegrationSpec
      val grouped = allMessages.groupBy(identity)
      allMessages.distinct should have size TestStreamNrOfMessagesPerShard
      grouped should have size TestStreamNrOfMessagesPerShard
+      grouped.values.foreach(_.size.toLong shouldBe >=(TestStreamNumberOfShards))
    }

    "not commit the position, if the event is not committed" in new withKinesisConfForApp(
      "not_committed"
    ) {
-      val batchSize = TestStreamNrOfMessagesPerShard / 2 // 2 * NrOfShards batches needed
-
-      // This worker will read batchSize events and will not commit
+      // This worker will read all events and will not commit
      // We expect that the read position will not change
      val uncommitted = Kinesis
        .source(consumerConf())
-        .take(batchSize)
+        .takeWhile(_.payload.payloadAsString().toLong < TestStreamNrOfMessagesPerShard,
+                   inclusive = true)
        .runWith(Sink.seq)
        .futureValue

      // This worker will read all available events.
      // This works only, if the first worker has not committed anything
      val committed = Kinesis
        .source(consumerConf = consumerConf())
-        .take(TestStreamNumberOfShards * TestStreamNrOfMessagesPerShard)
+        .takeWhile(_.payload.payloadAsString().toLong < TestStreamNrOfMessagesPerShard,
+                   inclusive = true)
        .map { event =>
          event.commit()
-          event.payload.payload
+          event.payload.payloadAsString()
        }
        .runWith(Sink.seq)
        .futureValue

-      uncommitted should have size batchSize
+      uncommitted should have size TestStreamNrOfMessagesPerShard
      val grouped = committed.groupBy(identity)
      committed.distinct should have size TestStreamNrOfMessagesPerShard
      grouped should have size TestStreamNrOfMessagesPerShard
-      grouped.values.foreach(_ should have size TestStreamNumberOfShards)
+      grouped.values.foreach(_.size.toLong shouldBe >=(TestStreamNumberOfShards))
    }
  }
}