-
Notifications
You must be signed in to change notification settings - Fork 27
ksqlDB.RestApi.Client workshop
- Visual Studio 2022
- .NET 5.0 or higher
- create a new solution and worker service project
mkdir ksqlDB.RestApi.Client.Sensors
cd ksqlDB.RestApi.Client.Sensors
Add a nuget.config
file in the root of your project repository:
dotnet new nugetconfig
In case that you don't have installed the latest .NET preview version add a global.json
file with an overridden sdk version of your choice:
dotnet --list-sdks
The following example shows the global.json
syntax:
{
"sdk": {
"version": "6.0.302"
}
}
Now we are ready to create our solution:
dotnet new sln -n SensorsStreamProcessor
dotnet new worker -n ksqlDB.RestApi.Client.Sensors
dotnet sln add ksqlDB.RestApi.Client.Sensors
Add a reference to the ksqlDB.RestApi.Client NuGet package.
dotnet add ksqlDB.RestApi.Client.Sensors package ksqlDB.RestApi.Client --version 2.6.0
We prepared the basic structure of the solution and therefore we can open it now with Visual Studio:
./SensorsStreamProcessor.sln
Note: most of the above mentioned commands could have been achieved directly from Visual Studio.
Add 2 new folders named KSqlDb
and Model
where we want to place all our database related code. In VS right click the project with name KSqlDB.RestApi.Client.Sensors
and select Add->New Folder.
Add a new file Sensor.cs
in the Model directory using the following code:
namespace ksqlDB.RestApi.Client.Sensors.Model;
public record Sensor
{
public int Id { get; init; }
public string Name { get; init; } = null!;
public bool IsAvailable { get; init; }
}
Next add a file named SensorsKSqlDbContext.cs
in the KSqlDb
directory using the following code:
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sensors.Model;
namespace ksqlDB.RestApi.Client.Sensors.KSqlDb;
internal interface ISensorsKSqlDbContext : IKSqlDBContext
{
IQbservable<Sensor> Sensors { get; }
}
internal class SensorsKSqlDbContext : KSqlDBContext, ISensorsKSqlDbContext
{
public SensorsKSqlDbContext(string ksqlDbUrl, ILoggerFactory? loggerFactory = null)
: base(ksqlDbUrl, loggerFactory)
{
}
public SensorsKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory? loggerFactory = null)
: base(contextOptions, loggerFactory)
{
}
public IQbservable<Sensor> Sensors => CreateQueryStream<Sensor>();
}
SensorsKSqlDbContext
can be registered now with the IServiceCollection
. Open Program.cs
file and edit it with the bellow code:
using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
var ksqlDbUrl = @"http:\\localhost:8088";
services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
options =>
{
var setupParameters = options.UseKSqlDb(ksqlDbUrl);
setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);
}, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);
services.AddHostedService<Worker>();
})
.Build();
await host.RunAsync();
We are now ready to prepare our first KSqlDb push query. Replace the content of the Worker.cs
class with the following code:
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
using ksqlDB.RestApi.Client.Sensors.Model;
namespace ksqlDB.RestApi.Client.Sensors;
internal class Worker : IHostedService
{
private readonly ISensorsKSqlDbContext kSqlDbContext;
private readonly IKSqlDbRestApiClient kSqlDbRestApiClient;
private readonly ILogger<Worker> logger;
public Worker(ISensorsKSqlDbContext kSqlDbContext, IKSqlDbRestApiClient kSqlDbRestApiClient, ILogger<Worker> logger)
{
this.kSqlDbContext = kSqlDbContext;
this.kSqlDbRestApiClient = kSqlDbRestApiClient;
this.logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);
EntityCreationMetadata metadata = new()
{
KafkaTopic = nameof(Sensor),
Partitions = 3,
Replicas = 1
};
var httpResponseMessage = await kSqlDbRestApiClient.CreateStreamAsync<Sensor>(metadata, ifNotExists: true, cancellationToken: cancellationToken).ConfigureAwait(false);
SubscribeToSensors();
}
private IDisposable subscription = null!;
private void SubscribeToSensors()
{
subscription = kSqlDbContext.Sensors
.Where(c => c.IsAvailable)
.Subscribe(onNext: sensor =>
{
Console.WriteLine($"{nameof(Sensor)}: {sensor.Id} - {sensor.Name}");
Console.WriteLine();
}, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed"));
}
public Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Stopping.");
using (subscription)
{
}
return Task.CompletedTask;
}
}
Create a local KSqlDb server with Docker-desktop or use your existing cluster.
Run the application by pressing F5 in Visual Studio. The application will try to create a sensors stream and subscribe to it.
Congratulations! You've just successfully created your stream processor with ksqlDB.RestApi.Client.