Skip to content

Conversation

@karakayasemi
Copy link
Contributor

@karakayasemi karakayasemi commented Sep 25, 2025

Description

When TopicFromAttribute is configured, messages with the same topic attribute value are grouped on the same partition for correct topic routing.

Link to tracking issue

Fixes #37470

Testing

  • Added topic_partitioning test cases to both logs and metrics partitioning tests
  • Tests verify that the same topic attributes get the same partition keys, different topics get different keys

Documentation

  • Created a changelog entry documenting the enhancement for end users

@karakayasemi karakayasemi requested review from a team, MovieStoreGuy and axw as code owners September 25, 2025 08:58
@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Sep 25, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: karakayasemi / name: Semih Serhat Karakaya (b197464)

@karakayasemi karakayasemi changed the title [exporterkafka_exporter]: Add topic-based partitioning support for grouping same topics on the same partition [exporter/kafka_exporter]: Add topic-based partitioning support for grouping same topics on the same partition Sep 25, 2025
@karakayasemi karakayasemi force-pushed the fix-issue-37470 branch 2 times, most recently from d215b99 to fc26e10 Compare September 25, 2025 09:02
@karakayasemi karakayasemi force-pushed the fix-issue-37470 branch 2 times, most recently from 8a58549 to 354914c Compare September 25, 2025 09:28
Copy link
Contributor

@axw axw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @karakayasemi. Adding the topic to the hash sounds good, but I think it needs to be additive rather than replacing the resource attributes hash.

Comment on lines 279 to 285
if e.config.TopicFromAttribute != "" {
if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
tmpMap := pcommon.NewMap()
tmpMap.PutBool(rv.Str(), true)
hash = pdatautil.MapHash(tmpMap)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem quite right: this will completely ignore the resource attributes, as you're overwriting hash. Can we combine them?

How about this?

m := resourceLogs.Resource().Attributes()
if e.config.TopicFromAttribute != "" {
	if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
		m2 := pcommon.NewMap()
		m2.PutStr("topic", rv.Str())
		m.CopyTo(m2.PutEmptyMap("resource"))
		m = m2
	}
}
hash := pdatautil.MapHash(m)

Copy link
Contributor Author

@karakayasemi karakayasemi Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @axw, thank you for the review. I simplified the logic mainly to resolve the wrong-topic issue.

When partitioning is enabled, the topic is already included in the resource attributes, so the hash will naturally differ when the topic value differs. So, I decided not to touch the current partitioning logic.

When partitioning is disabled, the problem mentioned in the issue occurs. Everything goes within a single batch and with a single topic, but in that case, we still need to at least separate by topic. That’s the behavior I am aiming for in the new implementation. Could you please review one more time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karakayasemi sorry, I think I was a bit confused before and probably read the line above your addition as if e.config.PartitionLogsByResourceAttributes rather than if !...

Reading it again, I'm not sure why setting topic_from_attribute should control partitioning? They should be separate concepts. If you're just trying to fix #37470, I think we can skip the hashing and just yield one result per ResourceX.

I'll leave a new comment to show what I mean.

@karakayasemi karakayasemi force-pushed the fix-issue-37470 branch 4 times, most recently from 6c1109a to ff25471 Compare September 29, 2025 10:32
Comment on lines +277 to +285
hash := pdatautil.ValueHash(pcommon.NewValueStr(""))
if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
hash = pdatautil.ValueHash(pcommon.NewValueStr(rv.Str()))
}
newLogs := plog.NewLogs()
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
if !yield(hash[:], newLogs) {
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
hash := pdatautil.ValueHash(pcommon.NewValueStr(""))
if rv, ok := resourceLogs.Resource().Attributes().Get(e.config.TopicFromAttribute); ok && rv.Str() != "" {
hash = pdatautil.ValueHash(pcommon.NewValueStr(rv.Str()))
}
newLogs := plog.NewLogs()
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
if !yield(hash[:], newLogs) {
return
}
newLogs := plog.NewLogs()
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())
if !yield(nil, newLogs) {
return
}

I think this is all that's needed? i.e. rather than yielding the original ld, yield one per ResourceLogs within it.

@github-actions
Copy link
Contributor

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Oct 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

topic_from_attribute does not work as expected

4 participants