Or it could be named |
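Something like this?

    // Imports this snippet appears to rely on (assumed; they were not part of the reply).
    // sumAll is taken to come from the cats Foldable syntax.
    import cats.effect.Temporal
    import cats.syntax.all._
    import fs2.{Chunk, Pull, Stream}
    import scala.concurrent.duration.FiniteDuration

    implicit class StreamOps[F[_], O](private val s: Stream[F, O]) extends AnyVal {
      def groupWeighedWithin(timeout: FiniteDuration, limit: Long, weight: O => Long)(implicit
          F: Temporal[F]
      ): Stream[F, Chunk[O]] =
        s.pull
          .timed { timedPull =>
            def go(timedPull: Pull.Timed[F, O], buffer: Chunk[O], bufferSize: Long): Pull[F, O, Unit] =
              // The timeout is re-armed before every pull.
              timedPull.timeout(timeout) >>
                timedPull.uncons.flatMap {
                  case Some((Right(elems), next)) =>
                    val size = elems.map(weight).sumAll
                    val nextSize = bufferSize + size
                    if (nextSize >= limit)
                      // Limit reached: emit the buffer and start a new one with this chunk.
                      Pull.output(buffer) >> go(next, elems, size)
                    else
                      go(next, buffer ++ elems, nextSize)
                  case Some((Left(_), next)) =>
                    // Timeout fired: emit whatever has been buffered so far.
                    Pull.output(buffer) >> go(next, Chunk.empty, 0)
                  case None =>
                    // Upstream finished: emit the remainder.
                    Pull.output(buffer) >> Pull.done
                }
            go(timedPull, Chunk.empty, 0)
          }
          .stream
          .chunks
    }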
This is a question about an idea: introducing `Stream.groupWeighted`. It would output Chunks limited by the weight of the elements. Weight would be provided by a weight function.

Similar to Akka's `groupedWeightedWithin`, but without the time config.

Or even a `Stream.groupWeightedWithin` method, similar to the already existing `Stream.groupWithin`.

What do you think?
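
To make the idea concrete, here is a minimal sketch of what a time-free `groupWeighted` could look like. This is not an existing fs2 method: the name `groupWeighted`, its signature, and the rule "emit the buffer before adding an element that would push its total weight above `limit`" are all assumptions made for illustration. A single element heavier than `limit` ends up in a chunk of its own.

```scala
import fs2.{Chunk, Pull, Pure, Stream}

// Hypothetical helper, not part of fs2: groups elements into chunks whose
// total weight stays at or below `limit`, unless one element alone exceeds it.
def groupWeighted[F[_], O](s: Stream[F, O], limit: Long)(weight: O => Long): Stream[F, Chunk[O]] = {
  def go(rest: Stream[F, O], buffer: Chunk[O], bufferWeight: Long): Pull[F, Chunk[O], Unit] =
    rest.pull.uncons1.flatMap {
      case Some((o, tail)) =>
        val w = weight(o)
        if (buffer.nonEmpty && bufferWeight + w > limit)
          // Adding `o` would exceed the limit: emit the buffer, start a new one with `o`.
          Pull.output1(buffer) >> go(tail, Chunk.singleton(o), w)
        else
          go(tail, buffer ++ Chunk.singleton(o), bufferWeight + w)
      case None =>
        // End of input: emit the remainder, if any.
        if (buffer.nonEmpty) Pull.output1(buffer) else Pull.done
    }
  go(s, Chunk.empty, 0L).stream
}

// Usage: weigh strings by their length, with a limit of 4 per chunk.
val input: Stream[Pure, String] = Stream("a", "bbb", "cc", "dddd", "e")
val grouped: List[List[String]] =
  groupWeighted(input, 4L)(_.length.toLong).map(_.toList).toList
// grouped == List(List("a", "bbb"), List("cc"), List("dddd"), List("e"))
```

The sketch pulls one element at a time with `uncons1` to keep the logic easy to follow; a real implementation would more likely work chunk by chunk, and a `groupWeightedWithin` variant would add the timeout handling on top.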