
ksqlDB.RestApi.Client workshop

Tomas Fabian edited this page Jan 12, 2023 · 28 revisions

Create a new ksqlDB.RestApi.Client service project

Prerequisites

  • Visual Studio 2022
  • .NET 5.0 or higher

1. Create a new solution and worker service project

mkdir ksqlDB.RestApi.Client.Sensors
cd ksqlDB.RestApi.Client.Sensors

Add a nuget.config file in the root of your project repository:

dotnet new nugetconfig

If you don't have the latest .NET preview version installed, add a global.json file that pins an SDK version of your choice. List the installed SDKs to pick one:

dotnet --list-sdks

The following example shows the global.json syntax:

{
  "sdk": {
    "version": "6.0.302"
  }
}
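The file can also be created from the command line. The following is a minimal sketch, assuming a POSIX shell and that it is run in the repository root; 6.0.302 is only the example version used above and should be replaced with one of the versions printed by dotnet --list-sdks.

```shell
# Pin the SDK by writing global.json into the current directory.
# 6.0.302 is the example version from this page; replace it with
# a version that `dotnet --list-sdks` reports on your machine.
SDK_VERSION="6.0.302"

cat > global.json <<EOF
{
  "sdk": {
    "version": "$SDK_VERSION"
  }
}
EOF

# Print the file so the pinned version can be double-checked.
cat global.json
```

The .NET CLI also ships a globaljson template (dotnet new globaljson --sdk-version 6.0.302) that should produce an equivalent file.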

Now we are ready to create our solution:

dotnet new sln -n SensorsStreamProcessor
dotnet new worker -n ksqlDB.RestApi.Client.Sensors
dotnet sln add ksqlDB.RestApi.Client.Sensors

Add a reference to the ksqlDB.RestApi.Client NuGet package.

dotnet add ksqlDB.RestApi.Client.Sensors package ksqlDB.RestApi.Client --version 2.6.0

With the basic structure of the solution in place, we can now open it in Visual Studio:

./SensorsStreamProcessor.sln

Note: most of the steps above can also be performed directly from Visual Studio.

Register the KSqlDbContext

Add two new folders named KSqlDb and Model to hold all our database-related code. In Visual Studio, right-click the ksqlDB.RestApi.Client.Sensors project and select Add->New Folder.

Add a new file Sensor.cs in the Model directory using the following code:

namespace ksqlDB.RestApi.Client.Sensors.Model;

public record Sensor
{
  public int Id { get; init; }

  public string Name { get; init; } = null!;

  public bool IsAvailable { get; init; }
}

Next add a file named SensorsKSqlDbContext.cs in the KSqlDb directory using the following code:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sensors.Model;

namespace ksqlDB.RestApi.Client.Sensors.KSqlDb;

internal interface ISensorsKSqlDbContext : IKSqlDBContext
{
  IQbservable<Sensor> Sensors { get; }
}

internal class SensorsKSqlDbContext : KSqlDBContext, ISensorsKSqlDbContext
{
  public SensorsKSqlDbContext(string ksqlDbUrl, ILoggerFactory? loggerFactory = null) 
    : base(ksqlDbUrl, loggerFactory)
  {
  }

  public SensorsKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory? loggerFactory = null) 
    : base(contextOptions, loggerFactory)
  {
  }

  public IQbservable<Sensor> Sensors => CreateQueryStream<Sensor>();
}

SensorsKSqlDbContext can now be registered with the IServiceCollection. Open the Program.cs file and replace its content with the code below:

using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
      var ksqlDbUrl = "http://localhost:8088";

      services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
        options =>
        {
          var setupParameters = options.UseKSqlDb(ksqlDbUrl);

          setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);

        }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);

      services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();

Create a worker with a ksqlDB push query

We are now ready to prepare our first ksqlDB push query. Replace the content of the Worker.cs file with the following code:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
using ksqlDB.RestApi.Client.Sensors.Model;

namespace ksqlDB.RestApi.Client.Sensors;

internal class Worker : IHostedService
{
  private readonly ISensorsKSqlDbContext kSqlDbContext;
  private readonly IKSqlDbRestApiClient kSqlDbRestApiClient;
  private readonly ILogger<Worker> logger;

  public Worker(ISensorsKSqlDbContext kSqlDbContext, IKSqlDbRestApiClient kSqlDbRestApiClient, ILogger<Worker> logger)
  {
    this.kSqlDbContext = kSqlDbContext;
    this.kSqlDbRestApiClient = kSqlDbRestApiClient;
    this.logger = logger;
  }

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);
    
    EntityCreationMetadata metadata = new()
    {
      KafkaTopic = nameof(Sensor),
      Partitions = 3,
      Replicas = 1
    };

    var httpResponseMessage = await kSqlDbRestApiClient.CreateStreamAsync<Sensor>(metadata, ifNotExists: true, cancellationToken: cancellationToken).ConfigureAwait(false);
   
    SubscribeToSensors();
  }

  private IDisposable subscription = null!;

  private void SubscribeToSensors()
  {
    subscription = kSqlDbContext.Sensors
      .Where(c => c.IsAvailable)
      .Subscribe(
          onNext: sensor =>
          {
            Console.WriteLine($"{nameof(Sensor)}: {sensor.Id} - {sensor.Name}");
            Console.WriteLine();
          },
          onError: error => { Console.WriteLine($"Exception: {error.Message}"); },
          onCompleted: () => Console.WriteLine("Completed"));
  }

  public Task StopAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Stopping.");

    // Dispose the subscription (if one was created) to stop the push query.
    subscription?.Dispose();

    return Task.CompletedTask;
  }
}
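To see what the worker's subscription amounts to on the wire, the same kind of push query can be sent by hand to the server's /query endpoint. This is only a sketch under several assumptions: curl is installed, a ksqlDB server listens on http://localhost:8088, the stream was created under the pluralized name Sensors, and the SELECT statement is a hand-written approximation of the LINQ query, not necessarily the exact text the library generates.

```shell
# Assumed server address; adjust if your ksqlDB server runs elsewhere.
KSQLDB_URL="${KSQLDB_URL:-http://localhost:8088}"

# Hand-written approximation of the LINQ query in Worker.cs.
QUERY="SELECT * FROM Sensors WHERE IsAvailable = true EMIT CHANGES;"

# Reading from the earliest offset mirrors the AutoOffsetReset.Earliest setting.
PAYLOAD=$(printf '{"ksql": "%s", "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"}}' "$QUERY")
echo "$PAYLOAD"

# A push query keeps the connection open, so stop listening after 10 seconds.
curl --silent --show-error --max-time 10 -X POST "$KSQLDB_URL/query" \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d "$PAYLOAD" \
  || echo "No complete response from $KSQLDB_URL (server down, or the 10 second limit was reached)"
```

Because the query never completes on its own, curl ending with a timeout is the expected outcome when the server is running.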

Create a local ksqlDB server with Docker Desktop or use your existing cluster.
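Before starting the worker, it can help to confirm that the server is reachable. The sketch below assumes curl is installed and that the server listens on http://localhost:8088, the address configured in Program.cs; it calls the REST API's /info endpoint, which returns basic server information as JSON.

```shell
# Assumed server address; matches the ksqlDbUrl configured in Program.cs.
KSQLDB_URL="${KSQLDB_URL:-http://localhost:8088}"

# GET /info answers with server metadata as JSON when the server is up.
if curl --silent --fail --max-time 5 "$KSQLDB_URL/info"; then
  echo
  STATUS="reachable"
else
  STATUS="not reachable"
fi

echo "ksqlDB server is $STATUS at $KSQLDB_URL"
```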

Run the application by pressing F5 in Visual Studio. The application will try to create a sensors stream and subscribe to it.
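The subscription only prints something once records arrive in the stream. One way to produce a test record without writing more C# is to post an INSERT statement to the server's /ksql endpoint. The sketch below assumes curl is installed, the server listens on http://localhost:8088, and the stream was created under the pluralized name Sensors; the sensor values are made up for illustration.

```shell
# Assumed server address; adjust if your ksqlDB server runs elsewhere.
KSQLDB_URL="${KSQLDB_URL:-http://localhost:8088}"

# Illustrative test record; IsAvailable is true so it passes the Where filter.
STATEMENT="INSERT INTO Sensors (Id, Name, IsAvailable) VALUES (1, 'Sensor-1', true);"

# The /ksql endpoint expects the statement wrapped in a JSON document.
PAYLOAD=$(printf '{"ksql": "%s", "streamsProperties": {}}' "$STATEMENT")
echo "$PAYLOAD"

curl --silent --show-error --max-time 5 -X POST "$KSQLDB_URL/ksql" \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d "$PAYLOAD" \
  || echo "Request to $KSQLDB_URL/ksql failed; is the server running?"
```

If the worker is running and the insert succeeds, the console should show a line for the new sensor shortly afterwards. A record with IsAvailable set to false would be filtered out by the query and not printed.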

Congratulations! You've just successfully created your stream processor with ksqlDB.RestApi.Client.
