[exporter/kafka_exporter]: Add topic-based partitioning support for grouping same topics on the same partition #42925
Thanks for the PR @karakayasemi. Adding the topic to the hash sounds good, but I think it needs to be additive rather than replacing the resource attributes hash.
if e.config.TopicFromAttribute != "" {
	if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
		tmpMap := pcommon.NewMap()
		tmpMap.PutBool(rv.Str(), true)
		hash = pdatautil.MapHash(tmpMap)
	}
}
This doesn't seem quite right: this will completely ignore the resource attributes, as you're overwriting hash. Can we combine them?
How about this?
m := resourceLogs.Resource().Attributes()
if e.config.TopicFromAttribute != "" {
	if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
		m2 := pcommon.NewMap()
		m2.PutStr("topic", rv.Str())
		m.CopyTo(m2.PutEmptyMap("resource"))
		m = m2
	}
}
hash := pdatautil.MapHash(m)
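To make the difference between overwriting and combining concrete, here is a minimal standard-library sketch. It does not use the collector's pdata types: `hashAttrs` is a hypothetical stand-in for `pdatautil.MapHash`, plain `map[string]string` values stand in for resource attributes, and the attribute name `kafka.topic` used in the note below is only an illustrative assumption.

```go
package main

import (
	"hash/fnv"
	"sort"
)

// hashAttrs is a hypothetical stand-in for pdatautil.MapHash. It hashes the
// key/value pairs in sorted key order so the result is deterministic.
func hashAttrs(attrs map[string]string) uint64 {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := fnv.New64a()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0}) // separator between key and value
		h.Write([]byte(attrs[k]))
		h.Write([]byte{0}) // separator between pairs
	}
	return h.Sum64()
}

// topicOnlyKey mirrors the overwrite approach: only the topic value feeds the
// hash, so every other resource attribute is ignored.
func topicOnlyKey(attrs map[string]string, topicAttr string) uint64 {
	return hashAttrs(map[string]string{attrs[topicAttr]: "true"})
}

// combinedKey mirrors the additive approach: the topic is hashed together
// with a copy of all resource attributes.
func combinedKey(attrs map[string]string, topicAttr string) uint64 {
	m := map[string]string{"topic": attrs[topicAttr]}
	for k, v := range attrs {
		m["resource."+k] = v
	}
	return hashAttrs(m)
}
```

With two resources that share `kafka.topic` but differ in `service.name`, `topicOnlyKey` returns the same value for both, while `combinedKey` keeps them apart.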
Hey @axw, thank you for the review. I simplified the logic mainly to resolve the wrong-topic issue.
When partitioning is enabled, the topic is already included in the resource attributes, so the hash will naturally differ when the topic value differs. So, I decided not to touch the current partitioning logic.
When partitioning is disabled, the problem described in the issue occurs: everything is sent as a single batch to a single topic. In that case we still need to separate the data by topic, at minimum. That’s the behavior I am aiming for in the new implementation. Could you please review it once more?
@karakayasemi sorry, I think I was a bit confused before and probably read the line above your addition as "if e.config.PartitionLogsByResourceAttributes" rather than "if !...".
Reading it again, I'm not sure why setting topic_from_attribute should control partitioning? They should be separate concepts. If you're just trying to fix #37470, I think we can skip the hashing and just yield one result per ResourceX.
I'll leave a new comment to show what I mean.
hash := pdatautil.ValueHash(pcommon.NewValueStr(""))
if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
	hash = pdatautil.ValueHash(pcommon.NewValueStr(rv.Str()))
}
newLogs := plog.NewLogs()
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
if !yield(hash[:], newLogs) {
	return
}
Suggested change:
- hash := pdatautil.ValueHash(pcommon.NewValueStr(""))
- if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
- 	hash = pdatautil.ValueHash(pcommon.NewValueStr(rv.Str()))
- }
- newLogs := plog.NewLogs()
- resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
- if !yield(hash[:], newLogs) {
- 	return
- }
+ newLogs := plog.NewLogs()
+ resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
+ if !yield(nil, newLogs) {
+ 	return
+ }
I think this is all that's needed? i.e. rather than yielding the original ld, yield one per ResourceLogs within it.
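As a rough illustration of "yield one per ResourceLogs", the sketch below uses only the standard library. `Batch` and `Resource` are hypothetical stand-ins for `plog.Logs` and `plog.ResourceLogs`, and `topicOf` assumes that the topic is resolved from the first resource in whatever batch it receives; neither is taken from the exporter's code.

```go
package main

// Resource is a hypothetical stand-in for plog.ResourceLogs.
type Resource struct {
	Attrs   map[string]string
	Records []string
}

// Batch is a hypothetical stand-in for plog.Logs.
type Batch struct {
	Resources []Resource
}

// perResource returns an iterator-style function that yields one
// single-resource Batch per resource in the input, with a nil key,
// instead of yielding the original batch as a whole.
func perResource(in Batch) func(yield func(key []byte, out Batch) bool) {
	return func(yield func(key []byte, out Batch) bool) {
		for _, r := range in.Resources {
			out := Batch{Resources: []Resource{r}}
			if !yield(nil, out) {
				return // the consumer asked to stop early
			}
		}
	}
}

// topicOf resolves the topic from the first resource in the batch
// (an assumption for this sketch), falling back to a default topic
// when the attribute is missing or empty.
func topicOf(b Batch, topicAttr, fallback string) string {
	if len(b.Resources) == 0 {
		return fallback
	}
	if v := b.Resources[0].Attrs[topicAttr]; v != "" {
		return v
	}
	return fallback
}
```

Because each yielded batch holds exactly one resource, the topic looked up for it cannot be taken from a different resource in the same original batch.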
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Description
When TopicFromAttribute is configured, messages with the same topic attribute value are grouped on the same partition for correct topic routing.
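Why an identical key keeps messages on the same partition can be sketched as follows. This assumes that the hash yielded by the exporter ends up as the message key and that the producer uses a hash-based partitioner; FNV-1a is used here purely for illustration, and the Kafka client the exporter actually uses may hash keys differently. `partitionFor` is a hypothetical helper, not part of the exporter.

```go
package main

import "hash/fnv"

// partitionFor maps a message key to a partition index in
// [0, numPartitions). Equal keys always map to the same partition.
// FNV-1a is an assumption for this sketch, not necessarily what the
// real producer uses.
func partitionFor(key []byte, numPartitions int32) int32 {
	if numPartitions <= 0 {
		return 0 // nothing sensible to choose; avoid dividing by zero
	}
	h := fnv.New32a()
	h.Write(key)
	return int32(h.Sum32() % uint32(numPartitions))
}
```

Two messages keyed by the same topic value always resolve to the same partition index for a fixed partition count. Different keys may still share a partition.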
Link to tracking issue
Fixes #37470
Testing
Documentation