Skip to content

Commit 06d0784

Browse files
authored
Merge pull request #3449 from BalmungSan/flow-processor
Add `interop.flow.pipeToProcessor` & `interop.flow.processorToPipe`
2 parents 344ae49 + 64194f0 commit 06d0784

File tree

11 files changed

+396
-29
lines changed

11 files changed

+396
-29
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 96 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import fs2.internal._
4040
import org.typelevel.scalaccompat.annotation._
4141
import Pull.StreamPullOps
4242

43-
import java.util.concurrent.Flow.{Publisher, Subscriber}
43+
import java.util.concurrent.Flow.{Publisher, Processor, Subscriber}
4444

4545
/** A stream producing output of type `O` and which may evaluate `F` effects.
4646
*
@@ -2853,7 +2853,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
28532853
* @param subscriber the [[Subscriber]] that will receive the elements of the stream.
28542854
*/
28552855
def subscribe[F2[x] >: F[x]: Async, O2 >: O](subscriber: Subscriber[O2]): Stream[F2, Nothing] =
2856-
interop.flow.subscribeAsStream[F2, O2](this, subscriber)
2856+
interop.flow.StreamSubscription.subscribe[F2, O2](this, subscriber)
28572857

28582858
/** Emits all elements of the input except the first one.
28592859
*
@@ -3001,7 +3001,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
30013001

30023002
/** @see [[toPublisher]] */
30033003
def toPublisherResource[F2[x] >: F[x]: Async, O2 >: O]: Resource[F2, Publisher[O2]] =
3004-
interop.flow.toPublisher(this)
3004+
interop.flow.StreamPublisher(this)
30053005

30063006
/** Translates effect type from `F` to `G` using the supplied `FunctionK`.
30073007
*/
@@ -3883,6 +3883,56 @@ object Stream extends StreamLowPriority {
38833883
await
38843884
}
38853885

3886+
/** Creates a [[Stream]] from a `subscribe` function;
3887+
* analogous to a `Publisher`, but effectual.
3888+
*
3889+
* This function is useful when you actually need to provide a subscriber to a third-party.
3890+
*
3891+
* @example {{{
3892+
* scala> import cats.effect.IO
3893+
* scala> import java.util.concurrent.Flow.{Publisher, Subscriber}
3894+
* scala>
3895+
* scala> def thirdPartyLibrary(subscriber: Subscriber[Int]): Unit = {
3896+
* | def somePublisher: Publisher[Int] = ???
3897+
* | somePublisher.subscribe(subscriber)
3898+
* | }
3899+
* scala>
3900+
* scala> // Interop with the third party library.
3901+
* scala> Stream.fromPublisher[IO, Int](chunkSize = 16) { subscriber =>
3902+
* | IO.println("Subscribing!") >>
3903+
* | IO.delay(thirdPartyLibrary(subscriber)) >>
3904+
* | IO.println("Subscribed!")
3905+
* | }
3906+
* res0: Stream[IO, Int] = Stream(..)
3907+
* }}}
3908+
*
3909+
* @note The subscribe function will not be executed until the stream is run.
3910+
*
3911+
* @see the overload that only requires a [[Publisher]].
3912+
*
3913+
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
3914+
* A high number may be useful if the publisher is triggering from IO,
3915+
* like requesting elements from a database.
3916+
* A high number will also lead to more elements in memory.
3917+
* The stream will not emit new element until,
3918+
* either the `Chunk` is filled or the publisher finishes.
3919+
* @param subscribe The effectual function that will be used to initiate the consumption process,
3920+
* it receives a [[Subscriber]] that should be used to subscribe to a [[Publisher]].
3921+
* The `subscribe` operation must be called exactly once.
3922+
*/
3923+
def fromPublisher[F[_], A](
3924+
chunkSize: Int
3925+
)(
3926+
subscribe: Subscriber[A] => F[Unit]
3927+
)(implicit
3928+
F: Async[F]
3929+
): Stream[F, A] =
3930+
Stream
3931+
.eval(interop.flow.StreamSubscriber[F, A](chunkSize))
3932+
.flatMap { subscriber =>
3933+
subscriber.stream(subscribe(subscriber))
3934+
}
3935+
38863936
/** Creates a [[Stream]] from a [[Publisher]].
38873937
*
38883938
* @example {{{
@@ -3900,8 +3950,6 @@ object Stream extends StreamLowPriority {
39003950
*
39013951
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
39023952
*
3903-
* @see the `toStream` extension method added to `Publisher`
3904-
*
39053953
* @param publisher The [[Publisher]] to consume.
39063954
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
39073955
* A high number may be useful if the publisher is triggering from IO,
@@ -3911,7 +3959,7 @@ object Stream extends StreamLowPriority {
39113959
* either the `Chunk` is filled or the publisher finishes.
39123960
*/
39133961
def fromPublisher[F[_]]: interop.flow.syntax.FromPublisherPartiallyApplied[F] =
3914-
interop.flow.fromPublisher
3962+
new interop.flow.syntax.FromPublisherPartiallyApplied(dummy = true)
39153963

39163964
/** Like `emits`, but works for any G that has a `Foldable` instance.
39173965
*/
@@ -4697,7 +4745,7 @@ object Stream extends StreamLowPriority {
46974745
def unsafeToPublisher()(implicit
46984746
runtime: IORuntime
46994747
): Publisher[A] =
4700-
interop.flow.unsafeToPublisher(self)
4748+
interop.flow.StreamPublisher.unsafe(self)
47014749
}
47024750

47034751
/** Projection of a `Stream` providing various ways to get a `Pull` from the `Stream`. */
@@ -5541,6 +5589,47 @@ object Stream extends StreamLowPriority {
55415589
/** Transforms the right input of the given `Pipe2` using a `Pipe`. */
55425590
def attachR[I0, O2](p: Pipe2[F, I0, O, O2]): Pipe2[F, I0, I, O2] =
55435591
(l, r) => p(l, self(r))
5592+
5593+
/** Creates a flow [[Processor]] from this [[Pipe]].
5594+
*
5595+
* You are required to manually subscribe this [[Processor]] to an upstream [[Publisher]], and have at least one downstream [[Subscriber]] subscribe to the [[Consumer]].
5596+
*
5597+
* Closing the [[Resource]] means not accepting new subscriptions,
5598+
* but waiting for all active ones to finish consuming.
5599+
* Canceling the [[Resource.use]] means gracefully shutting down all active subscriptions.
5600+
* Thus, no more elements will be published.
5601+
*
5602+
* @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]].
5603+
* A high number may be useful if the publisher is triggering from IO,
5604+
* like requesting elements from a database.
5605+
* A high number will also lead to more elements in memory.
5606+
*/
5607+
def toProcessor(
5608+
chunkSize: Int
5609+
)(implicit
5610+
F: Async[F]
5611+
): Resource[F, Processor[I, O]] =
5612+
interop.flow.StreamProcessor.fromPipe(pipe = self, chunkSize)
5613+
}
5614+
5615+
/** Provides operations on IO pipes for syntactic convenience. */
5616+
implicit final class IOPipeOps[I, O](private val self: Pipe[IO, I, O]) extends AnyVal {
5617+
5618+
/** Creates a [[Processor]] from this [[Pipe]].
5619+
*
5620+
* You are required to manually subscribe this [[Processor]] to an upstream [[Publisher]], and have at least one downstream [[Subscriber]] subscribe to the [[Consumer]].
5621+
*
5622+
* @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]].
5623+
* A high number may be useful if the publisher is triggering from IO,
5624+
* like requesting elements from a database.
5625+
* A high number will also lead to more elements in memory.
5626+
*/
5627+
def unsafeToProcessor(
5628+
chunkSize: Int
5629+
)(implicit
5630+
runtime: IORuntime
5631+
): Processor[I, O] =
5632+
interop.flow.StreamProcessor.unsafeFromPipe(pipe = self, chunkSize)
55445633
}
55455634

55465635
/** Provides operations on pure pipes for syntactic convenience. */

core/shared/src/main/scala/fs2/fs2.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2020
*/
2121

22+
import java.util.concurrent.Flow.Processor
23+
import cats.effect.Async
24+
2225
package object fs2 {
2326

2427
/** A stream transformation represented as a function from stream to stream.
@@ -27,6 +30,36 @@ package object fs2 {
2730
*/
2831
type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]
2932

33+
object Pipe {
34+
final class FromProcessorPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
35+
def apply[I, O](
36+
processor: Processor[I, O],
37+
chunkSize: Int
38+
)(implicit
39+
F: Async[F]
40+
): Pipe[F, I, O] =
41+
new interop.flow.ProcessorPipe(processor, chunkSize)
42+
}
43+
44+
/** Creates a [[Pipe]] from the given [[Processor]].
45+
*
46+
* The input stream won't be consumed until you request elements from the output stream,
47+
* and thus the processor is not initiated until then.
48+
*
49+
* @note The [[Pipe]] can be reused multiple times as long as the [[Processor]] can be reused.
50+
* Each invocation of the pipe will create and manage its own internal [[Publisher]] and [[Subscriber]],
51+
* and use them to subscribe to and from the [[Processor]] respectively.
52+
*
53+
* @param [[processor]] the [[Processor]] that represents the [[Pipe]] logic.
54+
* @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]].
55+
* A high number may be useful if the publisher is triggering from IO,
56+
* like requesting elements from a database.
57+
* A high number will also lead to more elements in memory.
58+
*/
59+
def fromProcessor[F[_]]: FromProcessorPartiallyApplied[F] =
60+
new FromProcessorPartiallyApplied[F](dummy = true)
61+
}
62+
3063
/** A stream transformation that combines two streams in to a single stream,
3164
* represented as a function from two streams to a single stream.
3265
*
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package interop
24+
package flow
25+
26+
import cats.syntax.all.*
27+
28+
import java.util.concurrent.Flow
29+
import cats.effect.Async
30+
31+
private[fs2] final class ProcessorPipe[F[_], I, O](
32+
processor: Flow.Processor[I, O],
33+
chunkSize: Int
34+
)(implicit
35+
F: Async[F]
36+
) extends Pipe[F, I, O] {
37+
override def apply(stream: Stream[F, I]): Stream[F, O] =
38+
(
39+
Stream.resource(StreamPublisher[F, I](stream)),
40+
Stream.eval(StreamSubscriber[F, O](chunkSize))
41+
).flatMapN { (publisher, subscriber) =>
42+
val initiateUpstreamProduction = F.delay(publisher.subscribe(processor))
43+
val initiateDownstreamConsumption = F.delay(processor.subscribe(subscriber))
44+
45+
subscriber.stream(
46+
subscribe = initiateUpstreamProduction >> initiateDownstreamConsumption
47+
)
48+
}
49+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package interop
24+
package flow
25+
26+
import java.util.concurrent.Flow
27+
import cats.effect.{Async, IO, Resource}
28+
import cats.effect.unsafe.IORuntime
29+
30+
private[fs2] final class StreamProcessor[F[_], I, O](
31+
streamSubscriber: StreamSubscriber[F, I],
32+
streamPublisher: StreamPublisher[F, O]
33+
) extends Flow.Processor[I, O] {
34+
override def onSubscribe(subscription: Flow.Subscription): Unit =
35+
streamSubscriber.onSubscribe(subscription)
36+
37+
override def onNext(i: I): Unit =
38+
streamSubscriber.onNext(i)
39+
40+
override def onError(ex: Throwable): Unit =
41+
streamSubscriber.onError(ex)
42+
43+
override def onComplete(): Unit =
44+
streamSubscriber.onComplete()
45+
46+
override def subscribe(subscriber: Flow.Subscriber[? >: O]): Unit =
47+
streamPublisher.subscribe(subscriber)
48+
}
49+
50+
private[fs2] object StreamProcessor {
51+
def fromPipe[F[_], I, O](
52+
pipe: Pipe[F, I, O],
53+
chunkSize: Int
54+
)(implicit
55+
F: Async[F]
56+
): Resource[F, StreamProcessor[F, I, O]] =
57+
for {
58+
streamSubscriber <- Resource.eval(StreamSubscriber[F, I](chunkSize))
59+
inputStream = streamSubscriber.stream(subscribe = F.unit)
60+
outputStream = pipe(inputStream)
61+
streamPublisher <- StreamPublisher(outputStream)
62+
} yield new StreamProcessor(
63+
streamSubscriber,
64+
streamPublisher
65+
)
66+
67+
def unsafeFromPipe[I, O](
68+
pipe: Pipe[IO, I, O],
69+
chunkSize: Int
70+
)(implicit
71+
runtime: IORuntime
72+
): StreamProcessor[IO, I, O] = {
73+
val streamSubscriber = StreamSubscriber.unsafe[IO, I](chunkSize)
74+
val inputStream = streamSubscriber.stream(subscribe = IO.unit)
75+
val outputStream = pipe(inputStream)
76+
val streamPublisher = StreamPublisher.unsafe(outputStream)
77+
new StreamProcessor(
78+
streamSubscriber,
79+
streamPublisher
80+
)
81+
}
82+
}

core/shared/src/main/scala/fs2/interop/flow/StreamPublisher.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ package fs2
2323
package interop
2424
package flow
2525

26-
import cats.effect.IO
27-
import cats.effect.kernel.{Async, Resource}
26+
import cats.effect.{Async, IO, Resource}
2827
import cats.effect.std.Dispatcher
2928
import cats.effect.unsafe.IORuntime
3029

@@ -42,7 +41,7 @@ import scala.util.control.NoStackTrace
4241
*
4342
* @see [[https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code]]
4443
*/
45-
private[flow] sealed abstract class StreamPublisher[F[_], A] private (
44+
private[fs2] sealed abstract class StreamPublisher[F[_], A] private (
4645
stream: Stream[F, A]
4746
)(implicit
4847
F: Async[F]
@@ -65,7 +64,7 @@ private[flow] sealed abstract class StreamPublisher[F[_], A] private (
6564
}
6665
}
6766

68-
private[flow] object StreamPublisher {
67+
private[fs2] object StreamPublisher {
6968
private final class DispatcherStreamPublisher[F[_], A](
7069
stream: Stream[F, A],
7170
dispatcher: Dispatcher[F]

0 commit comments

Comments
 (0)