DataCollector in a functional way #1089
-
Currently there is some work going on to add a simple runtime to
I don't have enough time to make your above code functional, but you can always capture something like the above code within an
static class Foo<RT>
    where RT : struct, HasCancel<RT>
{
    public static Aff<RT, Unit> MainAsync(IEnumerable<string> ips) =>
        ips.SequenceParallel(Loop)
           .Map(_ => unit);

    static Aff<RT, Unit> Loop(string ip) =>
        Aff<RT, Unit>(async rt =>
        {
            // local is cancelled when the runtime's token is cancelled or on Disconnect
            using (var local = CancellationTokenSource.CreateLinkedTokenSource(rt.CancellationToken))
            {
                _ = await Connect(ip, local.Token, async stream =>
                {
                    // for health check
                    var task = Task.Run(async () =>
                    {
                        while (!local.IsCancellationRequested)
                        {
                            await stream.WriteAsync(Encoding.ASCII.GetBytes("||>GET DEVICE.NAME\r\n"), local.Token);
                            await Task.Delay(TimeSpan.FromSeconds(5), local.Token);
                        }
                    }, local.Token);

                    // call external APIs
                    await foreach (var reading in ReadingsAsync(stream, timeout: TimeSpan.FromSeconds(10),
                                                                ct: local.Token))
                    {
                        var timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                        switch (reading)
                        {
                            case "||[105]":
                                goto Disconnect;
                            case { } s when s.StartsWith("DM"):
                                await PostConnected(timestamp, ip);
                                break;
                            case {Length: > 0 and <= 200}:
                                await PostScanData(reading, timestamp, ip);
                                break;
                        }
                    }

                    Disconnect:
                    local.Cancel();
                    try
                    {
                        await task;
                    }
                    catch (OperationCanceledException)
                    {
                        // expected: the health-check task was cancelled above
                        // (OperationCanceledException is the base of TaskCanceledException)
                    }
                    return unit;
                });
            }
            await PostDisconnected(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), ip);
            await Task.Delay(TimeSpan.FromSeconds(5), rt.CancellationToken);
            return unit;
        });
}
Obviously that means you spend all your time writing boilerplate like The
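The health-check part of the snippet above relies on a linked `CancellationTokenSource`: the background loop is stopped by cancelling the local source, without cancelling the outer token. A minimal sketch of that pattern in plain .NET follows. It does not use language-ext, and `LinkedTokenDemo` and `StopHeartbeatAsync` are hypothetical names chosen for illustration; the delay stands in for the real stream write.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedTokenDemo
{
    // Starts a background loop tied to a linked token, cancels the linked
    // source, and reports whether awaiting the loop surfaced a cancellation.
    public static async Task<bool> StopHeartbeatAsync(CancellationToken outer)
    {
        using var local = CancellationTokenSource.CreateLinkedTokenSource(outer);

        var heartbeat = Task.Run(async () =>
        {
            while (true)
            {
                // A real heartbeat would write to the stream here.
                // Task.Delay observes the token and ends the loop by throwing.
                await Task.Delay(TimeSpan.FromSeconds(5), local.Token);
            }
        }, local.Token);

        // Cancels only the linked source; the outer token is unaffected.
        local.Cancel();
        try
        {
            await heartbeat;
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }

    public static async Task Main()
    {
        using var outer = new CancellationTokenSource();
        var cancelled = await StopHeartbeatAsync(outer.Token);
        Console.WriteLine(cancelled);                      // True
        Console.WriteLine(outer.IsCancellationRequested);  // False
    }
}
```

Cancelling the outer source instead would also stop the loop, because the linked source observes it; that is what lets a runtime-level cancellation tear down every connection at once.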
-
Thanks for the help. When I try to access to
@louthy, could you help me figure out what the problem could be?
public static class DataCollector<RT>
    where RT : struct, HasCancel<RT>, HasConsole<RT>
{
    public static Aff<RT, Unit> Main(IEnumerable<string> ips) =>
        ips.SequenceParallel(Loop)
           .Map(_ => unit);

    static Aff<RT, Unit> Loop(string ip) =>
        use(Eff(() => new TcpClient()), client =>
            from ct in cancelToken<RT>()
            from _ in client.ConnectAsync(ip, 23, ct).ToUnit().ToAff()
            let stream = client.GetStream()
            from cancel in fork(Health(stream))
            from __ in TcpConnect(stream) | readLine | writeLine
            select unit)
        | @catch(error => Console<RT>.writeLine(error.Message).ToAff());

    static Aff<RT, Unit> Health(Stream stream) =>
        from ct in cancelToken<RT>()
        let cmd = Encoding.ASCII.GetBytes("||>GET DEVICE.NAME\r\n")
        from _ in Aff(() => stream.WriteAsync(cmd, ct).ToUnit()).Repeat(Schedule.spaced(5 * second) | Schedule.RepeatForever)
        select unit;

    static Producer<RT, TextReader, Unit> TcpConnect(Stream stream) =>
        from reader in use<RT, StreamReader>(SuccessEff(new StreamReader(stream)))
        from _ in Producer.yield<RT, TextReader>(reader)
        select unit;

    static Pipe<RT, TextReader, string, Unit> readLine
    {
        get
        {
            return from tr in awaiting<TextReader>()
                   from ct in cancelToken<RT>()
                   from ln in enumerate2(go(tr))
                   from __ in yield(ln)
                   select unit;

            static async IAsyncEnumerable<string> go(TextReader reader)
            {
                while (true)
                {
                    var line = await reader.ReadLineAsync().ConfigureAwait(false);
                    if (line == null) yield break;
                    yield return line;
                }
            }
        }
    }

    static Consumer<RT, string, Unit> writeLine =>
        from l in awaiting<string>()
        from _ in Console<RT>.writeLine(l)
        select unit;
}
Update
Using
static Pipe<RT, TextReader, string, Unit> readLine
{
    get
    {
        return from tr in awaiting<TextReader>()
               from ct in cancelToken<RT>()
               from ln in enumerate(go(tr))
               // from _ in yield(ln)
               select unit;

        static async IAsyncEnumerable<string> go(TextReader reader)
        {
            while (true)
            {
                var line = await reader.ReadLineAsync().ConfigureAwait(false);
                if (line == null) yield break;
                yield return line;
            }
        }
    }
}
-
Dear @louthy,
I would like to discuss the snippet below. I would like to rewrite it using a functional approach.
My main difficulty is that I don't know what the up-to-date story for cancellation is.
I bumped into a PR where Eff also got some kind of cancellation support. How is it possible at all for a sync process to get cancelled? Or am I wrong?
I know it is a lot to ask, but would it be possible to show us what your approach would be?
Thank you for all the effort bringing functional programming closer to C#.
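On the question of how synchronous code can be cancelled at all: in .NET, cancellation is cooperative, so synchronous code can only stop at the points where it checks a `CancellationToken` itself. The sketch below shows that idea in plain .NET. It does not use language-ext and is not a description of how `Eff` implements its cancellation support; `CooperativeCancellationDemo` and `CountUntilCancelled` are hypothetical names for illustration.

```csharp
using System;
using System.Threading;

public static class CooperativeCancellationDemo
{
    // Synchronous work that polls the token between units of work.
    public static int CountUntilCancelled(CancellationToken token, int limit)
    {
        var count = 0;
        for (var i = 0; i < limit; i++)
        {
            // Throws OperationCanceledException once cancellation is requested.
            token.ThrowIfCancellationRequested();
            count++;
        }
        return count;
    }

    public static void Main()
    {
        using var cts = new CancellationTokenSource();
        Console.WriteLine(CountUntilCancelled(cts.Token, 3)); // 3

        cts.Cancel();
        try
        {
            CountUntilCancelled(cts.Token, 3);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("cancelled"); // cancelled
        }
    }
}
```

A synchronous effect that never checks its token cannot be interrupted part-way through; the most a caller can do is decline to start it, or decline to run the next step once it returns.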