27 changes: 27 additions & 0 deletions .chloggen/feat_42557.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "processor/k8sattributesprocessor"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for k8s.cronjob.uid attribute in k8sattributesprocessor"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42557]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
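For orientation, a minimal sketch of a collector config that opts into the new attribute once this change ships (the surrounding keys are standard k8sattributes configuration; only the `k8s.cronjob.uid` entry is what this PR adds):

```yaml
processors:
  k8sattributes:
    extract:
      metadata:
        - k8s.cronjob.name
        - k8s.cronjob.uid # newly supported by this change
```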
20 changes: 10 additions & 10 deletions processor/k8sattributesprocessor/README.md
@@ -32,7 +32,7 @@ The processor stores the list of running pods and the associated metadata. When
to the pod from which the datapoint originated, so we can add the relevant pod metadata to the datapoint. By default, it associates the incoming connection IP
with the Pod IP. But for cases where this approach doesn't work (sending through a proxy, etc.), a custom association rule can be specified.

Each association is specified as a list of sources. The maximum number of sources within an association is 4.
A source is a rule that matches metadata from the datapoint to pod metadata.
For an association to be applied, all of its sources must match.
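For illustration, a minimal sketch of one association with two sources that must both match (the attribute name is illustrative; `resource_attribute` and `connection` are the supported source kinds):

```yaml
pod_association:
  - sources:
      - from: resource_attribute
        name: k8s.pod.ip
      - from: connection
```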

@@ -63,16 +63,16 @@ If Pod association rules are not configured, resources are associated with metad

Which metadata to collect is determined by the `metadata` configuration, which defines the list of resource attributes
to be added. Items in the list are named exactly the same as the resource attributes that will be added.
The following attributes are added by default:
- k8s.namespace.name
- k8s.pod.name
- k8s.pod.uid
- k8s.pod.start_time
- k8s.deployment.name
- k8s.node.name

These attributes are also available for use within association rules by default.
The `metadata` section can also be extended with additional attributes which, if present in the `metadata` section,
are then also available for use within association rules. Available attributes are:
- k8s.namespace.name
- k8s.pod.name
@@ -100,7 +100,7 @@ are then also available for use within association rules. Available attribut
- [service.instance.id](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#how-serviceinstanceid-should-be-calculated) (cannot be used for source rules in the pod_association)
- Any tags extracted from the pod labels and annotations, as described in [extracting attributes from pod labels and annotations](#extracting-attributes-from-pod-labels-and-annotations)

Not all the attributes are guaranteed to be added. Only attribute names from `metadata` should be used for
pod_association's `resource_attribute`, because empty or non-existing values will be ignored.
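As a sketch, pairing `extract.metadata` with a matching association source (here using `k8s.pod.uid`, one of the default attributes listed above):

```yaml
extract:
  metadata:
    - k8s.pod.uid
pod_association:
  - sources:
      - from: resource_attribute
        name: k8s.pod.uid
```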

Additional container-level attributes can be extracted. If a pod contains more than one container,
@@ -204,7 +204,7 @@ the processor associates the received trace to the pod, based on the connection
"k8s.pod.name": "telemetrygen-pod",
"k8s.pod.uid": "038e2267-b473-489b-b48c-46bafdb852eb",
"container.image.name": "telemetrygen",
"container.image.tag": "0.112.0",
"container.image.tag": "0.112.0",
"container.image.repo_digests": ["ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:b248ef911f93ae27cbbc85056d1ffacc87fd941bbdc2ffd951b6df8df72b8096"]
}
}
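A hedged input-side sketch of the extraction config that would yield container-level attributes like those above (the metadata keys are taken from this processor's documented list):

```yaml
extract:
  metadata:
    - k8s.container.name
    - container.image.name
    - container.image.tag
    - container.image.repo_digests
```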
@@ -262,16 +262,16 @@ extract:
from: node
```

## Configuring recommended resource attributes

The processor can be configured to set the
[recommended resource attributes](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/):

- `otel_annotations` will translate `resource.opentelemetry.io/foo` to the `foo` resource attribute, etc.

```yaml
extract:
  otel_annotations: true
  metadata:
    - service.namespace
    - service.name
@@ -306,7 +306,7 @@ k8sattributes:
      - tag_name: app.label.component
        key: app.kubernetes.io/component
        from: pod
    otel_annotations: true
  pod_association:
    - sources:
        # This rule associates all resources containing the 'k8s.pod.ip' attribute with the matching pods. If this attribute is not present in the resource, this rule will not be able to find the matching pod.
6 changes: 6 additions & 0 deletions processor/k8sattributesprocessor/client_test.go
@@ -31,6 +31,7 @@ type fakeClient struct {
	Deployments  map[string]*kube.Deployment
	StatefulSets map[string]*kube.StatefulSet
	DaemonSets   map[string]*kube.DaemonSet
	ReplicaSets  map[string]*kube.ReplicaSet
	Jobs         map[string]*kube.Job
	StopCh       chan struct{}
}
@@ -90,6 +90,11 @@ func (f *fakeClient) GetDaemonSet(daemonsetUID string) (*kube.DaemonSet, bool) {
	return s, ok
}

func (f *fakeClient) GetReplicaSet(replicaSetUID string) (*kube.ReplicaSet, bool) {
	rs, ok := f.ReplicaSets[replicaSetUID]
	return rs, ok
}

func (f *fakeClient) GetJob(jobUID string) (*kube.Job, bool) {
	j, ok := f.Jobs[jobUID]
	return j, ok
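A brief sketch of how a test might exercise the new accessor (the `kube.ReplicaSet` fields are not shown in this diff, so an empty literal and a hypothetical UID key stand in):

```go
f := &fakeClient{
	ReplicaSets: map[string]*kube.ReplicaSet{
		"rs-uid-1": {}, // hypothetical UID key; fields omitted
	},
}
if rs, ok := f.GetReplicaSet("rs-uid-1"); ok {
	_ = rs // a real test would assert on its fields
}
```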
5 changes: 3 additions & 2 deletions processor/k8sattributesprocessor/config.go
@@ -97,7 +97,7 @@ func (cfg *Config) Validate() error {
		string(conventions.K8SDaemonSetNameKey), string(conventions.K8SDaemonSetUIDKey),
		string(conventions.K8SStatefulSetNameKey), string(conventions.K8SStatefulSetUIDKey),
		string(conventions.K8SJobNameKey), string(conventions.K8SJobUIDKey),
		string(conventions.K8SCronJobNameKey),
		string(conventions.K8SCronJobNameKey), string(conventions.K8SCronJobUIDKey),
		string(conventions.K8SNodeNameKey), string(conventions.K8SNodeUIDKey),
		string(conventions.K8SContainerNameKey), string(conventions.ContainerIDKey),
		string(conventions.ContainerImageNameKey), string(conventions.ContainerImageTagKey),
@@ -139,7 +139,8 @@ type ExtractConfig struct {
	// k8s.node.name, k8s.namespace.name, k8s.pod.start_time,
	// k8s.replicaset.name, k8s.replicaset.uid,
	// k8s.daemonset.name, k8s.daemonset.uid,
	// k8s.job.name, k8s.job.uid, k8s.cronjob.name,
	// k8s.job.name, k8s.job.uid,
	// k8s.cronjob.name, k8s.cronjob.uid,
	// k8s.statefulset.name, k8s.statefulset.uid,
	// k8s.container.name, container.id, container.image.name,
	// container.image.tag, container.image.repo_digests
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/documentation.md
@@ -13,6 +13,7 @@
| k8s.cluster.uid | Gives cluster uid identified with kube-system namespace | Any Str | false |
| k8s.container.name | The name of the Container in a Pod template. Requires container.id. | Any Str | false |
| k8s.cronjob.name | The name of the CronJob. | Any Str | false |
| k8s.cronjob.uid | The UID of the CronJob. | Any Str | false |
| k8s.daemonset.name | The name of the DaemonSet. | Any Str | false |
| k8s.daemonset.uid | The UID of the DaemonSet. | Any Str | false |
| k8s.deployment.name | The name of the Deployment. | Any Str | true |
55 changes: 52 additions & 3 deletions processor/k8sattributesprocessor/internal/kube/client.go
@@ -146,6 +146,7 @@ type InformersFactoryList struct {
	newInformer           InformerProvider
	newNamespaceInformer  InformerProviderNamespace
	newReplicaSetInformer InformerProviderWorkload
	newJobInformer        InformerProviderWorkload
}

// New initializes a new k8s Client.
@@ -279,8 +280,24 @@ func New(
		c.daemonsetInformer = newDaemonSetSharedInformer(c.kc, c.Filters.Namespace)
	}

	if c.extractJobLabelsAnnotations() {
		c.jobInformer = newJobSharedInformer(c.kc, c.Filters.Namespace)
	if c.extractJobLabelsAnnotations() || rules.CronJobUID {
		if informersFactory.newJobInformer == nil {
			informersFactory.newJobInformer = newJobSharedInformer
		}
		c.jobInformer = informersFactory.newJobInformer(c.kc, c.Filters.Namespace)
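		// Trim each cached Job down to the fields extraction needs
		// (see removeUnnecessaryJobData below) to keep informer memory bounded.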
		err = c.jobInformer.SetTransform(
			func(object any) (any, error) {
				originalJob, success := object.(*batch_v1.Job)
				if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
					return object, nil
				}

				return removeUnnecessaryJobData(originalJob), nil
			},
		)
		if err != nil {
			return nil, err
		}
	}

	return c, err
@@ -809,7 +826,8 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
		c.Rules.JobUID || c.Rules.JobName ||
		c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
		c.Rules.DeploymentName || c.Rules.DeploymentUID ||
		c.Rules.CronJobName || c.Rules.ServiceName {
		c.Rules.CronJobUID || c.Rules.CronJobName ||
		c.Rules.ServiceName {
		for _, ref := range pod.OwnerReferences {
			switch ref.Kind {
			case "ReplicaSet":
@@ -886,6 +904,13 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
						}
					}
				}
				if c.Rules.CronJobUID {
					if job, ok := c.GetJob(string(ref.UID)); ok {
						if job.CronJob.UID != "" {
							tags[string(conventions.K8SCronJobUIDKey)] = job.CronJob.UID
						}
					}
				}
			}
		}
	}
@@ -1698,6 +1723,16 @@ func (c *WatchClient) addOrUpdateJob(job *batch_v1.Job) {
	}
	newJob.Attributes = c.extractJobAttributes(job)

	for _, ownerReference := range job.OwnerReferences {
		if ownerReference.Kind == "CronJob" && ownerReference.Controller != nil && *ownerReference.Controller {
			newJob.CronJob = CronJob{
				Name: ownerReference.Name,
				UID:  string(ownerReference.UID),
			}
			break
		}
	}

	c.m.Lock()
	if job.UID != "" {
		c.Jobs[string(job.UID)] = newJob
@@ -1782,6 +1817,20 @@ func removeUnnecessaryReplicaSetData(replicaset *apps_v1.ReplicaSet) *apps_v1.Re
	return &transformedReplicaset
}

// This function removes all data from the Job except what is required by extraction rules
func removeUnnecessaryJobData(job *batch_v1.Job) *batch_v1.Job {
	transformedJob := batch_v1.Job{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      job.GetName(),
			Namespace: job.GetNamespace(),
			UID:       job.GetUID(),
		},
	}

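	// Owner references are kept so addOrUpdateJob can resolve the parent CronJob's name and UID.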
	transformedJob.SetOwnerReferences(job.GetOwnerReferences())
	return &transformedJob
}

// runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete
// before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer
// to be finished to correctly establish the connection to the replicaset/deployment it belongs to.