
ksqlDB.RestApi.Client workshop

Tomas Fabian edited this page Jul 16, 2022 · 28 revisions

Create a new ksqlDB.RestApi.Client service project

Prerequisites

  • Visual Studio 2022
  • .NET 6.0 or higher (the code below uses file-scoped namespaces, a C# 10 feature)

  1. Create a new solution and worker service project

mkdir ksqlDB.RestApi.Client.Sensors
cd ksqlDB.RestApi.Client.Sensors

Add a nuget.config file in the root of your project repository:

dotnet new nugetconfig

If you don't have the latest .NET preview version installed, add a global.json file that overrides the SDK version with one of your choice. List the installed SDKs with:

dotnet --list-sdks

The following example shows the global.json syntax:

{
  "sdk": {
    "version": "6.0.302"
  }
}
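
The same file can also be written from the command line. The following is a minimal sketch, not part of the original workshop: SDK_VERSION is an illustrative variable name, and 6.0.302 only mirrors the example above, so replace it with a version that dotnet --list-sdks reports on your machine.

```shell
# SDK_VERSION is an illustrative variable; use a version listed by `dotnet --list-sdks`.
SDK_VERSION="6.0.302"

# Write global.json into the current directory (the solution root).
cat > global.json <<EOF
{
  "sdk": {
    "version": "${SDK_VERSION}"
  }
}
EOF

# Print the file so the pinned version can be checked by eye.
cat global.json
```

Running dotnet --version in the same directory afterwards should report the pinned SDK, provided that version is installed.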

Now we are ready to create our solution:

dotnet new sln -n SensorsStreamProcessor
dotnet new worker -n ksqlDB.RestApi.Client.Sensors
dotnet sln add ksqlDB.RestApi.Client.Sensors

Add a reference to the ksqlDB.RestApi.Client NuGet package.

dotnet add ksqlDB.RestApi.Client.Sensors package ksqlDB.RestApi.Client --version 2.1.4

The basic structure of the solution is now in place, so we can open it in Visual Studio:

./SensorsStreamProcessor.sln

Register the KsqlDbContext

Add two new folders, KSqlDb and Models, where we will place all our database-related code.

mkdir ksqlDB.RestApi.Client.Sensors/KSqlDb
mkdir ksqlDB.RestApi.Client.Sensors/Models

Add a new file Sensor.cs in the Models directory using the following code:

namespace ksqlDB.RestApi.Client.Sensors.Models;

public record Sensor
{
  public int Id { get; init; }

  public string Name { get; init; } = null!;

  public bool IsAvailable { get; init; }
}

and a file named SensorsKSqlDbContext.cs in the KSqlDb directory using the following code:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sensors.Models;

namespace ksqlDB.RestApi.Client.Sensors.KSqlDb;

// Contract that Program.cs registers and Worker.cs consumes.
internal interface ISensorsKSqlDbContext : IKSqlDBContext
{
  IQbservable<Sensor> Sensors { get; }
}

internal class SensorsKSqlDbContext : KSqlDBContext, ISensorsKSqlDbContext
{
  public SensorsKSqlDbContext(string ksqlDbUrl, ILoggerFactory? loggerFactory = null)
    : base(ksqlDbUrl, loggerFactory)
  {
  }

  public SensorsKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory? loggerFactory = null)
    : base(contextOptions, loggerFactory)
  {
  }

  // Source of the push query over the Sensors stream.
  public IQbservable<Sensor> Sensors => CreateQueryStream<Sensor>();
}

SensorsKSqlDbContext can now be registered with the IServiceCollection. Open the Program.cs file and replace its content with the code below:

using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
      var ksqlDbUrl = "http://localhost:8088";

      services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
        options =>
        {
          var setupParameters = options.UseKSqlDb(ksqlDbUrl);

          setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);

        }, ServiceLifetime.Transient);

      services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();

We are now ready to prepare our first ksqlDB push query. Replace the content of the Worker.cs file with the following code:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
using ksqlDB.RestApi.Client.Sensors.Models;

namespace ksqlDB.RestApi.Client.Sensors;

internal class Worker : IHostedService
{
  private readonly ISensorsKSqlDbContext kSqlDbContext;
  private readonly IKSqlDbRestApiClient kSqlDbRestApiClient;
  private readonly ILogger<Worker> logger;

  public Worker(ISensorsKSqlDbContext kSqlDbContext, IKSqlDbRestApiClient kSqlDbRestApiClient, ILogger<Worker> logger)
  {
    this.kSqlDbContext = kSqlDbContext;
    this.kSqlDbRestApiClient = kSqlDbRestApiClient;
    this.logger = logger;
  }

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);
    
    EntityCreationMetadata metadata = new()
    {
      KafkaTopic = nameof(Sensor),
      Partitions = 3,
      Replicas = 1
    };

    var httpResponseMessage = await kSqlDbRestApiClient.CreateStreamAsync<Sensor>(metadata, ifNotExists: true, cancellationToken: cancellationToken).ConfigureAwait(false);
   
    SubscribeToSensors();
  }

  private IDisposable subscription = null!;

  private void SubscribeToSensors()
  {
    subscription = kSqlDbContext.Sensors
      .Where(c => c.IsAvailable)
      .Subscribe(onNext: sensor =>
      {
        Console.WriteLine($"{nameof(Sensor)}: {sensor.Id} - {sensor.Name}");
        Console.WriteLine();
      }, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed"));
  }

  public Task StopAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Stopping.");

    subscription?.Dispose();

    return Task.CompletedTask;
  }
}

Create a local ksqlDB server with Docker Desktop or use your existing cluster.
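
Once a server is running and the worker has created its stream, the push query can be exercised by inserting a row by hand. The following sketch is not part of the original workshop and rests on several assumptions: the server listens on http://localhost:8088 (the URL used in Program.cs), the stream is named Sensors (the property name used on the context), and the statement is sent through ksqlDB's REST endpoints /info and /ksql. KSQLDB_URL, STATEMENT, PAYLOAD and the sample sensor values are illustrative names and data.

```shell
# KSQLDB_URL is an assumed address; it matches the URL configured in Program.cs.
KSQLDB_URL="http://localhost:8088"

# Statement that adds one available sensor; the values are sample data.
STATEMENT="INSERT INTO Sensors (Id, Name, IsAvailable) VALUES (1, 'sensor-1', true);"

# JSON request body for the /ksql endpoint.
PAYLOAD="{\"ksql\": \"${STATEMENT}\", \"streamsProperties\": {}}"

# 1. Check that the server answers before sending anything.
if curl --silent --fail --max-time 5 "${KSQLDB_URL}/info" > /dev/null; then
  # 2. Send the INSERT statement and print the server's response.
  curl --silent --max-time 10 -X POST "${KSQLDB_URL}/ksql" \
    -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
    -d "${PAYLOAD}" || echo "The request to ${KSQLDB_URL}/ksql failed"
  echo
else
  echo "ksqlDB server is not reachable at ${KSQLDB_URL}"
fi
```

Because the query in Worker.cs filters on IsAvailable, a row inserted with IsAvailable set to false would not be printed by the worker.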

TODO: continue

Congratulations! You've just successfully created your stream processor with ksqlDB.RestApi.Client.
