ksqlDB.RestApi.Client workshop
- Visual Studio 2022
- .NET 6.0 or higher (the code below relies on C# 10 features such as file-scoped namespaces)
- create a new solution and worker service project
mkdir ksqlDB.RestApi.Client.Sensors
cd ksqlDB.RestApi.Client.Sensors
Add a nuget.config file in the root of your project repository:
dotnet new nugetconfig
If you haven't installed the latest .NET preview version, you can add a global.json file to pin a preferred SDK version. List the installed SDKs to pick one:
dotnet --list-sdks
The following example shows the global.json syntax:
{
  "sdk": {
    "version": "6.0.302"
  }
}
Now we are ready to create our solution:
dotnet new sln -n SensorsStreamProcessor
dotnet new worker -n ksqlDB.RestApi.Client.Sensors
dotnet sln add ksqlDB.RestApi.Client.Sensors
Add a reference to the ksqlDB.RestApi.Client NuGet package:
dotnet add ksqlDB.RestApi.Client.Sensors package ksqlDB.RestApi.Client --version 3.1.0
The basic structure of the solution is now in place, so we can open it in Visual Studio:
./SensorsStreamProcessor.sln
Note: Most of the steps above can also be performed directly in Visual Studio.
Add two new folders named 'KSqlDb' and 'Model' to hold our database-related code. In Visual Studio, right-click the ksqlDB.RestApi.Client.Sensors project and select Add->New Folder.
Add a new file Sensor.cs in the 'Model' directory with the following code:
namespace ksqlDB.RestApi.Client.Sensors.Model;

public record Sensor
{
  public int Id { get; init; }
  public string Name { get; init; } = null!;
  public bool IsAvailable { get; init; }
}
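As a quick illustration of how this record behaves, the sketch below creates a Sensor and derives a modified copy with a `with` expression. The record is repeated so that the snippet compiles on its own as a console program, and the sample values are made up for the example.

```csharp
using System;

// Init-only properties can be set in an object initializer, but not afterwards.
var sensor = new Sensor { Id = 1, Name = "sensor-1", IsAvailable = false };

// A 'with' expression copies the record and changes only the listed properties.
var available = sensor with { IsAvailable = true };

Console.WriteLine(sensor.IsAvailable);     // the original instance is unchanged
Console.WriteLine(available.IsAvailable);  // the copy carries the new value

// Records compare by value, so two copies with equal properties are equal.
Console.WriteLine(available == (sensor with { IsAvailable = true }));

public record Sensor
{
  public int Id { get; init; }
  public string Name { get; init; } = null!;
  public bool IsAvailable { get; init; }
}
```

Run as a console program, this prints False, True and True, in that order.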
Next, add a file named SensorsKSqlDbContext.cs in the 'KSqlDb' directory with the following code:
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.Sensors.Model;
namespace ksqlDB.RestApi.Client.Sensors.KSqlDb;

internal interface ISensorsKSqlDbContext : IKSqlDBContext
{
  IQbservable<Sensor> Sensors { get; }
}

internal class SensorsKSqlDbContext : KSqlDBContext, ISensorsKSqlDbContext
{
  public SensorsKSqlDbContext(string ksqlDbUrl, ILoggerFactory? loggerFactory = null)
    : base(ksqlDbUrl, loggerFactory)
  {
  }

  public SensorsKSqlDbContext(KSqlDBContextOptions contextOptions, ILoggerFactory? loggerFactory = null)
    : base(contextOptions, loggerFactory)
  {
  }

  public IQbservable<Sensor> Sensors => CreateQueryStream<Sensor>();
}
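To get a feel for what a query on this context translates to, the following sketch prints the KSQL generated for a filtered push query instead of running it. It is a standalone console snippet that uses the base KSqlDBContext directly and repeats the Sensor record so that it compiles without the rest of the project. It assumes that the ToQueryString extension method is available in the installed package version and that the server URL is http://localhost:8088; please verify both against your setup.

```csharp
using System;
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query.Context;

// Assumption: a ksqlDB server URL. No request is sent by this snippet.
await using var context = new KSqlDBContext("http://localhost:8088");

// Build the same kind of query the workshop subscribes to later,
// but only render it as KSQL text.
string ksql = context.CreateQueryStream<Sensor>()
  .Where(c => c.IsAvailable)
  .ToQueryString();

Console.WriteLine(ksql);

public record Sensor
{
  public int Id { get; init; }
  public string Name { get; init; } = null!;
  public bool IsAvailable { get; init; }
}
```

Printing the statement like this can help when a subscription returns nothing and you want to paste the query into the ksqlDB CLI to check it.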
SensorsKSqlDbContext can now be registered with the IServiceCollection. Open the Program.cs file and replace its contents with the code below:
using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;

IHost host = Host.CreateDefaultBuilder(args)
  .ConfigureServices(services =>
  {
    // URL of the ksqlDB server (note the forward slashes)
    var ksqlDbUrl = @"http://localhost:8088";

    services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
      options =>
      {
        var setupParameters = options.UseKSqlDb(ksqlDbUrl);

        setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);
      }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);

    services.AddHostedService<Worker>();
  })
  .Build();

await host.RunAsync();
In this example, the 'Worker' class implements the IHostedService interface. It performs an asynchronous operation to create a stream and then subscribes to it.
We can now set up our first ksqlDB push query. Replace the code in the Worker.cs file with the following snippet:
using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;
using ksqlDB.RestApi.Client.Sensors.Model;
namespace ksqlDB.RestApi.Client.Sensors;

internal class Worker : IHostedService
{
  private readonly ISensorsKSqlDbContext kSqlDbContext;
  private readonly IKSqlDbRestApiClient kSqlDbRestApiClient;
  private readonly ILogger<Worker> logger;

  public Worker(ISensorsKSqlDbContext kSqlDbContext, IKSqlDbRestApiClient kSqlDbRestApiClient, ILogger<Worker> logger)
  {
    this.kSqlDbContext = kSqlDbContext;
    this.kSqlDbRestApiClient = kSqlDbRestApiClient;
    this.logger = logger;
  }

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Worker started at: {time}", DateTimeOffset.Now);

    // Settings for the Kafka topic that backs the stream
    EntityCreationMetadata metadata = new()
    {
      KafkaTopic = nameof(Sensor),
      Partitions = 3,
      Replicas = 1
    };

    // Create the stream only if it does not exist yet
    var httpResponseMessage = await kSqlDbRestApiClient.CreateStreamAsync<Sensor>(metadata, ifNotExists: true, cancellationToken: cancellationToken).ConfigureAwait(false);

    SubscribeToSensors();
  }

  // Holds the running push query so that it can be disposed in StopAsync
  private IDisposable subscription = null!;

  private void SubscribeToSensors()
  {
    subscription = kSqlDbContext.Sensors
      .Where(c => c.IsAvailable)
      .Subscribe(
        onNext: sensor =>
        {
          Console.WriteLine($"{nameof(Sensor)}: {sensor.Id} - {sensor.Name}");
          Console.WriteLine();
        },
        onError: error => { Console.WriteLine($"Exception: {error.Message}"); },
        onCompleted: () => Console.WriteLine("Completed"));
  }

  public Task StopAsync(CancellationToken cancellationToken)
  {
    logger.LogInformation("Stopping.");

    // The empty using block disposes the subscription on exit
    using (subscription)
    {
    }

    return Task.CompletedTask;
  }
}
The StopAsync method of the .NET IHostedService interface is responsible for gracefully stopping the hosted service. The host calls it when the application is shutting down.
Subscribing to an IQbservable<T> returns an IDisposable that represents the subscription. Wrapping it in a using statement ensures that the subscription is disposed of when the block exits, even if an exception is thrown.
In this example, the 'subscription' field holds the subscription object returned by the Subscribe method of the observable. In the StopAsync method, the 'subscription' object is disposed of to release any resources associated with it.
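To see why an empty using block is enough to release the subscription, here is a minimal, self-contained sketch. FakeSubscription is a hypothetical stand-in for the object returned by Subscribe; it is not part of ksqlDB.RestApi.Client.

```csharp
using System;

IDisposable subscription = new FakeSubscription();

// An empty using block disposes its resource as soon as the block exits,
// which has the same effect as calling subscription.Dispose() directly.
using (subscription)
{
}

// A null resource is allowed: the using statement skips the Dispose call,
// so stopping a worker that never subscribed does not throw.
IDisposable? neverStarted = null;
using (neverStarted)
{
}

Console.WriteLine("Done");

// Hypothetical stand-in for the IDisposable returned by Subscribe.
sealed class FakeSubscription : IDisposable
{
  public void Dispose() => Console.WriteLine("Subscription disposed");
}
```

The program prints "Subscription disposed" followed by "Done". Calling subscription?.Dispose() would be an equivalent and arguably more direct way to write the same thing.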
Create a local ksqlDB server with Docker Desktop or use your existing cluster.
Run the application by pressing F5 in Visual Studio. The application will try to create a sensor stream and subscribe to it.
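A newly created stream is empty, so the subscription prints nothing until a record arrives. One way to try it out is to insert a test record from the worker itself. The method below is a hypothetical helper meant to be pasted into the Worker class (it reuses that class's kSqlDbRestApiClient and logger fields) and called from StartAsync after SubscribeToSensors(). It assumes that InsertIntoAsync is available on IKSqlDbRestApiClient in the installed package version; the sample values are made up.

```csharp
// Hypothetical helper for the Worker class, not part of the original workshop.
private async Task InsertSampleSensorAsync(CancellationToken cancellationToken)
{
  // IsAvailable must be true, otherwise the Where filter
  // in SubscribeToSensors drops the record.
  var sensor = new Sensor { Id = 1, Name = "sensor-1", IsAvailable = true };

  // Assumption: InsertIntoAsync sends an INSERT INTO statement for the entity.
  var response = await kSqlDbRestApiClient
    .InsertIntoAsync(sensor, cancellationToken: cancellationToken)
    .ConfigureAwait(false);

  logger.LogInformation("Insert returned status code {statusCode}", response.StatusCode);
}
```

If the insert succeeds, the onNext handler should print the sensor's Id and Name to the console.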
Congratulations! You've just successfully created your stream processor with ksqlDB.RestApi.Client.