-
Notifications
You must be signed in to change notification settings - Fork 26
Home
Tomas Fabian edited this page May 31, 2022
·
10 revisions
Welcome to the Kafka.DotNet.ksqlDB wiki!
Install with NuGet package manager:
Install-Package ksqlDB.RestApi.Client
//Program.cs
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.WorkerService;
using ksqlDB.RestApi.Client.WorkerService.ksqlDB;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
var ksqlDbUrl = @"http:\\localhost:8088"; // for local dev. provide your IP address
services.AddDbContext<IMoviesKSqlDbContext, MoviesKSqlDbContext>(
options =>
{
var setupParameters = options.UseKSqlDb(ksqlDbUrl);
setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);
}, ServiceLifetime.Transient);
services.AddHostedService<Worker>();
})
.Build();
await host.RunAsync();
//Worker.cs
using System.Reactive.Linq;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.WorkerService.ksqlDB;
using ksqlDB.RestApi.Client.WorkerService.Models;
namespace ksqlDB.RestApi.Client.WorkerService;
public class Worker : IHostedService //BackgroundService
{
private readonly IMoviesKSqlDbContext context;
private readonly ILogger<Worker> logger;
public Worker(IMoviesKSqlDbContext context, ILogger<Worker> logger)
{
this.context = context;
this.logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);
SubscribeToMovies();
return Task.CompletedTask;
}
private IDisposable subscription = null!;
private void SubscribeToMovies()
{
subscription = context.CreateQueryStream<Movie>()
.Where( c => !c.Title.StartsWith("Star"))
.Subscribe(onNext: movie =>
{
Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title} - {movie.RowTime}");
Console.WriteLine();
}, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed"));
}
public Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping.");
using (subscription)
{
}
return Task.CompletedTask;
}
}
//MoviesKSqlDbContext.cs
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.WorkerService.Models;
namespace ksqlDB.RestApi.Client.WorkerService.ksqlDB;
internal class MoviesKSqlDbContext : KSqlDBContext, IMoviesKSqlDbContext
{
public MoviesKSqlDbContext(string ksqlDbUrl, ILoggerFactory loggerFactory = null)
: base(ksqlDbUrl, loggerFactory)
{
}
public MoviesKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory loggerFactory = null)
: base(contextOptions, loggerFactory)
{
}
public IQbservable<Movie> Movies => CreateQueryStream<Movie>();
}
public interface IMoviesKSqlDbContext : IKSqlDBContext
{
IQbservable<Movie> Movies { get; }
}
//Movie.cs
using ksqlDB.RestApi.Client.KSql.Query;
namespace ksqlDB.RestApi.Client.WorkerService.Models;
public class Movie : Record
{
public string Title { get; set; }
public int Id { get; set; }
public int Release_Year { get; set; }
}