
ksqlDB.RestApi.Client workshop

Tomas Fabian edited this page Jul 16, 2022 · 28 revisions

Create a new ksqlDB.RestApi.Client service project

Prerequisites

  • Visual Studio 2022
  • .NET 5.0 or higher

1. Create a new solution and worker service project:

mkdir ksqlDB.RestApi.Client.Sensors
cd ksqlDB.RestApi.Client.Sensors

Add a nuget.config file in the root of your project repository:

dotnet new nugetconfig

If you don't have the latest .NET preview version installed, add a global.json file that pins the SDK version of your choice. You can list the installed SDKs with:

dotnet --list-sdks

The following example shows the global.json syntax:

{
  "sdk": {
    "version": "6.0.302"
  }
}

Now we are ready to create our solution:

dotnet new sln -n SensorsStreamProcessor
dotnet new worker -n ksqlDB.RestApi.Client.Sensors
dotnet sln add ksqlDB.RestApi.Client.Sensors

Add a reference to the ksqlDB.RestApi.Client NuGet package.

dotnet add ksqlDB.RestApi.Client.Sensors package ksqlDB.RestApi.Client --version 2.1.3

The basic structure of the solution is now in place, so we can open it in Visual Studio:

./SensorsStreamProcessor.sln

Register the KsqlDbContext

Add two new folders, KSqlDb and Models, where we will place all our database-related code.

mkdir ksqlDB.RestApi.Client.Sensors/KSqlDb
mkdir ksqlDB.RestApi.Client.Sensors/Models

Add a new file Sensor.cs in the Models directory using the following code:

namespace ksqlDB.RestApi.Client.Sensors.Models;

public record Sensor
{
  public int Id { get; init; }

  public string Name { get; init; } = null!;

  public bool IsAvailable { get; init; }
}

and a file named SensorsKSqlDbContext.cs in the KSqlDb directory using the following code:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sensors.Models;

namespace ksqlDB.RestApi.Client.Sensors.KSqlDb;

// Contract that Program.cs registers with the IServiceCollection.
internal interface ISensorsKSqlDbContext : IKSqlDBContext
{
  IQbservable<Sensor> Sensors { get; }
}

internal class SensorsKSqlDbContext : KSqlDBContext, ISensorsKSqlDbContext
{
  public SensorsKSqlDbContext(string ksqlDbUrl, ILoggerFactory? loggerFactory = null)
    : base(ksqlDbUrl, loggerFactory)
  {
  }

  public SensorsKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory? loggerFactory = null)
    : base(contextOptions, loggerFactory)
  {
  }

  // Source of the push query over the Sensors stream.
  public IQbservable<Sensor> Sensors => CreateQueryStream<Sensor>();
}

SensorsKSqlDbContext can now be registered with the IServiceCollection. Open the Program.cs file and replace its content with the code below:

using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
      var ksqlDbUrl = "http://localhost:8088";

      services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
        options =>
        {
          var setupParameters = options.UseKSqlDb(ksqlDbUrl);

          setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);

        }, ServiceLifetime.Transient);

      services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();

We are now ready to prepare our first ksqlDB push query. Replace the content of the Worker.cs class with the following code:
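The page does not include the Worker code at this point, so the following is only a minimal sketch of what such a worker could look like, not the author's original. It assumes that ISensorsKSqlDbContext derives from IKSqlDBContext (so that CreateQueryStream<Sensor>() is available), that a stream named Sensors exists on the server, and that the worker template's implicit usings are enabled. The log messages and the choice to dispose the subscription in StopAsync are illustrative.

```csharp
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
using ksqlDB.RestApi.Client.Sensors.Models;

namespace ksqlDB.RestApi.Client.Sensors;

// Kept internal so that it can accept an internal context interface.
internal class Worker : BackgroundService
{
  private readonly ISensorsKSqlDbContext context;
  private readonly ILogger<Worker> logger;
  private IDisposable? subscription;

  public Worker(ISensorsKSqlDbContext context, ILogger<Worker> logger)
  {
    this.context = context;
    this.logger = logger;
  }

  protected override Task ExecuteAsync(CancellationToken stoppingToken)
  {
    // Push query: the server keeps the connection open and streams new rows
    // to the onNext callback as they arrive.
    subscription = context.CreateQueryStream<Sensor>()
      .Subscribe(
        sensor => logger.LogInformation(
          "Sensor {Id} ({Name}), available: {IsAvailable}",
          sensor.Id, sensor.Name, sensor.IsAvailable),
        error => logger.LogError(error, "The push query failed"),
        () => logger.LogInformation("The push query completed"));

    return Task.CompletedTask;
  }

  public override Task StopAsync(CancellationToken cancellationToken)
  {
    // Disposing the subscription ends the push query.
    subscription?.Dispose();

    return base.StopAsync(cancellationToken);
  }
}
```

Because the host resolves Worker through AddHostedService<Worker>(), the context is injected using the registration made in Program.cs.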

Create a local ksqlDB server with Docker Desktop or use your existing cluster.
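A push query only returns rows if the stream it reads from exists on the server. As a hedged sketch, the stream could be created from the Sensor record through the package's REST API client. The helper class SensorsStreamSetup, the topic name sensors and the partition and replica counts are assumptions made for illustration, and the calls reflect the 2.x package API as far as I know it, so check them against the version you installed.

```csharp
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.Sensors.Models;

namespace ksqlDB.RestApi.Client.Sensors.KSqlDb;

// Hypothetical helper, not part of the original workshop.
internal static class SensorsStreamSetup
{
  public static async Task<HttpResponseMessage> CreateSensorsStreamAsync(string ksqlDbUrl)
  {
    // Settings of the backing Kafka topic; the values are illustrative.
    EntityCreationMetadata metadata = new()
    {
      KafkaTopic = "sensors",
      Partitions = 1,
      Replicas = 1
    };

    var httpClientFactory = new HttpClientFactory(new Uri(ksqlDbUrl));
    var restApiClient = new KSqlDbRestApiClient(httpClientFactory);

    // Sends a CREATE STREAM IF NOT EXISTS statement derived from the Sensor record.
    return await restApiClient.CreateStreamAsync<Sensor>(metadata, ifNotExists: true);
  }
}
```

It could be called once at startup, for example with await SensorsStreamSetup.CreateSensorsStreamAsync("http://localhost:8088"); before host.RunAsync(), and the returned status code shows whether the server accepted the statement.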

TODO: continue

Congratulations! You've just successfully created your stream processor with ksqlDB.RestApi.Client.
