-
I have two Kafka topics:
Is there a way to filter messages from events topic based on the latest message of config topic? That way I've expected the following output:
Another option would be to load the latest message from config topic to something like an enrichment table in-memory, so we could do the filter via VRL, but I didn't find a way to do this. I've put Lua transform as a second option because performance is very important for this application. We're doing a benchmark with some streaming platforms and Vector is a candidate. filter_events:
type: lua
inputs:
- parse_events
- parse_config
version: '2'
source: require('filter-events')
hooks:
process: filter_events
init: init Where function init()
clients = {}
end
function filter_events(event, emit)
-- handle config event and store clients to be filtered in-memory
if event.log.topic == "config" then
clients = {}
for _, config in pairs(event.log.message) do
clients[config.client] = true
end
return
end
-- handle log event and filter based on configured clientes
if clients[event.log.client] then
emit(event)
end
end That way messages from both topics will be handled on the same transform, so we've to check each message and also iterate over the list of clients to update the memory table. It seems that's not the best way to do this. Thanks! |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
No, I don't believe there is any way to do this currently in VRL. We have discussed additional sources for things like enrichment tables that could fit your needs (such as database files or Redis), but you would still have to be able to move the config data into that database. Within Vector, the only path for that would be the |
Beta Was this translation helpful? Give feedback.
No, I don't believe there is any way to do this currently in VRL. We have discussed additional sources for things like enrichment tables that could fit your needs (such as database files or Redis), but you would still have to be able to move the config data into that database. Within Vector, the only path for that would be the
lua
transform as you have done.