File tree Expand file tree Collapse file tree 2 files changed +13
-7
lines changed Expand file tree Collapse file tree 2 files changed +13
-7
lines changed Original file line number Diff line number Diff line change @@ -3646,9 +3646,13 @@ object Stream extends StreamLowPriority {
3646
3646
3647
3647
def getNextChunk (i : Iterator [A ]): F [Option [(Chunk [A ], Iterator [A ])]] =
3648
3648
F .suspend(hint) {
3649
- i.take(chunkSize).toVector
3650
- }.map { s =>
3651
- if (s.isEmpty) None else Some ((Chunk .from(s), i))
3649
+ val bldr = Vector .newBuilder[A ]
3650
+ var cnt = 0
3651
+ while (cnt < chunkSize && i.hasNext) {
3652
+ bldr += i.next()
3653
+ cnt += 1
3654
+ }
3655
+ if (cnt == 0 ) None else Some ((Chunk .from(bldr.result()), i))
3652
3656
}
3653
3657
3654
3658
Stream .unfoldChunkEval(iterator)(getNextChunk)
Original file line number Diff line number Diff line change @@ -704,16 +704,18 @@ class StreamCombinatorsSuite extends Fs2Suite {
704
704
}
705
705
706
706
test(" fromIterator" ) {
707
- forAllF { (x : List [Int ], cs : Int ) =>
707
+ // Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415
708
+ forAllF { (x : Vector [Int ], cs : Int ) =>
708
709
val chunkSize = (cs % 4096 ).abs + 1
709
- Stream .fromIterator[IO ](x.iterator, chunkSize).assertEmits(x)
710
+ Stream .fromIterator[IO ](x.iterator, chunkSize).assertEmits(x.toList )
710
711
}
711
712
}
712
713
713
714
test(" fromBlockingIterator" ) {
714
- forAllF { (x : List [Int ], cs : Int ) =>
715
+ // Note: important to use Vector here and not List in order to prevent https://github.com/typelevel/fs2/issues/3415
716
+ forAllF { (x : Vector [Int ], cs : Int ) =>
715
717
val chunkSize = (cs % 4096 ).abs + 1
716
- Stream .fromBlockingIterator[IO ](x.iterator, chunkSize).assertEmits(x)
718
+ Stream .fromBlockingIterator[IO ](x.iterator, chunkSize).assertEmits(x.toList )
717
719
}
718
720
}
719
721
You can’t perform that action at this time.
0 commit comments