Skip to content
Tomas Fabian edited this page May 31, 2022 · 10 revisions

Welcome to the Kafka.DotNet.ksqlDB wiki!

Install the package from the NuGet Package Manager Console:

Install-Package ksqlDB.RestApi.Client

//Program.cs

using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.WorkerService;
using ksqlDB.RestApi.Client.WorkerService.ksqlDB;

// Builds the generic host, registers the ksqlDB context and the hosted worker, then runs it.
IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
      // ksqlDB REST endpoint. This value targets a local development server;
      // replace it with the address of your own ksqlDB server.
      // Forward slashes are required for a well-formed absolute URL (scheme://host:port).
      var ksqlDbUrl = "http://localhost:8088";

      services.AddDbContext<IMoviesKSqlDbContext, MoviesKSqlDbContext>(
        options =>
        {
          var setupParameters = options.UseKSqlDb(ksqlDbUrl);

          // NOTE(review): presumably makes queries start from the earliest available offset — confirm against the library docs.
          setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);

        }, ServiceLifetime.Transient);

      // Worker receives IMoviesKSqlDbContext through constructor injection.
      services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();

//Worker.cs

using System.Reactive.Linq;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.WorkerService.ksqlDB;
using ksqlDB.RestApi.Client.WorkerService.Models;

namespace ksqlDB.RestApi.Client.WorkerService;

/// <summary>
/// Hosted service that subscribes to a query stream of <see cref="Movie"/> records when the host starts
/// and disposes that subscription when the host stops.
/// </summary>
public class Worker : IHostedService //BackgroundService
{
  private readonly IMoviesKSqlDbContext context;
  private readonly ILogger<Worker> logger;

  // Handle of the active stream subscription; null until StartAsync has run and after StopAsync.
  private IDisposable? subscription;

  /// <summary>
  /// Creates the worker.
  /// </summary>
  /// <param name="context">ksqlDB context used to create the movie query stream.</param>
  /// <param name="logger">Logger for lifecycle messages and stream errors.</param>
  public Worker(IMoviesKSqlDbContext context, ILogger<Worker> logger)
  {
    this.context = context;
    this.logger = logger;
  }

  /// <summary>
  /// Logs the start time and subscribes to the movie stream. Returns immediately;
  /// records are delivered through the subscription callbacks.
  /// </summary>
  /// <param name="cancellationToken">Not used by this implementation.</param>
  /// <returns>A completed task.</returns>
  public Task StartAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);

    SubscribeToMovies();

    return Task.CompletedTask;
  }

  /// <summary>
  /// Subscribes to all movies whose title does not start with "Star" and prints each one to the console.
  /// </summary>
  private void SubscribeToMovies()
  {
    // Release any earlier subscription so a repeated start does not leak it.
    subscription?.Dispose();

    // NOTE(review): the predicate is applied to the library's query stream, so it is presumably
    // translated into the query rather than executed locally — keep it to constructs the provider supports.
    subscription = context.CreateQueryStream<Movie>()
      .Where(c => !c.Title.StartsWith("Star"))
      .Subscribe(
        onNext: movie =>
        {
          Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title} - {movie.RowTime}");
          Console.WriteLine();
        },
        // Log through the injected logger so the full exception (type, stack trace) is preserved.
        onError: error => logger.LogError(error, "Movie query stream failed: {message}", error.Message),
        onCompleted: () => Console.WriteLine("Completed"));
  }

  /// <summary>
  /// Disposes the stream subscription, if any.
  /// </summary>
  /// <param name="cancellationToken">Not used by this implementation.</param>
  /// <returns>A completed task.</returns>
  public Task StopAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Stopping.");

    subscription?.Dispose();
    subscription = null;

    return Task.CompletedTask;
  }
}

//MoviesKSqlDbContext.cs

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.WorkerService.Models;

namespace ksqlDB.RestApi.Client.WorkerService.ksqlDB;

/// <summary>
/// Application-specific ksqlDB context that adds a <see cref="Movies"/> query stream
/// on top of <see cref="KSqlDBContext"/>.
/// </summary>
internal class MoviesKSqlDbContext : KSqlDBContext, IMoviesKSqlDbContext
{
  /// <summary>
  /// Creates the context from a ksqlDB server URL.
  /// </summary>
  /// <param name="ksqlDbUrl">URL of the ksqlDB server, passed to the base context.</param>
  /// <param name="loggerFactory">Optional logger factory, passed to the base context; may be null.</param>
  public MoviesKSqlDbContext(string ksqlDbUrl, ILoggerFactory? loggerFactory = null)
    : base(ksqlDbUrl, loggerFactory)
  {
  }

  /// <summary>
  /// Creates the context from pre-built options.
  /// </summary>
  /// <param name="contextOptions">Context options, passed to the base context.</param>
  /// <param name="loggerFactory">Optional logger factory, passed to the base context; may be null.</param>
  public MoviesKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory? loggerFactory = null)
    : base(contextOptions, loggerFactory)
  {
  }

  /// <summary>
  /// Query stream of <see cref="Movie"/> records. Each access calls
  /// <c>CreateQueryStream&lt;Movie&gt;()</c> and returns its result.
  /// </summary>
  public IQbservable<Movie> Movies => CreateQueryStream<Movie>();
}

/// <summary>
/// ksqlDB context contract that exposes the movie query stream in addition to
/// the members of <see cref="IKSqlDBContext"/>.
/// </summary>
public interface IMoviesKSqlDbContext : IKSqlDBContext
{
  /// <summary>
  /// Query stream of <see cref="Movie"/> records.
  /// </summary>
  IQbservable<Movie> Movies { get; }
}

//Movie.cs

using ksqlDB.RestApi.Client.KSql.Query;

namespace ksqlDB.RestApi.Client.WorkerService.Models;

/// <summary>
/// Movie record read from the query stream. Derives from <see cref="Record"/>.
/// </summary>
// NOTE(review): property names presumably map to the ksqlDB column names (hence Release_Year) — confirm before renaming.
public class Movie : Record
{
  /// <summary>Movie title.</summary>
  // NOTE(review): never initialised here; presumably populated by the library when a row is materialised — confirm it cannot be null.
  public string Title { get; set; }
  /// <summary>Movie identifier.</summary>
  public int Id { get; set; }
  /// <summary>Release year of the movie.</summary>
  public int Release_Year { get; set; }
}
Clone this wiki locally