Skip to content

Commit 085a870

Browse files
authored
Migrate to OTel (#1590)
* feat: migrate eventing-rabbitmq to otel Signed-off-by: Calum Murray <cmurray@redhat.com> * chore: update deps Signed-off-by: Calum Murray <cmurray@redhat.com> * cleanup: fix formatting Signed-off-by: Calum Murray <cmurray@redhat.com> * drop direct use of go.opencensus.io Signed-off-by: Calum Murray <cmurray@redhat.com> * fix: unit tests, linter Signed-off-by: Calum Murray <cmurray@redhat.com> --------- Signed-off-by: Calum Murray <cmurray@redhat.com>
1 parent 97cb765 commit 085a870

File tree

126 files changed

+5628
-2852
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

126 files changed

+5628
-2852
lines changed

cmd/dispatcher/main.go

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,17 @@ import (
2121
"errors"
2222
"time"
2323

24+
"go.opentelemetry.io/otel"
25+
"go.opentelemetry.io/otel/metric"
2426
"go.uber.org/zap"
2527

26-
"github.com/google/uuid"
2728
"github.com/kelseyhightower/envconfig"
2829

29-
dispatcherstats "knative.dev/eventing-rabbitmq/pkg/broker/dispatcher"
3030
"knative.dev/eventing-rabbitmq/pkg/dispatcher"
3131
"knative.dev/eventing-rabbitmq/pkg/rabbit"
3232
"knative.dev/eventing-rabbitmq/pkg/utils"
3333
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
34-
"knative.dev/pkg/kmeta"
3534
"knative.dev/pkg/logging"
36-
"knative.dev/pkg/metrics"
3735
"knative.dev/pkg/signals"
3836
)
3937

@@ -61,10 +59,16 @@ type envConfig struct {
6159
Namespace string `envconfig:"NAMESPACE"`
6260
}
6361

62+
const (
63+
scopeName = "knative.dev/eventing-rabbitmq/cmd/dispatcher"
64+
)
65+
66+
var (
67+
latencyBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10}
68+
)
69+
6470
func main() {
6571
ctx := signals.NewContext()
66-
// Report stats on Go memory usage every 30 seconds.
67-
metrics.MemStatsOrDie(ctx)
6872

6973
var env envConfig
7074
if err := envconfig.Process("", &env); err != nil {
@@ -74,12 +78,15 @@ func main() {
7478
env.SetComponent(dispatcher.ComponentName)
7579
logger := env.GetLogger()
7680
ctx = logging.WithLogger(ctx, logger)
77-
if err := env.SetupTracing(); err != nil {
78-
logger.Errorw("Failed setting up trace publishing", zap.Error(err))
79-
}
80-
if err := env.SetupMetrics(ctx); err != nil {
81-
logger.Errorw("Failed to create the metrics exporter", zap.Error(err))
81+
if err := env.SetupObservability(ctx); err != nil {
82+
logger.Errorw("Failed setting up observability", zap.Error(err))
8283
}
84+
defer func() {
85+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
86+
defer cancel()
87+
88+
env.ShutdownObservability(ctx)
89+
}()
8390

8491
backoffPolicy := utils.SetBackoffPolicy(ctx, env.BackoffPolicy)
8592
if backoffPolicy == "" {
@@ -88,7 +95,6 @@ func main() {
8895
backoffDelay := env.BackoffDelay
8996
logging.FromContext(ctx).Infow("Setting BackoffDelay", zap.Any("backoffDelay", backoffDelay))
9097

91-
reporter := dispatcherstats.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()), env.Namespace)
9298
d := &dispatcher.Dispatcher{
9399
BrokerIngressURL: env.BrokerIngressURL,
94100
SubscriberURL: env.SubscriberURL,
@@ -98,12 +104,24 @@ func main() {
98104
Timeout: env.Timeout,
99105
BackoffPolicy: backoffPolicy,
100106
WorkerCount: env.Parallelism,
101-
Reporter: reporter,
102107
DLX: env.DLX,
103108
DLXName: env.DLXName,
109+
Tracer: otel.GetTracerProvider().Tracer(scopeName),
104110
}
105111

112+
meter := otel.GetMeterProvider().Meter(scopeName)
113+
106114
var err error
115+
d.DispatchDuration, err = meter.Float64Histogram(
116+
"kn.eventing.dispatch.duration",
117+
metric.WithDescription("The time to dispatch the event"),
118+
metric.WithUnit("s"),
119+
metric.WithExplicitBucketBoundaries(latencyBounds...),
120+
)
121+
if err != nil {
122+
logger.Fatalw("failed to set up dispatch metric", zap.Error(err))
123+
}
124+
107125
rmqHelper := rabbit.NewRabbitMQConnectionHandler(5, 1000, logger)
108126
rmqHelper.Setup(ctx, rabbit.VHostHandler(
109127
env.RabbitURL,

cmd/ingress/main.go

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"net/http"
@@ -25,18 +26,19 @@ import (
2526
cloudevents "github.com/cloudevents/sdk-go/v2"
2627
"github.com/cloudevents/sdk-go/v2/binding"
2728
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
28-
"github.com/google/uuid"
2929
"github.com/kelseyhightower/envconfig"
30-
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
31-
"go.opencensus.io/trace"
30+
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
31+
"go.opentelemetry.io/otel"
32+
"go.opentelemetry.io/otel/metric"
33+
"go.opentelemetry.io/otel/propagation"
34+
"go.opentelemetry.io/otel/trace"
3235
"go.uber.org/zap"
33-
"knative.dev/eventing-rabbitmq/pkg/broker/ingress"
36+
"k8s.io/apimachinery/pkg/types"
3437
"knative.dev/eventing-rabbitmq/pkg/rabbit"
3538
"knative.dev/eventing-rabbitmq/pkg/utils"
3639
"knative.dev/eventing/pkg/kncloudevents"
37-
"knative.dev/pkg/kmeta"
40+
"knative.dev/eventing/pkg/observability"
3841
"knative.dev/pkg/logging"
39-
"knative.dev/pkg/metrics"
4042
"knative.dev/pkg/signals"
4143
)
4244

@@ -47,6 +49,13 @@ const (
4749

4850
// noDuration signals that the dispatch step hasn't started
4951
noDuration = -1
52+
53+
scopeName = "knative.dev/eventing-rabbitmq/cmd/ingress"
54+
)
55+
56+
var (
57+
latencyBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10}
58+
propagationContext = propagation.TraceContext{}
5059
)
5160

5261
type envConfig struct {
@@ -64,16 +73,14 @@ type envConfig struct {
6473
BrokerName string `envconfig:"BROKER_NAME"`
6574
BrokerNamespace string `envconfig:"BROKER_NAMESPACE"`
6675

67-
reporter ingress.StatsReporter
76+
tracer trace.Tracer
77+
dispatchDuration metric.Float64Histogram
6878
}
6979

7080
func main() {
7181
ctx := signals.NewContext()
7282
var err error
7383

74-
// Report stats on Go memory usage every 30 seconds.
75-
metrics.MemStatsOrDie(ctx)
76-
7784
var env envConfig
7885
if err = envconfig.Process("", &env); err != nil {
7986
logging.FromContext(ctx).Fatal("Failed to process env var: ", err)
@@ -83,18 +90,33 @@ func main() {
8390
logger := env.GetLogger()
8491
ctx = logging.WithLogger(ctx, logger)
8592

86-
if err = env.SetupTracing(); err != nil {
87-
logger.Errorw("failed setting up trace publishing", zap.Error(err))
93+
err = env.SetupObservability(ctx)
94+
if err != nil {
95+
logger.Errorw("Failed setting up observability", zap.Error(err))
8896
}
97+
defer func() {
98+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
99+
defer cancel()
89100

90-
if err = env.SetupMetrics(ctx); err != nil {
91-
logger.Errorw("failed to create the metrics exporter", zap.Error(err))
92-
}
101+
env.ShutdownObservability(ctx)
102+
}()
93103

94104
env.rmqHelper = rabbit.NewRabbitMQConnectionHandler(5, 1000, logger)
95105
env.rmqHelper.Setup(ctx, rabbit.VHostHandler(env.BrokerURL, env.RabbitMQVhost), rabbit.ChannelConfirm, rabbit.DialWrapper)
96106

97-
env.reporter = ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))
107+
env.tracer = otel.GetTracerProvider().Tracer(scopeName)
108+
109+
meter := otel.GetMeterProvider().Meter(scopeName)
110+
env.dispatchDuration, err = meter.Float64Histogram(
111+
"kn.eventing.dispatch.duration",
112+
metric.WithDescription("The time to dispatch the event"),
113+
metric.WithUnit("s"),
114+
metric.WithExplicitBucketBoundaries(latencyBounds...),
115+
)
116+
if err != nil {
117+
logger.Fatalf("failed to create dispatch metric: %s", err.Error())
118+
}
119+
98120
connectionArgs := kncloudevents.ConnectionArgs{
99121
MaxIdleConns: defaultMaxIdleConnections,
100122
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
@@ -141,29 +163,35 @@ func (env *envConfig) ServeHTTP(writer http.ResponseWriter, request *http.Reques
141163
return
142164
}
143165

144-
span := trace.FromContext(ctx)
145-
defer span.End()
166+
ctx = observability.WithBrokerLabels(ctx, types.NamespacedName{Name: env.BrokerName, Namespace: env.BrokerNamespace})
146167

147-
reporterArgs := &ingress.ReportArgs{
148-
Namespace: env.BrokerNamespace,
149-
BrokerName: env.BrokerName,
150-
EventType: event.Type(),
151-
}
168+
span := trace.SpanFromContext(ctx)
169+
defer func() {
170+
if span.IsRecording() {
171+
ctx = observability.WithEventLabels(ctx, event)
172+
labeler, _ := otelhttp.LabelerFromContext(ctx)
173+
span.SetAttributes(labeler.Get()...)
174+
}
175+
span.End()
176+
}()
152177

153-
statusCode, dispatchTime, err := env.send(event, span)
178+
statusCode, dispatchTime, err := env.send(ctx, event)
154179
if err != nil {
155180
logger.Errorw("failed to send event", zap.Error(err))
156181
}
157182
if dispatchTime > noDuration {
158-
_ = env.reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
183+
labeler, _ := otelhttp.LabelerFromContext(ctx)
184+
env.dispatchDuration.Record(ctx, dispatchTime.Seconds(), metric.WithAttributes(labeler.Get()...))
159185
}
160-
_ = env.reporter.ReportEventCount(reporterArgs, statusCode)
161186

162187
writer.WriteHeader(statusCode)
163188
}
164189

165-
func (env *envConfig) send(event *cloudevents.Event, span *trace.Span) (int, time.Duration, error) {
166-
tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
190+
func (env *envConfig) send(ctx context.Context, event *cloudevents.Event) (int, time.Duration, error) {
191+
headerCarrier := propagation.HeaderCarrier{}
192+
propagationContext.Inject(ctx, headerCarrier)
193+
tp := headerCarrier.Get("traceparent")
194+
ts := headerCarrier.Get("tracestate")
167195
channel := env.rmqHelper.GetChannel()
168196
if channel != nil {
169197
start := time.Now()

go.mod

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,31 @@ require (
1212
github.com/rabbitmq/amqp091-go v1.9.0
1313
github.com/rabbitmq/cluster-operator v1.14.0
1414
github.com/rabbitmq/messaging-topology-operator v1.12.0
15-
go.opencensus.io v0.24.0
15+
go.opencensus.io v0.24.0 // indirect
1616
go.uber.org/zap v1.27.0
1717
k8s.io/api v0.33.1
1818
k8s.io/apiextensions-apiserver v0.33.1
1919
k8s.io/apimachinery v0.33.1
2020
k8s.io/client-go v0.33.1
2121
k8s.io/code-generator v0.33.1
2222
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff
23-
knative.dev/eventing v0.45.2
23+
knative.dev/eventing v0.46.0
2424
knative.dev/hack v0.0.0-20250708013849-70d4b00da6ba
2525
knative.dev/pkg v0.0.0-20250716115900-19d3cc2da0b9
2626
knative.dev/reconciler-test v0.0.0-20250716152301-806fb6ae0da7
2727
sigs.k8s.io/controller-runtime v0.19.0
2828
)
2929

3030
require (
31-
github.com/google/uuid v1.6.0
31+
github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.16.1
3232
github.com/rickb777/date v1.13.0
33+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0
34+
go.opentelemetry.io/contrib/instrumentation/runtime v0.62.0
35+
go.opentelemetry.io/otel v1.37.0
36+
go.opentelemetry.io/otel/metric v1.37.0
37+
go.opentelemetry.io/otel/sdk v1.37.0
38+
go.opentelemetry.io/otel/sdk/metric v1.37.0
39+
go.opentelemetry.io/otel/trace v1.37.0
3340
k8s.io/utils v0.0.0-20241210054802-24370beab758
3441
)
3542

@@ -68,6 +75,7 @@ require (
6875
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
6976
github.com/golang/protobuf v1.5.4 // indirect
7077
github.com/google/gnostic-models v0.6.9 // indirect
78+
github.com/google/uuid v1.6.0 // indirect
7179
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
7280
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
7381
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
@@ -93,20 +101,13 @@ require (
93101
github.com/valyala/bytebufferpool v1.0.0 // indirect
94102
github.com/x448/float16 v0.8.4 // indirect
95103
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
96-
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect
97-
go.opentelemetry.io/contrib/instrumentation/runtime v0.62.0 // indirect
98-
go.opentelemetry.io/otel v1.37.0 // indirect
99104
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0 // indirect
100105
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.37.0 // indirect
101106
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect
102107
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect
103108
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 // indirect
104109
go.opentelemetry.io/otel/exporters/prometheus v0.59.0 // indirect
105110
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 // indirect
106-
go.opentelemetry.io/otel/metric v1.37.0 // indirect
107-
go.opentelemetry.io/otel/sdk v1.37.0 // indirect
108-
go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect
109-
go.opentelemetry.io/otel/trace v1.37.0 // indirect
110111
go.opentelemetry.io/proto/otlp v1.7.0 // indirect
111112
go.uber.org/atomic v1.10.0 // indirect
112113
go.uber.org/automaxprocs v1.6.0 // indirect

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ github.com/cloudevents/conformance v0.4.1 h1:MMq+jviOHHNINGWQdvr2yj4g8CdeL59M81O
104104
github.com/cloudevents/conformance v0.4.1/go.mod h1:bOMTKxs/Aqx4rM/dp+AYvdGTHU6tEa9+R5ORr5cJnqc=
105105
github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.15.2 h1:AbtPqiUDzKup5JpTZzO297/QXgL/TAdpdXQCNwLzlaM=
106106
github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.15.2/go.mod h1:ZbYLE+yaEQ2j4vbRc9qzvGmg30A9LhwFt/1bSebNnbU=
107+
github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.16.1 h1:bm82HgAOzK9VSrVtC0lWWrrQLJaRYqqAKEIx2jdPQbU=
108+
github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.16.1/go.mod h1:H46knbku1a1zLw3lAsaWdtVryvG1/QaNZRCRO69elKA=
107109
github.com/cloudevents/sdk-go/sql/v2 v2.15.2 h1:TNaTeWIbDaci89xgXbmmNVGccawQOvEfWYLWrr7Fk/k=
108110
github.com/cloudevents/sdk-go/sql/v2 v2.15.2/go.mod h1:us+PSk8OXdk8pDbRfvxy5w8ub5goKE7UP9PjKDY7TPw=
109111
github.com/cloudevents/sdk-go/v2 v2.16.1 h1:G91iUdqvl88BZ1GYYr9vScTj5zzXSyEuqbfE63gbu9Q=
@@ -1151,8 +1153,8 @@ k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUy
11511153
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8=
11521154
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
11531155
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
1154-
knative.dev/eventing v0.45.2 h1:FA093S4latA57V4GWoSteemHLEP9XOJIZJuJZKhNuC8=
1155-
knative.dev/eventing v0.45.2/go.mod h1:Fz5VjV/vWVN93UfTX3lzc+uFrvJ9wxiiKrRIPhQxoj8=
1156+
knative.dev/eventing v0.46.0 h1:DGkvUcZagkvvcRxf5jh/usdictP8BWa7kHgIocfT4MY=
1157+
knative.dev/eventing v0.46.0/go.mod h1:w5YnKVzZBuXXTJb8QN3POXTgXhM07d0tihBZYoSNF+I=
11561158
knative.dev/hack v0.0.0-20250708013849-70d4b00da6ba h1:PkOTBI8DRfvUKD8HTvYYT94NJ49J++llrDo3y0/ZAwc=
11571159
knative.dev/hack v0.0.0-20250708013849-70d4b00da6ba/go.mod h1:R0ritgYtjLDO9527h5vb5X6gfvt5LCrJ55BNbVDsWiY=
11581160
knative.dev/pkg v0.0.0-20250716115900-19d3cc2da0b9 h1:P9GFsqTmX4WokSVVUK1IbHOnbJLi8EW1pd4PvbUEodo=

0 commit comments

Comments
 (0)