Description
Hi,
I have a use case where I need to reuse a pull query
for multiple topics with different types of columns to be filtered by. My requirement is that the same call to Kafka is generic and reused for different types of data.
For example I have a materialized view for Sports
("view_sports") which contains records with values
[
{ "SportId": 1, "SportName": "Soccer" },
{ "SportId": 2, "SportName": "Basketball" }
]
And another materialized view for Categories
("view_categories"):
[
{ "SportId": 1, "CategoryId": "1", "CategoryName": "England" },
{ "SportId": 1, "CategoryId": "2", "CategoryName": "Spain" },
{ "SportId": 2, "CategoryId": "3", "CategoryName": "USA" }
]
I am trying to have a pull query
which should be a single generic call to kafka, which can query data from both (or more) of these materialized views, and also be able to take corresponding filters for their respective column names.
For example, to be able to query all data when needed like the following
SELECT * FROM view_categories;
as well as add different column name parameters as filters (for different topics):
SELECT * FROM view_categories WHERE CategoryId = '1';
or
SELECT * FROM view_sports WHERE Sport = '2';
So basically I would need to be able to return a List from multiple topics and be able to filter the different topics dynamically.
I have tried something like declaring different KSqlDbStatement
and hardcoding them, while adding SessionVariables
and using a switch to change the statements, like the following:
string categoryId = "1";
string sportId = "1";
KSqlDbStatement statementCategories = new("SELECT * FROM view_categories WHERE CategoryId = ${categoryId};")
{
SessionVariables = new Dictionary<string, object> { { "categoryId", categoryId } }
};
KSqlDbStatement statementSports = new("SELECT * FROM view_sports WHERE SportId = ${sportId};")
{
SessionVariables = new Dictionary<string, object> { { "sportId", sportId} }
};
var response = await kSqlDbRestApiClient.ExecuteStatementAsync(statementCategories);
var content = await response.Content.ReadAsStringAsync();
var list = System.Text.Json.JsonSerializer.Deserialize<IEnumerable<JsonElement>>(content);
The following piece of code returns a BadRequest
in the response object. Am I maybe sending the variable wrong?
Alternatively, going with the KSqlDbContext
object instead of KSqlDbRestApiClient
, I saw some examples that show this is possible, however because they are strongly typed this does not fit my use case. Is there a way to define multiple column names which would be hardcoded, but ultimately return a List<object>
instead of a strongly typed object?
string sportId = null; // for ex would be null if we do not wanna filter by this column
string categoryId = "1";
string materializedView = "view_categories";
List<object> list = await context.CreatePullQuery<object>(materializedView)
.Where(sportId == null || "SportId" == sportId) // does not compile, just for clarity what I am trying to achieve
.Where(categoryId == null || "CategoryId" == categoryId) // does not compile, just for clarity what I am trying to achieve
.GetManyAsync()
.ToListAsync();
Any help is appreciated, thanks in advance!