Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feat_42557.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "processor/k8sattributesprocessor"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for k8s.cronjob.uid attribute in k8sattributesprocessor"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42557]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 2 additions & 3 deletions pkg/xk8stest/k8s_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package xk8stest // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"bytes"
"maps"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -39,9 +40,7 @@ func CreateCollectorObjects(t *testing.T, client *K8sClient, testID, manifestsDi
"HostEndpoint": host,
"TestID": testID,
}
for key, value := range templateValues {
defaultTemplateValues[key] = value
}
maps.Copy(defaultTemplateValues, templateValues)
require.NoError(t, tmpl.Execute(manifest, defaultTemplateValues))
obj, err := CreateObject(client, manifest.Bytes())
require.NoErrorf(t, err, "failed to create collector object from manifest %s", manifestFile.Name())
Expand Down
80 changes: 78 additions & 2 deletions pkg/xk8stest/k8s_telemetrygen.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package xk8stest // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"bytes"
"fmt"
"os"
"path/filepath"
"testing"
Expand All @@ -31,6 +32,78 @@ type TelemetrygenCreateOpts struct {
DataTypes []string
}

// getPodLabelSelectors returns the labels used to select pods created by the workload.
// - Deployment/StatefulSet/DaemonSet: spec.selector.matchLabels (fallback to template.metadata.labels)
// - Job: spec.template.metadata.labels
// - CronJob: spec.jobTemplate.spec.template.metadata.labels
func getPodLabelSelectors(obj *unstructured.Unstructured) (map[string]any, error) {
	// asMap reads a map-valued key from parent; it returns nil when the key is
	// absent, holds a non-map value, or holds a nil map. Indexing a nil parent
	// is safe, so lookups can be chained.
	asMap := func(parent map[string]any, key string) map[string]any {
		child, _ := parent[key].(map[string]any)
		return child
	}

	spec := asMap(obj.Object, "spec")
	if spec == nil {
		return nil, fmt.Errorf("%s/%s missing spec", obj.GetKind(), obj.GetName())
	}

	switch obj.GetKind() {
	case "Deployment", "StatefulSet", "DaemonSet":
		if ml := asMap(asMap(spec, "selector"), "matchLabels"); ml != nil {
			return ml, nil
		}
		// fallback — uncommon but robust
		if ml := asMap(asMap(asMap(spec, "template"), "metadata"), "labels"); ml != nil {
			return ml, nil
		}
		return nil, fmt.Errorf("%s/%s missing selector.matchLabels and template.metadata.labels", obj.GetKind(), obj.GetName())

	case "Job":
		if ml := asMap(asMap(asMap(spec, "template"), "metadata"), "labels"); ml != nil {
			return ml, nil
		}
		// last resort if API server already defaulted it
		if ml := asMap(asMap(spec, "selector"), "matchLabels"); ml != nil {
			return ml, nil
		}
		return nil, fmt.Errorf("Job/%s missing template.metadata.labels (and selector.matchLabels)", obj.GetName())

	case "CronJob":
		jobTemplate := asMap(spec, "jobTemplate")
		if jobTemplate == nil {
			return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate", obj.GetName())
		}
		jobSpec := asMap(jobTemplate, "spec")
		if jobSpec == nil {
			return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec", obj.GetName())
		}
		podTemplate := asMap(jobSpec, "template")
		if podTemplate == nil {
			return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec.template", obj.GetName())
		}
		podMeta := asMap(podTemplate, "metadata")
		if podMeta == nil {
			return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec.template.metadata", obj.GetName())
		}
		labels := asMap(podMeta, "labels")
		if labels == nil {
			return nil, fmt.Errorf("CronJob/%s missing spec.jobTemplate.spec.template.metadata.labels", obj.GetName())
		}
		return labels, nil

	default:
		return nil, fmt.Errorf("unsupported kind %q", obj.GetKind())
	}
}

func CreateTelemetryGenObjects(t *testing.T, client *K8sClient, createOpts *TelemetrygenCreateOpts) ([]*unstructured.Unstructured, []*TelemetrygenObjInfo) {
telemetrygenObjInfos := make([]*TelemetrygenObjInfo, 0)
manifestFiles, err := os.ReadDir(createOpts.ManifestsDir)
Expand All @@ -48,10 +121,13 @@ func CreateTelemetryGenObjects(t *testing.T, client *K8sClient, createOpts *Tele
}))
obj, err := CreateObject(client, manifest.Bytes())
require.NoErrorf(t, err, "failed to create telemetrygen object from manifest %s", manifestFile.Name())
selector := obj.Object["spec"].(map[string]any)["selector"]

podLabels, err := getPodLabelSelectors(obj)
require.NoErrorf(t, err, "failed to extract pod label selectors for %s %s", obj.GetKind(), obj.GetName())

telemetrygenObjInfos = append(telemetrygenObjInfos, &TelemetrygenObjInfo{
Namespace: obj.GetNamespace(),
PodLabelSelectors: selector.(map[string]any)["matchLabels"].(map[string]any),
PodLabelSelectors: podLabels,
DataType: dataType,
Workload: obj.GetKind(),
})
Expand Down
22 changes: 11 additions & 11 deletions processor/k8sattributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The processor stores the list of running pods and the associated metadata. When
to the pod from where the datapoint originated, so we can add the relevant pod metadata to the datapoint. By default, it associates the incoming connection IP
to the Pod IP. But for cases where this approach doesn't work (sending through a proxy, etc.), a custom association rule can be specified.

Each association is specified as a list of sources of associations. The maximum number of sources within an association is 4.
Each association is specified as a list of sources of associations. The maximum number of sources within an association is 4.
A source is a rule that matches metadata from the datapoint to pod metadata.
In order to get an association applied, all the sources specified need to match.

Expand Down Expand Up @@ -63,16 +63,16 @@ If Pod association rules are not configured, resources are associated with metad

Which metadata to collect is determined by `metadata` configuration that defines list of resource attributes
to be added. Items in the list are named exactly the same as the resource attributes that will be added.
The following attributes are added by default:
The following attributes are added by default:
- k8s.namespace.name
- k8s.pod.name
- k8s.pod.uid
- k8s.pod.start_time
- k8s.deployment.name
- k8s.node.name

These attributes are also available for the use within association rules by default.
The `metadata` section can also be extended with additional attributes which, if present in the `metadata` section,
These attributes are also available for the use within association rules by default.
The `metadata` section can also be extended with additional attributes which, if present in the `metadata` section,
are then also available for use within association rules. Available attributes are:
- k8s.namespace.name
- k8s.pod.name
Expand Down Expand Up @@ -100,7 +100,7 @@ are then also available for the use within association rules. Available attribut
- [service.instance.id](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#how-serviceinstanceid-should-be-calculated)(cannot be used for source rules in the pod_association)
- Any tags extracted from the pod labels and annotations, as described in [extracting attributes from pod labels and annotations](#extracting-attributes-from-pod-labels-and-annotations)

Not all the attributes are guaranteed to be added. Only attribute names from `metadata` should be used for
Not all the attributes are guaranteed to be added. Only attribute names from `metadata` should be used for
pod_association's `resource_attribute`, because empty or non-existing values will be ignored.

Additional container level attributes can be extracted. If a pod contains more than one container,
Expand Down Expand Up @@ -204,7 +204,7 @@ the processor associates the received trace to the pod, based on the connection
"k8s.pod.name": "telemetrygen-pod",
"k8s.pod.uid": "038e2267-b473-489b-b48c-46bafdb852eb",
"container.image.name": "telemetrygen",
"container.image.tag": "0.112.0",
"container.image.tag": "0.112.0",
"container.image.repo_digests": ["ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:b248ef911f93ae27cbbc85056d1ffacc87fd941bbdc2ffd951b6df8df72b8096"]
}
}
Expand Down Expand Up @@ -262,16 +262,16 @@ extract:
from: node
```

## Configuring recommended resource attributes
## Configuring recommended resource attributes

The processor can be configured to set the
The processor can be configured to set the
[recommended resource attributes](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/):

- `otel_annotations` will translate `resource.opentelemetry.io/foo` to the `foo` resource attribute, etc.

```yaml
extract:
otel_annotations: true
otel_annotations: true
metadata:
- service.namespace
- service.name
Expand Down Expand Up @@ -306,7 +306,7 @@ k8sattributes:
- tag_name: app.label.component
key: app.kubernetes.io/component
from: pod
otel_annotations: true
otel_annotations: true
pod_association:
- sources:
# This rule associates all resources containing the 'k8s.pod.ip' attribute with the matching pods. If this attribute is not present in the resource, this rule will not be able to find the matching pod.
Expand All @@ -325,7 +325,7 @@ k8sattributes:

## Cluster-scoped RBAC

If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.name` (which is enabled by default) or `k8s.deployment.uid` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When using `k8s.node.uid` or extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources.
If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.name` (which is enabled by default) or `k8s.deployment.uid` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When using `k8s.node.uid` or extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources. When using `k8s.cronjob.uid` the processor also needs `get`, `watch` and `list` permissions for `jobs` resources.

Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods, nodes, and namespaces in the cluster (replace `<OTEL_COL_NAMESPACE>` with a namespace where collector is deployed):

Expand Down
6 changes: 6 additions & 0 deletions processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type fakeClient struct {
Deployments map[string]*kube.Deployment
StatefulSets map[string]*kube.StatefulSet
DaemonSets map[string]*kube.DaemonSet
ReplicaSets map[string]*kube.ReplicaSet
Jobs map[string]*kube.Job
StopCh chan struct{}
}
Expand Down Expand Up @@ -90,6 +91,11 @@ func (f *fakeClient) GetDaemonSet(daemonsetUID string) (*kube.DaemonSet, bool) {
return s, ok
}

// GetReplicaSet returns the fake ReplicaSet stored under replicaSetUID and
// whether such an entry exists.
func (f *fakeClient) GetReplicaSet(replicaSetUID string) (*kube.ReplicaSet, bool) {
	replicaSet, found := f.ReplicaSets[replicaSetUID]
	return replicaSet, found
}

func (f *fakeClient) GetJob(jobUID string) (*kube.Job, bool) {
j, ok := f.Jobs[jobUID]
return j, ok
Expand Down
5 changes: 3 additions & 2 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (cfg *Config) Validate() error {
string(conventions.K8SDaemonSetNameKey), string(conventions.K8SDaemonSetUIDKey),
string(conventions.K8SStatefulSetNameKey), string(conventions.K8SStatefulSetUIDKey),
string(conventions.K8SJobNameKey), string(conventions.K8SJobUIDKey),
string(conventions.K8SCronJobNameKey),
string(conventions.K8SCronJobNameKey), string(conventions.K8SCronJobUIDKey),
string(conventions.K8SNodeNameKey), string(conventions.K8SNodeUIDKey),
string(conventions.K8SContainerNameKey), string(conventions.ContainerIDKey),
string(conventions.ContainerImageNameKey), string(conventions.ContainerImageTagKey),
Expand Down Expand Up @@ -139,7 +139,8 @@ type ExtractConfig struct {
// k8s.node.name, k8s.namespace.name, k8s.pod.start_time,
// k8s.replicaset.name, k8s.replicaset.uid,
// k8s.daemonset.name, k8s.daemonset.uid,
// k8s.job.name, k8s.job.uid, k8s.cronjob.name,
// k8s.job.name, k8s.job.uid,
// k8s.cronjob.name, k8s.cronjob.uid,
// k8s.statefulset.name, k8s.statefulset.uid,
// k8s.container.name, container.id, container.image.name,
// container.image.tag, container.image.repo_digests
Expand Down
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
| k8s.cluster.uid | Gives cluster uid identified with kube-system namespace | Any Str | false |
| k8s.container.name | The name of the Container in a Pod template. Requires container.id. | Any Str | false |
| k8s.cronjob.name | The name of the CronJob. | Any Str | false |
| k8s.cronjob.uid | The uid of the CronJob. | Any Str | false |
| k8s.daemonset.name | The name of the DaemonSet. | Any Str | false |
| k8s.daemonset.uid | The UID of the DaemonSet. | Any Str | false |
| k8s.deployment.name | The name of the Deployment. | Any Str | true |
Expand Down
Loading
Loading