LanguageExt.Streaming + MonadIO + Deriving #1465
Closed
louthy
announced in
Announcements
Replies: 1 comment
Dupe: #1464
Features:
Source
SourceT
Sink
SinkT
Conduit
ConduitT
MonadIO
Deriving
New streaming library
A seemingly innocuous bug in the StreamT type opened up a rabbit hole of problems that needed a fundamental rewrite to fix. In the process, more and more thoughts came to mind about bringing the streaming functionality under one roof. So there is now a new language-ext library, LanguageExt.Streaming, and the LanguageExt.Pipes library has been deprecated.
This is the structure of the Streaming library:
Transducers are back
Transducers were going to be the big feature of v5 before I worked out the new trait-system. They were going to be too much effort to bring in alongside all of the traits, but now with the new streaming functionality they are hella useful again. So, I've re-added Transducer and a new TransducerM (which can work with lifted types). Right now the functionality is relatively limited, but you can extend the set of transducers as much as you like by deriving new types from Transducer and TransducerM.
Documentation
The API documentation has some introductory information on the streaming functionality. It's a little light at the moment because I wanted to get the release done, but it's still useful to look at:
The Streaming library of language-ext is all about compositional streams. There are two key types of streaming functionality: closed-streams and open-streams...
Closed streams
Closed streams are facilitated by the Pipes system. The types in the Pipes system are compositional monad-transformers that 'fuse' together to produce an EffectT<M, A>. This effect is a closed system, meaning that there is no way (from the API) to directly interact with the effect from the outside: it can be executed and will return a result if it terminates.
The pipeline components are:
ProducerT<OUT, M, A>
PipeT<IN, OUT, M, A>
ConsumerT<IN, M, A>
These are the components that fuse together (using the | operator) to make an EffectT<M, A>. The types are monad-transformers that support lifting monads with the MonadIO trait only (which constrains M). This makes sense, otherwise the closed-system would have no effect other than heating up the CPU.
There are also more specialised versions of the above that only support the lifting of the Eff<RT, A> effect-monad:
Producer<RT, OUT, A>
Pipe<RT, IN, OUT, A>
Consumer<RT, IN, A>
They all fuse together into an Effect<RT, A>.
Pipes are especially useful if you want to build reusable streaming components that you can glue together ad infinitum.
Pipes are, arguably, less useful for day-to-day stream processing, like handling events, but your mileage may vary.
More details on the Pipes page.
Open streams
Open streams are closer to what most C# devs have used classically. They are like events or IObservable streams. They yield values and (under certain circumstances) accept inputs.
Source and SourceT yield values synchronously or asynchronously depending on their construction. They can support multiple readers.
Sink and SinkT receive values and propagate them through the channel they're attached to. They can support multiple writers.
Conduit and ConduitT provide an input transducer (acts like a Sink), an internal buffer, and an output transducer (acts like a Source). They support multiple writers and one reader, but can yield a Source or SourceT that allows for multiple readers.
Source
Source<A> is the 'classic stream': you can lift any of the following types into it: System.Threading.Channels.Channel<A>, IEnumerable<A>, IAsyncEnumerable<A>, or singleton values. To process a stream, you need to use one of the Reduce or ReduceAsync variants. These take Reducer delegates as arguments. They are essentially a fold over the stream of values, which results in an aggregated state once the stream has completed. These reducers can be seen to play a similar role to Subscribe in IObservable streams, but are more principled because they return a value (which we can leverage to carry state for the duration of the stream).
Source also supports some built-in reducers:
Last - aggregates no state, simply returns the last item yielded.
Iter - forces evaluation of the stream, aggregating no state and ignoring all yielded values.
Collect - adds all yielded values to a Seq<A>, which is then returned upon stream completion.
SourceT
SourceT<M, A> is the classic-stream embellished: it turns the stream into a monad-transformer that can lift any MonadIO-enabled monad (M), allowing side effects to be embedded into the stream in a principled way.
So, for example, to use the IO<A> monad with SourceT, simply use: SourceT<IO, A>. Then you can use one of the following static methods on the SourceT type to lift IO<A> effects into a stream:
SourceT.liftM(IO<A> effect) creates a singleton-stream
SourceT.foreverM(IO<A> effect) creates an infinite stream, repeating the same effect over and over
SourceT.liftM(Channel<IO<A>> channel) lifts a System.Threading.Channels.Channel of effects
SourceT.liftM(IEnumerable<IO<A>> effects) lifts an IEnumerable of effects
SourceT.liftM(IAsyncEnumerable<IO<A>> effects) lifts an IAsyncEnumerable of effects
SourceT also supports the same built-in convenience reducers as Source (Last, Iter, Collect).
Sink
Sink<A> provides a way to accept many input values. The values are buffered until consumed. The sink can be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to manipulate the values being posted to the buffer just before they are stored.
So, to manipulate values coming into the Sink, use Comap. It will give you a new Sink with the manipulation 'built-in'.
SinkT
SinkT<M, A> provides a way to accept many input values. The values are buffered until consumed. The sink can be thought of as a System.Threading.Channels.Channel (which is the buffer that collects the values) that happens to manipulate the values being posted to the buffer just before they are stored.
So, to manipulate values coming into the SinkT, use Comap. It will give you a new SinkT with the manipulation 'built-in'. SinkT is also a transformer that lifts types of K<M, A>.
Conduit
Conduit<A, B> can be pictured as so:
A is posted to the Conduit (via Post)
It passes through the input Transducer, mapping the A value to X (an internal type you can't see)
The X value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
Reduce will force the consumption of the values in the buffer
Each X flows through the output Transducer
So the input and output transducers allow for pre and post-processing of values as they flow through the conduit.
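The flow described above can be sketched with a plain System.Threading.Channels.Channel and two functions standing in for the transducers. ConduitSketch<A, X, B> is a hypothetical type made up for this illustration, not the library's Conduit<A, B>: it drains the buffer synchronously and exposes the internal type X, which the real type hides.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical stand-in for the shape described above,
// NOT the LanguageExt.Streaming Conduit<A, B> type.
public sealed class ConduitSketch<A, X, B>
{
    readonly Func<A, X> input;   // plays the role of the input transducer
    readonly Func<X, B> output;  // plays the role of the output transducer
    readonly Channel<X> buffer = Channel.CreateUnbounded<X>();

    public ConduitSketch(Func<A, X> input, Func<X, B> output)
    {
        this.input = input;
        this.output = output;
    }

    // Pre-process the posted value, then store the result in the buffer.
    public bool Post(A value) => buffer.Writer.TryWrite(input(value));

    // Signal that no more values will be posted.
    public void Complete() => buffer.Writer.Complete();

    // Drain whatever is currently buffered, post-process each value,
    // and fold the results into a single state.
    public S Reduce<S>(S initial, Func<S, B, S> reducer)
    {
        var state = initial;
        while (buffer.Reader.TryRead(out var x))
            state = reducer(state, output(x));
        return state;
    }
}

public static class Program
{
    public static void Main()
    {
        // Input stage: string -> length. Output stage: length -> doubled.
        var conduit = new ConduitSketch<string, int, int>(s => s.Length, n => n * 2);
        conduit.Post("a");
        conduit.Post("bcd");
        conduit.Complete();
        Console.WriteLine(conduit.Reduce(0, (sum, b) => sum + b)); // prints 8
    }
}
```

In this sketch, changing the input function corresponds to what Comap does to the pre-processing side, and changing the output function corresponds to what Map does to the post-processing side.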
Conduit is a CoFunctor: call Comap to manipulate the pre-processing transducer. Conduit is also a Functor: call Map to manipulate the post-processing transducer. There are other non-trait, but common, behaviours like FoldWhile, Filter, Skip, Take, etc.
ConduitT
ConduitT<M, A, B> can be pictured as so:
K<M, A> is posted to the ConduitT (via Post)
It passes through the input TransducerM, mapping the K<M, A> value to K<M, X> (an internal type you can't see)
The K<M, X> value is then stored in the conduit's internal buffer (a System.Threading.Channels.Channel)
Reduce will force the consumption of the values in the buffer
Each K<M, X> flows through the output TransducerM
So the input and output transducers allow for pre and post-processing of values as they flow through the conduit.
ConduitT is a CoFunctor: call Comap to manipulate the pre-processing transducer. ConduitT is also a Functor: call Map to manipulate the post-processing transducer. There are other non-trait, but common, behaviours like FoldWhile, Filter, Skip, Take, etc.
Open to closed streams
Clearly, even for 'closed systems' like the Pipes system, it would be beneficial to be able to post values into the streams from the outside. And so, the open-stream components can all be converted into Pipes components like ProducerT and ConsumerT.
Conduit and ConduitT support ToProducer, ToProducerT, ToConsumer, and ToConsumerT.
Sink and SinkT support ToConsumer and ToConsumerT.
Source and SourceT support ToProducer and ToProducerT.
This allows for the ultimate flexibility in your choice of streaming effect. It also allows for efficient concurrency in the more abstract and compositional world of the pipes. In fact, ProducerT.merge, which merges many streams into one, uses ConduitT internally to collect the values and to merge them into a single ProducerT.
MonadIO
Based on this discussion I have refactored Monad and MonadIO, and created a new Maybe.MonadIO. This achieves the aim of making MonadIO a useful trait and constraint. The one difference between the proposal and my implementation is that I didn't make MonadT inherit MonadIO.
Any monad-transformer must add its own MonadIO constraint if it allows IO to be lifted into the transformer. This is more principled, I think. It allows for some transformers to be explicitly non-IO if necessary.
All of the core monad-transformers support MonadIO, so the ultimate goal has been achieved.
Deriving
Anybody who's used Haskell knows the deriving keyword and its ability to provide trait-implementations automatically (for traits like Functor and the like). This saves writing a load of boilerplate. Well, thanks to a suggestion by @micmarsh, we can now do the same.
The technique uses natural-transformations to convert to and from the wrapper type. You can see this in action in the CardGame sample. The Game trait-implementation looks like this:
The only things that need implementing are the Transform and CoTransform methods. They simply unpack the underlying implementation or repack it. Deriving.Monad simply implements Monad<M> in terms of Transform and CoTransform, which means you don't have to write all the boilerplate.
Conclusion
Can I also just say a personal note of thanks to @hermanda19 and @micmarsh - well worked out and thoughtful suggestions, like the ones listed above, are manna for a library like this that is trying to push the limits of the language. Thank you!
Finally, I will be working on some more documentation and getting back to my blog as soon as I can. This is the home stretch now. So, there's lots of documentation, unit tests, refinements, etc. as I head toward the full v5 release. I have a few trips lined up, so it won't be imminent, but hopefully at some point in the summer I'll have the full release out of the door!
This discussion was created from the release LanguageExt.Streaming + MonadIO + Deriving.