Generic Pull query with filters #97

fsimonovskiii opened this issue Oct 18, 2024 · 5 comments

@fsimonovskiii

Hi,

I have a use case where I need to reuse a pull query across multiple topics, each filtered by different columns. The requirement is that the same call to Kafka is generic and can be reused for different types of data.
For example, I have a materialized view for sports ("view_sports") that contains these records:

[
    { "SportId": 1, "SportName": "Soccer" },
    { "SportId": 2, "SportName": "Basketball" }
]

And another materialized view for Categories ("view_categories"):

[
    { "SportId": 1, "CategoryId": "1", "CategoryName": "England" },
    { "SportId": 1, "CategoryId": "2", "CategoryName": "Spain" },
    { "SportId": 2, "CategoryId": "3", "CategoryName": "USA" }
]

I am trying to write a pull query as a single generic call to Kafka that can query data from both (or more) of these materialized views and accept filters on their respective column names.
For example, it should be able to query all data when needed:

SELECT * FROM view_categories;

as well as add different column name parameters as filters (for different topics):

SELECT * FROM view_categories WHERE CategoryId = '1';

or

SELECT * FROM view_sports WHERE SportId = 2;

So basically I would need to be able to return a List from multiple topics and be able to filter the different topics dynamically.
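
One way to read this requirement is as a small helper that builds the pull query text from a view name and a set of optional filters. The sketch below is only an illustration: `PullQueryBuilder`, its allow-list, and `FormatLiteral` are hypothetical names, not part of ksqlDb.RestApi.Client, and the column lists are copied from the two example views above. Column names are checked against the allow-list because they end up spliced into the SQL text.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper (not a library type): builds the text of a pull query.
public static class PullQueryBuilder
{
    // Views and filterable columns, copied from the examples in this issue.
    private static readonly Dictionary<string, HashSet<string>> AllowedColumns =
        new Dictionary<string, HashSet<string>>(StringComparer.OrdinalIgnoreCase)
        {
            ["view_sports"] = new HashSet<string>(StringComparer.OrdinalIgnoreCase)
                { "SportId", "SportName" },
            ["view_categories"] = new HashSet<string>(StringComparer.OrdinalIgnoreCase)
                { "SportId", "CategoryId", "CategoryName" },
        };

    public static string Build(string view, IReadOnlyDictionary<string, object?> filters)
    {
        if (!AllowedColumns.TryGetValue(view, out var columns))
            throw new ArgumentException($"Unknown view '{view}'.", nameof(view));

        var predicates = new List<string>();
        foreach (var filter in filters)
        {
            // A null value means "do not filter by this column".
            if (filter.Value is null) continue;

            if (!columns.Contains(filter.Key))
                throw new ArgumentException(
                    $"Column '{filter.Key}' is not filterable on '{view}'.", nameof(filters));

            predicates.Add($"{filter.Key} = {FormatLiteral(filter.Value)}");
        }

        var where = predicates.Count == 0 ? "" : " WHERE " + string.Join(" AND ", predicates);
        return $"SELECT * FROM {view}{where};";
    }

    // Strings are single-quoted with embedded quotes doubled; integers are emitted as-is.
    private static string FormatLiteral(object value) => value switch
    {
        string s => "'" + s.Replace("'", "''") + "'",
        int i => i.ToString(CultureInfo.InvariantCulture),
        long l => l.ToString(CultureInfo.InvariantCulture),
        _ => throw new ArgumentException($"Unsupported filter type {value.GetType().Name}.")
    };
}
```

With this sketch, `PullQueryBuilder.Build("view_categories", new Dictionary<string, object?> { ["CategoryId"] = "1" })` produces `SELECT * FROM view_categories WHERE CategoryId = '1';`, and an empty filter set produces `SELECT * FROM view_categories;`.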

I have tried something like declaring different KSqlDbStatement and hardcoding them, while adding SessionVariables and using a switch to change the statements, like the following:

string categoryId = "1";
string sportId = "1";
KSqlDbStatement statementCategories = new("SELECT * FROM view_categories WHERE CategoryId = ${categoryId};")
{
    SessionVariables = new Dictionary<string, object> { { "categoryId", categoryId } }
};

KSqlDbStatement statementSports = new("SELECT * FROM view_sports WHERE SportId = ${sportId};")
{
    SessionVariables = new Dictionary<string, object> { { "sportId", sportId } }
};

var response = await kSqlDbRestApiClient.ExecuteStatementAsync(statementCategories);

var content = await response.Content.ReadAsStringAsync();

var list = System.Text.Json.JsonSerializer.Deserialize<IEnumerable<JsonElement>>(content);

The code above returns a BadRequest in the response object. Am I perhaps passing the variable incorrectly?

Alternatively, going with the KSqlDbContext object instead of KSqlDbRestApiClient, I saw some examples that show this is possible, however because they are strongly typed this does not fit my use case. Is there a way to define multiple column names which would be hardcoded, but ultimately return a List<object> instead of a strongly typed object?

string sportId = null; // e.g. null when we do not want to filter by this column
string categoryId = "1";
string materializedView = "view_categories";

List<object> list = await context.CreatePullQuery<object>(materializedView)
    .Where(sportId == null || "SportId" == sportId) // does not compile, just for clarity what I am trying to achieve
    .Where(categoryId == null || "CategoryId" == categoryId) // does not compile, just for clarity what I am trying to achieve
    .GetManyAsync()
    .ToListAsync();

Any help is appreciated, thanks in advance!

@tomasfabian
Owner

Hello @fsimonovskiii,

Why isn’t it feasible for you to switch between different type-safe queries?

context.CreatePullQuery<Category>
context.CreatePullQuery<Sport>

Here’s a workaround you can try in the meantime, but you will need to handle the raw data in the response:

PropertyInfo? propertyInfo = typeof(KSqlDbStatement).GetProperty("EndpointType", BindingFlags.NonPublic | BindingFlags.Instance);

propertyInfo?.SetValue(statementCategories, EndpointType.Query);
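
Handling the raw data could look roughly like the sketch below. It assumes the response body is a JSON array in which row entries have the shape `{"row": {"columns": [...]}}` and anything else (such as a header entry) can be skipped; that shape is an assumption to verify against your ksqlDB version, and `RawPullQueryResponse` is a hypothetical helper, not a library type.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper (not a library type): extracts row values from a raw response body.
public static class RawPullQueryResponse
{
    // Returns the "columns" array of every {"row": {...}} element.
    // Elements without a "row" object (for example a header) are ignored.
    public static List<JsonElement[]> ReadRows(string json)
    {
        using var document = JsonDocument.Parse(json);
        var rows = new List<JsonElement[]>();

        if (document.RootElement.ValueKind != JsonValueKind.Array)
            return rows;

        foreach (var element in document.RootElement.EnumerateArray())
        {
            if (element.ValueKind != JsonValueKind.Object
                || !element.TryGetProperty("row", out var row)
                || row.ValueKind != JsonValueKind.Object
                || !row.TryGetProperty("columns", out var columns)
                || columns.ValueKind != JsonValueKind.Array)
            {
                continue;
            }

            var values = new List<JsonElement>();
            foreach (var value in columns.EnumerateArray())
                values.Add(value.Clone()); // Clone so the value outlives the JsonDocument.

            rows.Add(values.ToArray());
        }

        return rows;
    }
}
```

Each row comes back as an untyped `JsonElement[]`, so the caller still has to map positions to column names itself, for example by using the column order of the SELECT statement.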

@fsimonovskiii
Author

Hello @tomasfabian

Thank you for the quick reply. I like the CreatePullQuery option better too. However, because this endpoint needs to be dynamic, I would have to define a separate model per topic and then create different contexts depending on the filters needed. I was thinking of a way to use a single definition of the context that would generate a dynamic object.

I was just wondering if there is some easy way to achieve this that I was not seeing clearly.

@tomasfabian
Owner

tomasfabian commented Oct 18, 2024

Another option would be to expose a new function for the KSqlDbRestApiClient:

Task<HttpResponseMessage> ExecuteQueryAsync(
        KSqlDbQuery ksqlDbQuery,
        CancellationToken cancellationToken = default (CancellationToken));

Or something like this (just a prototype):

public Task<QueryResult> ExecuteQueryAsync(KSqlDbQuery ksqlDbQuery, CancellationToken cancellationToken = default(CancellationToken));

public class QueryResult : HttpResponseMessage
{
  public IEnumerable<Row> Rows { get; }
}

public record Row : String
{
}

It would be great if you could share both your insights and your approach as well.

@tomasfabian
Owner

tomasfabian commented Oct 18, 2024

Or perhaps it would be better to extend the KSqlDbContext, as the KSqlDbRestApiClient is already quite large:

public Task<QueryResult> ExecutePullQueryAsync(KSqlDbQuery ksqlDbQuery, CancellationToken cancellationToken = default(CancellationToken));

@fsimonovskiii
Author

fsimonovskiii commented Oct 18, 2024

Hey,

ExecutePullQueryAsync sounds good. It would be helpful if KSqlDbQuery could take dynamic parameters, similar to the Dapper approach. That might also let us rename the selected columns in the SELECT statement with an AS keyword or similar. If this gets implemented as a new method in the future, I will explore it as a potential refactor, since I do not see any other way to achieve what I want.

For now, I will probably end up going with what you suggested:

context.CreatePullQuery<Category>
context.CreatePullQuery<Sport>

and make multiple strongly typed methods which should help me with the filters.
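
A single generic entry point can still sit in front of several strongly typed methods. The sketch below shows one possible shape, assuming each typed query is wrapped in a delegate that takes the filters and returns a typed list. `PullQueryDispatcher` and `ViewQuery` are hypothetical names for illustration and are not part of ksqlDb.RestApi.Client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical delegate: runs one view's query with the given filters.
public delegate Task<IReadOnlyList<object>> ViewQuery(IReadOnlyDictionary<string, string?> filters);

// Hypothetical dispatcher: maps a view name to its strongly typed query method.
public sealed class PullQueryDispatcher
{
    private readonly Dictionary<string, ViewQuery> handlers =
        new Dictionary<string, ViewQuery>(StringComparer.OrdinalIgnoreCase);

    // Registers a typed query; its results are boxed to object for the generic caller.
    public void Register<T>(
        string view,
        Func<IReadOnlyDictionary<string, string?>, Task<List<T>>> query)
    {
        handlers[view] = async filters =>
        {
            List<T> items = await query(filters);
            return items.Cast<object>().ToList();
        };
    }

    // Looks up the handler for the view and runs it.
    public Task<IReadOnlyList<object>> QueryAsync(
        string view,
        IReadOnlyDictionary<string, string?> filters)
    {
        if (!handlers.TryGetValue(view, out var handler))
            throw new ArgumentException($"No query registered for view '{view}'.", nameof(view));

        return handler(filters);
    }
}
```

Each registered delegate would wrap one of the typed pull queries (for example the `Category` and `Sport` ones above), so the per-view filter logic stays type-safe while callers only deal with a view name and a filter dictionary.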
