From b6e48f4098e8cd869ce3c8a5ab451378c3f1e84d Mon Sep 17 00:00:00 2001 From: Luca Bruno Date: Thu, 6 Jun 2024 17:55:34 +0100 Subject: [PATCH 1/4] Prepare getContainerPorts for being extracted to a common file --- internal/manifests/collector/container.go | 19 +++---- .../manifests/collector/container_test.go | 51 ++++++++++++++----- 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/internal/manifests/collector/container.go b/internal/manifests/collector/container.go index ee85ccab5..be6a552ac 100644 --- a/internal/manifests/collector/container.go +++ b/internal/manifests/collector/container.go @@ -34,14 +34,7 @@ func Container(cfg config.Config, logger logr.Logger, agent v1alpha1.AmazonCloud image = cfg.CollectorImage() } - ports := getContainerPorts(logger, agent.Spec.Config) - for _, p := range agent.Spec.Ports { - ports[p.Name] = corev1.ContainerPort{ - Name: p.Name, - ContainerPort: p.Port, - Protocol: p.Protocol, - } - } + ports := getContainerPorts(logger, agent.Spec.Config, agent.Spec.Ports) var volumeMounts []corev1.VolumeMount argsMap := agent.Spec.Args @@ -125,7 +118,7 @@ func getVolumeMounts(os string) corev1.VolumeMount { return volumeMount } -func getContainerPorts(logger logr.Logger, cfg string) map[string]corev1.ContainerPort { +func getContainerPorts(logger logr.Logger, cfg string, specPorts []corev1.ServicePort) map[string]corev1.ContainerPort { ports := map[string]corev1.ContainerPort{} var servicePorts []corev1.ServicePort config, err := adapters.ConfigStructFromJSONString(cfg) @@ -155,6 +148,14 @@ func getContainerPorts(logger logr.Logger, cfg string) map[string]corev1.Contain Protocol: p.Protocol, } } + + for _, p := range specPorts { + ports[p.Name] = corev1.ContainerPort{ + Name: p.Name, + ContainerPort: p.Port, + Protocol: p.Protocol, + } + } return ports } diff --git a/internal/manifests/collector/container_test.go b/internal/manifests/collector/container_test.go index 662d46c1f..bfc352e64 100644 --- a/internal/manifests/collector/container_test.go +++ b/internal/manifests/collector/container_test.go @@ -14,7 +14,7 @@ import ( func TestStatsDGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/statsDAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 4, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -29,7 +29,7 @@ func TestStatsDGetContainerPorts(t *testing.T) { func TestDefaultStatsDGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/statsDDefaultAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 4, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -44,7 +44,7 @@ func TestDefaultStatsDGetContainerPorts(t *testing.T) { func TestCollectDGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/collectDAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 4, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -58,7 +58,7 @@ func TestCollectDGetContainerPorts(t *testing.T) { func TestDefaultCollectDGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/collectDDefaultAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 4, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -73,7 +73,7 @@ func TestDefaultCollectDGetContainerPorts(t *testing.T) { func TestEMFGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/emfAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 4, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -87,7 +87,7 @@ func TestEMFGetContainerPorts(t *testing.T) { func TestXrayAndOTLPGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 5, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -105,7 +105,7 @@ func TestXrayAndOTLPGetContainerPorts(t *testing.T) { func TestDefaultXRayAndOTLPGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPDefaultAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 5, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -123,7 +123,7 @@ func TestDefaultXRayAndOTLPGetContainerPorts(t *testing.T) { func TestXRayGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 5, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -142,7 +142,7 @@ func TestXRayGetContainerPorts(t *testing.T) { func TestXRayWithBindAddressDefaultGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") strings.Replace(cfg, "2800", "2000", 1) - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 5, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -158,7 +158,7 @@ func TestXRayWithBindAddressDefaultGetContainerPorts(t *testing.T) { func TestXRayWithTCPProxyBindAddressDefaultGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") strings.Replace(cfg, "2900", "2000", 1) - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 5, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -173,7 +173,7 @@ func TestXRayWithTCPProxyBindAddressDefaultGetContainerPorts(t *testing.T) { func TestNilMetricsGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/nilMetrics.json") - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 3, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -186,7 +186,7 @@ func TestNilMetricsGetContainerPorts(t *testing.T) { func TestMultipleReceiversGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/multipleReceiversAgentConfig.json") strings.Replace(cfg, "2900", "2000", 1) - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 10, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) @@ -214,10 +214,35 @@ func TestMultipleReceiversGetContainerPorts(t *testing.T) { assert.Equal(t, CWA+OtlpHttp, containerPorts[CWA+OtlpHttp].Name) } +func TestSpecPortsOverrideGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/statsDAgentConfig.json") + specPorts := []corev1.ServicePort{ + { + Name: AppSignalsGrpc, + Port: 12345, + }, + { + Name: AppSignalsProxy, + Port: 12346, + }, + } + containerPorts := getContainerPorts(logger, cfg, specPorts) + assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, int32(12345), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(12346), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort) + assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol) +} + func TestInvalidConfigGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/nilMetrics.json") cfg = cfg + "," - containerPorts := getContainerPorts(logger, cfg) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) assert.Equal(t, 3, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) From 35fb863de884478b886af031340e49d716e6397c Mon Sep 17 00:00:00 2001 From: Luca Bruno Date: Thu, 6 Jun 2024 18:00:45 +0100 Subject: [PATCH 2/4] Move getContainerPorts to a common ports file --- internal/manifests/collector/container.go | 178 +----------- .../manifests/collector/container_test.go | 251 ----------------- internal/manifests/collector/ports.go | 172 ++++++++++++ internal/manifests/collector/ports_test.go | 258 ++++++++++++++++++ 4 files changed, 433 insertions(+), 426 deletions(-) create mode 100644 internal/manifests/collector/ports_test.go diff --git a/internal/manifests/collector/container.go b/internal/manifests/collector/container.go index be6a552ac..0f7a74d4a 100644 --- a/internal/manifests/collector/container.go +++ b/internal/manifests/collector/container.go @@ -4,23 +4,14 @@ package collector import ( - "errors" "fmt" - "regexp" - "sort" - "strconv" - "strings" - - "go.uber.org/zap" - - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/validation" - "github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1" "github.com/aws/amazon-cloudwatch-agent-operator/internal/config" "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters" "github.com/aws/amazon-cloudwatch-agent-operator/internal/naming" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "sort" ) // maxPortLen allows us to truncate a port name according to what is considered valid port syntax: @@ -118,148 +109,6 @@ func getVolumeMounts(os string) corev1.VolumeMount { return volumeMount } -func getContainerPorts(logger logr.Logger, cfg string, specPorts []corev1.ServicePort) map[string]corev1.ContainerPort { - ports := map[string]corev1.ContainerPort{} - var servicePorts []corev1.ServicePort - config, err := adapters.ConfigStructFromJSONString(cfg) - if err != nil { - logger.Error(err, "error parsing cw agent config") - servicePorts = PortMapToServicePortList(AppSignalsPortToServicePortMap) - } else { - servicePorts = getServicePortsFromCWAgentConfig(logger, config) - } - - for _, p := range servicePorts { - truncName := naming.Truncate(p.Name, maxPortLen) - if p.Name != truncName { - logger.Info("truncating container port name", - zap.String("port.name.prev", p.Name), zap.String("port.name.new", truncName)) - } - nameErrs := validation.IsValidPortName(truncName) - numErrs := validation.IsValidPortNum(int(p.Port)) - if len(nameErrs) > 0 || len(numErrs) > 0 { - logger.Info("dropping invalid container port", zap.String("port.name", truncName), zap.Int32("port.num", p.Port), - zap.Strings("port.name.errs", nameErrs), zap.Strings("num.errs", numErrs)) - continue - } - ports[truncName] = corev1.ContainerPort{ - Name: truncName, - ContainerPort: p.Port, - Protocol: p.Protocol, - } - } - - for _, p := range specPorts { - ports[p.Name] = corev1.ContainerPort{ - Name: p.Name, - ContainerPort: p.Port, - Protocol: p.Protocol, - } - } - return ports -} - -func getServicePortsFromCWAgentConfig(logger logr.Logger, config *adapters.CwaConfig) []corev1.ServicePort { - servicePortsMap := getAppSignalsServicePortsMap() - getMetricsReceiversServicePorts(logger, config, servicePortsMap) - getLogsReceiversServicePorts(logger, config, servicePortsMap) - getTracesReceiversServicePorts(logger, config, servicePortsMap) - return PortMapToServicePortList(servicePortsMap) -} - -func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) { - if config.Metrics == nil || config.Metrics.MetricsCollected == nil { - return - } - //StatD - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-custom-metrics-statsd.html - if config.Metrics.MetricsCollected.StatsD != nil { - getReceiverServicePort(logger, config.Metrics.MetricsCollected.StatsD.ServiceAddress, StatsD, corev1.ProtocolUDP, servicePortsMap) - } - //CollectD - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-custom-metrics-collectd.html - if config.Metrics.MetricsCollected.CollectD != nil { - getReceiverServicePort(logger, config.Metrics.MetricsCollected.CollectD.ServiceAddress, CollectD, corev1.ProtocolUDP, servicePortsMap) - } -} - -func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverName string, protocol corev1.Protocol, servicePortsMap map[int32]corev1.ServicePort) { - if serviceAddress != "" { - port, err := portFromEndpoint(serviceAddress) - if err != nil { - logger.Error(err, "error parsing port from endpoint for receiver", zap.String("endpoint", serviceAddress), zap.String("receiver", receiverName)) - } else { - if _, ok := servicePortsMap[port]; ok { - logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", port)) - } else { - sp := corev1.ServicePort{ - Name: CWA + receiverName, - Port: port, - Protocol: protocol, - } - servicePortsMap[port] = sp - } - } - } else { - if _, ok := servicePortsMap[receiverDefaultPortsMap[receiverName]]; ok { - logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", receiverDefaultPortsMap[receiverName])) - } else { - sp := corev1.ServicePort{ - Name: receiverName, - Port: receiverDefaultPortsMap[receiverName], - Protocol: protocol, - } - servicePortsMap[receiverDefaultPortsMap[receiverName]] = sp - } - } -} - -func getLogsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) { - //EMF - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html - if config.Logs != nil && config.Logs.LogMetricsCollected != nil && config.Logs.LogMetricsCollected.EMF != nil { - if _, ok := servicePortsMap[receiverDefaultPortsMap[EMF]]; ok { - logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", receiverDefaultPortsMap[EMF])) - } else { - sp := corev1.ServicePort{ - Name: EMF, - Port: receiverDefaultPortsMap[EMF], - } - servicePortsMap[receiverDefaultPortsMap[EMF]] = sp - } - } -} - -func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) []corev1.ServicePort { - var tracesPorts []corev1.ServicePort - - if config.Traces == nil || config.Traces.TracesCollected == nil { - return tracesPorts - } - //Traces - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html#CloudWatch-Agent-Configuration-File-Tracessection - //OTLP - if config.Traces.TracesCollected.OTLP != nil { - //GRPC - getReceiverServicePort(logger, config.Traces.TracesCollected.OTLP.GRPCEndpoint, OtlpGrpc, corev1.ProtocolTCP, servicePortsMap) - //HTTP - getReceiverServicePort(logger, config.Traces.TracesCollected.OTLP.HTTPEndpoint, OtlpHttp, corev1.ProtocolTCP, servicePortsMap) - - } - //Xray - if config.Traces.TracesCollected.XRay != nil { - getReceiverServicePort(logger, config.Traces.TracesCollected.XRay.BindAddress, XrayTraces, corev1.ProtocolUDP, servicePortsMap) - if config.Traces.TracesCollected.XRay.TCPProxy != nil { - getReceiverServicePort(logger, config.Traces.TracesCollected.XRay.TCPProxy.BindAddress, XrayProxy, corev1.ProtocolTCP, servicePortsMap) - } - } - return tracesPorts -} - -func getAppSignalsServicePortsMap() map[int32]corev1.ServicePort { - servicePortMap := make(map[int32]corev1.ServicePort) - for k, v := range AppSignalsPortToServicePortMap { - servicePortMap[k] = v - } - return servicePortMap -} - func portMapToContainerPortList(portMap map[string]corev1.ContainerPort) []corev1.ContainerPort { ports := make([]corev1.ContainerPort, 0, len(portMap)) for _, p := range portMap { @@ -270,24 +119,3 @@ func portMapToContainerPortList(portMap map[string]corev1.ContainerPort) []corev }) return ports } - -func portFromEndpoint(endpoint string) (int32, error) { - var err error - var port int64 - - r := regexp.MustCompile(":[0-9]+") - - if r.MatchString(endpoint) { - port, err = strconv.ParseInt(strings.Replace(r.FindString(endpoint), ":", "", -1), 10, 32) - - if err != nil { - return 0, err - } - } - - if port == 0 { - return 0, errors.New("port should not be empty") - } - - return int32(port), err -} diff --git a/internal/manifests/collector/container_test.go b/internal/manifests/collector/container_test.go index bfc352e64..23776bbfc 100644 --- a/internal/manifests/collector/container_test.go +++ b/internal/manifests/collector/container_test.go @@ -4,254 +4,11 @@ package collector import ( - "os" - "strings" "testing" "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" ) -func TestStatsDGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/statsDAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 4, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort) - assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol) -} - -func TestDefaultStatsDGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/statsDDefaultAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 4, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(8125), containerPorts[StatsD].ContainerPort) - assert.Equal(t, StatsD, containerPorts[StatsD].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[StatsD].Protocol) -} - -func TestCollectDGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/collectDAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 4, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(25936), containerPorts[CWA+CollectD].ContainerPort) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+CollectD].Protocol) -} - -func TestDefaultCollectDGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/collectDDefaultAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 4, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(25826), containerPorts[CollectD].ContainerPort) - assert.Equal(t, CollectD, containerPorts[CollectD].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CollectD].Protocol) -} - -func TestEMFGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/emfAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 4, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort) - assert.Equal(t, EMF, containerPorts[EMF].Name) -} - -func TestXrayAndOTLPGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 5, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(4327), containerPorts[CWA+OtlpGrpc].ContainerPort) - assert.Equal(t, CWA+OtlpGrpc, containerPorts[CWA+OtlpGrpc].Name) - assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+OtlpGrpc].Protocol) - assert.Equal(t, int32(4328), containerPorts[CWA+OtlpHttp].ContainerPort) - assert.Equal(t, CWA+OtlpHttp, containerPorts[CWA+OtlpHttp].Name) - assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+OtlpHttp].Protocol) -} - -func TestDefaultXRayAndOTLPGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPDefaultAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 5, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(4317), containerPorts[OtlpGrpc].ContainerPort) - assert.Equal(t, OtlpGrpc, containerPorts[OtlpGrpc].Name) - assert.Equal(t, corev1.ProtocolTCP, containerPorts[OtlpGrpc].Protocol) - assert.Equal(t, int32(4318), containerPorts[OtlpHttp].ContainerPort) - assert.Equal(t, OtlpHttp, containerPorts[OtlpHttp].Name) - assert.Equal(t, corev1.ProtocolTCP, containerPorts[OtlpHttp].Protocol) -} - -func TestXRayGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 5, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort) - assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol) - assert.Equal(t, int32(2900), containerPorts[CWA+XrayProxy].ContainerPort) - assert.Equal(t, CWA+XrayProxy, containerPorts[CWA+XrayProxy].Name) - assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+XrayProxy].Protocol) -} - -func TestXRayWithBindAddressDefaultGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") - strings.Replace(cfg, "2800", "2000", 1) - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 5, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(2900), containerPorts[CWA+XrayProxy].ContainerPort) - assert.Equal(t, CWA+XrayProxy, containerPorts[CWA+XrayProxy].Name) - assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+XrayProxy].Protocol) -} - -func TestXRayWithTCPProxyBindAddressDefaultGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") - strings.Replace(cfg, "2900", "2000", 1) - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 5, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort) - assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol) -} - -func TestNilMetricsGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/nilMetrics.json") - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 3, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) -} - -func TestMultipleReceiversGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/multipleReceiversAgentConfig.json") - strings.Replace(cfg, "2900", "2000", 1) - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 10, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort) - assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol) - assert.Equal(t, int32(25936), containerPorts[CWA+CollectD].ContainerPort) - assert.Equal(t, CWA+CollectD, containerPorts[CWA+CollectD].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+CollectD].Protocol) - assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort) - assert.Equal(t, EMF, containerPorts[EMF].Name) - assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort) - assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol) - assert.Equal(t, int32(2900), containerPorts[CWA+XrayProxy].ContainerPort) - assert.Equal(t, CWA+XrayProxy, containerPorts[CWA+XrayProxy].Name) - assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+XrayProxy].Protocol) - assert.Equal(t, int32(4327), containerPorts[CWA+OtlpGrpc].ContainerPort) - assert.Equal(t, CWA+OtlpGrpc, containerPorts[CWA+OtlpGrpc].Name) - assert.Equal(t, int32(4328), containerPorts[CWA+OtlpHttp].ContainerPort) - assert.Equal(t, CWA+OtlpHttp, containerPorts[CWA+OtlpHttp].Name) -} - -func TestSpecPortsOverrideGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/statsDAgentConfig.json") - specPorts := []corev1.ServicePort{ - { - Name: AppSignalsGrpc, - Port: 12345, - }, - { - Name: AppSignalsProxy, - Port: 12346, - }, - } - containerPorts := getContainerPorts(logger, cfg, specPorts) - assert.Equal(t, 4, len(containerPorts)) - assert.Equal(t, int32(12345), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(12346), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort) - assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name) - assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol) -} - -func TestInvalidConfigGetContainerPorts(t *testing.T) { - cfg := getJSONStringFromFile("./test-resources/nilMetrics.json") - cfg = cfg + "," - containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 3, len(containerPorts)) - assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) - assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) - assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) - assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) - assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) - assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) -} - func TestGetVolumeMounts(t *testing.T) { volumeMount := getVolumeMounts("windows") assert.Equal(t, volumeMount.MountPath, "C:\\Program Files\\Amazon\\AmazonCloudWatchAgent\\cwagentconfig") @@ -262,11 +19,3 @@ func TestGetVolumeMounts(t *testing.T) { volumeMount = getVolumeMounts("") assert.Equal(t, volumeMount.MountPath, "/etc/cwagentconfig") } - -func getJSONStringFromFile(path string) string { - buf, err := os.ReadFile(path) - if err != nil { - return "" - } - return string(buf) -} diff --git a/internal/manifests/collector/ports.go b/internal/manifests/collector/ports.go index 8e5a20cdd..38a97ae8b 100644 --- a/internal/manifests/collector/ports.go +++ b/internal/manifests/collector/ports.go @@ -4,7 +4,16 @@ package collector import ( + "errors" + "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters" + "github.com/aws/amazon-cloudwatch-agent-operator/internal/naming" + "github.com/go-logr/logr" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/validation" + "regexp" "sort" + "strconv" + "strings" corev1 "k8s.io/api/core/v1" ) @@ -57,3 +66,166 @@ func PortMapToServicePortList(portMap map[int32]corev1.ServicePort) []corev1.Ser }) return ports } + +func getContainerPorts(logger logr.Logger, cfg string, specPorts []corev1.ServicePort) map[string]corev1.ContainerPort { + ports := map[string]corev1.ContainerPort{} + var servicePorts []corev1.ServicePort + config, err := adapters.ConfigStructFromJSONString(cfg) + if err != nil { + logger.Error(err, "error parsing cw agent config") + servicePorts = PortMapToServicePortList(AppSignalsPortToServicePortMap) + } else { + servicePorts = getServicePortsFromCWAgentConfig(logger, config) + } + + for _, p := range servicePorts { + truncName := naming.Truncate(p.Name, maxPortLen) + if p.Name != truncName { + logger.Info("truncating container port name", + zap.String("port.name.prev", p.Name), zap.String("port.name.new", truncName)) + } + nameErrs := validation.IsValidPortName(truncName) + numErrs := validation.IsValidPortNum(int(p.Port)) + if len(nameErrs) > 0 || len(numErrs) > 0 { + logger.Info("dropping invalid container port", zap.String("port.name", truncName), zap.Int32("port.num", p.Port), + zap.Strings("port.name.errs", nameErrs), zap.Strings("num.errs", numErrs)) + continue + } + ports[truncName] = corev1.ContainerPort{ + Name: truncName, + ContainerPort: p.Port, + Protocol: p.Protocol, + } + } + + for _, p := range specPorts { + ports[p.Name] = corev1.ContainerPort{ + Name: p.Name, + ContainerPort: p.Port, + Protocol: p.Protocol, + } + } + return ports +} + +func getServicePortsFromCWAgentConfig(logger logr.Logger, config *adapters.CwaConfig) []corev1.ServicePort { + servicePortsMap := getAppSignalsServicePortsMap() + getMetricsReceiversServicePorts(logger, config, servicePortsMap) + getLogsReceiversServicePorts(logger, config, servicePortsMap) + getTracesReceiversServicePorts(logger, config, servicePortsMap) + return PortMapToServicePortList(servicePortsMap) +} + +func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) { + if config.Metrics == nil || config.Metrics.MetricsCollected == nil { + return + } + //StatD - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-custom-metrics-statsd.html + if config.Metrics.MetricsCollected.StatsD != nil { + getReceiverServicePort(logger, config.Metrics.MetricsCollected.StatsD.ServiceAddress, StatsD, corev1.ProtocolUDP, servicePortsMap) + } + //CollectD - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-custom-metrics-collectd.html + if config.Metrics.MetricsCollected.CollectD != nil { + getReceiverServicePort(logger, config.Metrics.MetricsCollected.CollectD.ServiceAddress, CollectD, corev1.ProtocolUDP, servicePortsMap) + } +} + +func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverName string, protocol corev1.Protocol, servicePortsMap map[int32]corev1.ServicePort) { + if serviceAddress != "" { + port, err := portFromEndpoint(serviceAddress) + if err != nil { + logger.Error(err, "error parsing port from endpoint for receiver", zap.String("endpoint", serviceAddress), zap.String("receiver", receiverName)) + } else { + if _, ok := servicePortsMap[port]; ok { + logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", port)) + } else { + sp := corev1.ServicePort{ + Name: CWA + receiverName, + Port: port, + Protocol: protocol, + } + servicePortsMap[port] = sp + } + } + } else { + if _, ok := servicePortsMap[receiverDefaultPortsMap[receiverName]]; ok { + logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", receiverDefaultPortsMap[receiverName])) + } else { + sp := corev1.ServicePort{ + Name: receiverName, + Port: receiverDefaultPortsMap[receiverName], + Protocol: protocol, + } + servicePortsMap[receiverDefaultPortsMap[receiverName]] = sp + } + } +} + +func getLogsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) { + //EMF - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html + if config.Logs != nil && config.Logs.LogMetricsCollected != nil && config.Logs.LogMetricsCollected.EMF != nil { + if _, ok := servicePortsMap[receiverDefaultPortsMap[EMF]]; ok { + logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", receiverDefaultPortsMap[EMF])) + } else { + sp := corev1.ServicePort{ + Name: EMF, + Port: receiverDefaultPortsMap[EMF], + } + servicePortsMap[receiverDefaultPortsMap[EMF]] = sp + } + } +} + +func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) []corev1.ServicePort { + var tracesPorts []corev1.ServicePort + + if config.Traces == nil || config.Traces.TracesCollected == nil { + return tracesPorts + } + //Traces - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html#CloudWatch-Agent-Configuration-File-Tracessection + //OTLP + if config.Traces.TracesCollected.OTLP != nil { + //GRPC + getReceiverServicePort(logger, config.Traces.TracesCollected.OTLP.GRPCEndpoint, OtlpGrpc, corev1.ProtocolTCP, servicePortsMap) + //HTTP + getReceiverServicePort(logger, config.Traces.TracesCollected.OTLP.HTTPEndpoint, OtlpHttp, corev1.ProtocolTCP, servicePortsMap) + + } + //Xray + if config.Traces.TracesCollected.XRay != nil { + getReceiverServicePort(logger, config.Traces.TracesCollected.XRay.BindAddress, XrayTraces, corev1.ProtocolUDP, servicePortsMap) + if config.Traces.TracesCollected.XRay.TCPProxy != nil { + getReceiverServicePort(logger, config.Traces.TracesCollected.XRay.TCPProxy.BindAddress, XrayProxy, corev1.ProtocolTCP, servicePortsMap) + } + } + return tracesPorts +} + +func getAppSignalsServicePortsMap() map[int32]corev1.ServicePort { + servicePortMap := make(map[int32]corev1.ServicePort) + for k, v := range AppSignalsPortToServicePortMap { + servicePortMap[k] = v + } + return servicePortMap +} + +func portFromEndpoint(endpoint string) (int32, error) { + var err error + var port int64 + + r := regexp.MustCompile(":[0-9]+") + + if r.MatchString(endpoint) { + port, err = strconv.ParseInt(strings.Replace(r.FindString(endpoint), ":", "", -1), 10, 32) + + if err != nil { + return 0, err + } + } + + if port == 0 { + return 0, errors.New("port should not be empty") + } + + return int32(port), err +} diff --git a/internal/manifests/collector/ports_test.go b/internal/manifests/collector/ports_test.go new file mode 100644 index 000000000..6de23cdf4 --- /dev/null +++ b/internal/manifests/collector/ports_test.go @@ -0,0 +1,258 @@ +package collector + +import ( + corev1 "k8s.io/api/core/v1" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStatsDGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/statsDAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort) + assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol) +} + +func TestDefaultStatsDGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/statsDDefaultAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(8125), containerPorts[StatsD].ContainerPort) + assert.Equal(t, StatsD, containerPorts[StatsD].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[StatsD].Protocol) +} + +func TestCollectDGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/collectDAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(25936), containerPorts[CWA+CollectD].ContainerPort) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+CollectD].Protocol) +} + +func TestDefaultCollectDGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/collectDDefaultAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(25826), containerPorts[CollectD].ContainerPort) + assert.Equal(t, CollectD, containerPorts[CollectD].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CollectD].Protocol) +} + +func TestEMFGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/emfAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort) + assert.Equal(t, EMF, containerPorts[EMF].Name) +} + +func TestXrayAndOTLPGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 5, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(4327), containerPorts[CWA+OtlpGrpc].ContainerPort) + assert.Equal(t, CWA+OtlpGrpc, containerPorts[CWA+OtlpGrpc].Name) + assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+OtlpGrpc].Protocol) + assert.Equal(t, int32(4328), containerPorts[CWA+OtlpHttp].ContainerPort) + assert.Equal(t, CWA+OtlpHttp, containerPorts[CWA+OtlpHttp].Name) + assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+OtlpHttp].Protocol) +} + +func TestDefaultXRayAndOTLPGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/xrayAndOTLPDefaultAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 5, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(4317), containerPorts[OtlpGrpc].ContainerPort) + assert.Equal(t, OtlpGrpc, containerPorts[OtlpGrpc].Name) + assert.Equal(t, corev1.ProtocolTCP, containerPorts[OtlpGrpc].Protocol) + assert.Equal(t, int32(4318), containerPorts[OtlpHttp].ContainerPort) + assert.Equal(t, OtlpHttp, containerPorts[OtlpHttp].Name) + assert.Equal(t, corev1.ProtocolTCP, containerPorts[OtlpHttp].Protocol) +} + +func TestXRayGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 5, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort) + assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol) + assert.Equal(t, int32(2900), containerPorts[CWA+XrayProxy].ContainerPort) + assert.Equal(t, CWA+XrayProxy, containerPorts[CWA+XrayProxy].Name) + assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+XrayProxy].Protocol) +} + +func TestXRayWithBindAddressDefaultGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") + strings.Replace(cfg, "2800", "2000", 1) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 5, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(2900), containerPorts[CWA+XrayProxy].ContainerPort) + assert.Equal(t, CWA+XrayProxy, containerPorts[CWA+XrayProxy].Name) + assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+XrayProxy].Protocol) +} + +func TestXRayWithTCPProxyBindAddressDefaultGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/xrayAgentConfig.json") + strings.Replace(cfg, "2900", "2000", 1) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 5, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort) + assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol) +} + +func TestNilMetricsGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/nilMetrics.json") + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 3, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) +} + +func TestMultipleReceiversGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/multipleReceiversAgentConfig.json") + strings.Replace(cfg, "2900", "2000", 1) + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 10, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort) + assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol) + assert.Equal(t, int32(25936), containerPorts[CWA+CollectD].ContainerPort) + assert.Equal(t, CWA+CollectD, containerPorts[CWA+CollectD].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+CollectD].Protocol) + assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort) + assert.Equal(t, EMF, containerPorts[EMF].Name) + assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort) + assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol) + assert.Equal(t, int32(2900), containerPorts[CWA+XrayProxy].ContainerPort) + assert.Equal(t, CWA+XrayProxy, containerPorts[CWA+XrayProxy].Name) + assert.Equal(t, corev1.ProtocolTCP, containerPorts[CWA+XrayProxy].Protocol) + assert.Equal(t, int32(4327), containerPorts[CWA+OtlpGrpc].ContainerPort) + assert.Equal(t, CWA+OtlpGrpc, containerPorts[CWA+OtlpGrpc].Name) + assert.Equal(t, int32(4328), containerPorts[CWA+OtlpHttp].ContainerPort) + assert.Equal(t, CWA+OtlpHttp, containerPorts[CWA+OtlpHttp].Name) +} + +func TestSpecPortsOverrideGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/statsDAgentConfig.json") + specPorts := []corev1.ServicePort{ + { + Name: AppSignalsGrpc, + Port: 12345, + }, + { + Name: AppSignalsProxy, + Port: 12346, + }, + } + containerPorts := getContainerPorts(logger, cfg, specPorts) + assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, int32(12345), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(12346), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) + assert.Equal(t, int32(8135), containerPorts[CWA+StatsD].ContainerPort) + assert.Equal(t, CWA+StatsD, containerPorts[CWA+StatsD].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+StatsD].Protocol) +} + +func TestInvalidConfigGetContainerPorts(t *testing.T) { + cfg := getJSONStringFromFile("./test-resources/nilMetrics.json") + cfg = cfg + "," + containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) + assert.Equal(t, 3, len(containerPorts)) + assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) + assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) + assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) + assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) + assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) + assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) +} + +func getJSONStringFromFile(path string) string { + buf, err := os.ReadFile(path) + if err != nil { + return "" + } + return string(buf) +} From 6dfb8759cc8e1edf4fe765de0643a27ebec049b1 Mon Sep 17 00:00:00 2001 From: Luca Bruno Date: Thu, 6 Jun 2024 19:28:53 +0100 Subject: [PATCH 3/4] Create service ports based on container ports --- internal/manifests/collector/container.go | 8 +++-- internal/manifests/collector/ports.go | 12 ++++---- internal/manifests/collector/ports_test.go | 6 +++- internal/manifests/collector/service.go | 35 +++++++++------------- 4 files changed, 31 insertions(+), 30 deletions(-) diff --git a/internal/manifests/collector/container.go b/internal/manifests/collector/container.go index 0f7a74d4a..72527be74 100644 --- a/internal/manifests/collector/container.go +++ b/internal/manifests/collector/container.go @@ -5,13 +5,15 @@ package collector import ( "fmt" + "sort" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1" "github.com/aws/amazon-cloudwatch-agent-operator/internal/config" "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters" "github.com/aws/amazon-cloudwatch-agent-operator/internal/naming" - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - "sort" ) // maxPortLen allows us to truncate a port name according to what is considered valid port syntax: diff --git a/internal/manifests/collector/ports.go b/internal/manifests/collector/ports.go index 38a97ae8b..48cb99102 100644 --- a/internal/manifests/collector/ports.go +++ b/internal/manifests/collector/ports.go @@ -5,16 +5,18 @@ package collector import ( "errors" - "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters" - "github.com/aws/amazon-cloudwatch-agent-operator/internal/naming" - "github.com/go-logr/logr" - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/util/validation" "regexp" "sort" "strconv" "strings" + "github.com/go-logr/logr" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/validation" + + "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters" + "github.com/aws/amazon-cloudwatch-agent-operator/internal/naming" + corev1 "k8s.io/api/core/v1" ) diff --git a/internal/manifests/collector/ports_test.go b/internal/manifests/collector/ports_test.go index 6de23cdf4..16a4fd0de 100644 --- a/internal/manifests/collector/ports_test.go +++ b/internal/manifests/collector/ports_test.go @@ -1,11 +1,15 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + package collector import ( - corev1 "k8s.io/api/core/v1" "os" "strings" "testing" + corev1 "k8s.io/api/core/v1" + "github.com/stretchr/testify/assert" ) diff --git a/internal/manifests/collector/service.go b/internal/manifests/collector/service.go index aca1889bb..f7dccdeb8 100644 --- a/internal/manifests/collector/service.go +++ b/internal/manifests/collector/service.go @@ -82,26 +82,7 @@ func Service(params manifests.Params) (*corev1.Service, error) { name := naming.Service(params.OtelCol.Name) labels := manifestutils.Labels(params.OtelCol.ObjectMeta, name, params.OtelCol.Spec.Image, ComponentAmazonCloudWatchAgent, []string{}) - ports := PortMapToServicePortList(AppSignalsPortToServicePortMap) - - if len(params.OtelCol.Spec.Ports) > 0 { - // we should add all the ports from the CR - // there are two cases where problems might occur: - // 1) when the port number is already being used by a receiver - // 2) same, but for the port name - // - // in the first case, we remove the port we inferred from the list - // in the second case, we rename our inferred port to something like "port-%d" - portNumbers, portNames := extractPortNumbersAndNames(params.OtelCol.Spec.Ports) - var resultingInferredPorts []corev1.ServicePort - for _, inferred := range ports { - if filtered := filterPort(params.Log, inferred, portNumbers, portNames); filtered != nil { - resultingInferredPorts = append(resultingInferredPorts, *filtered) - } - } - - ports = append(params.OtelCol.Spec.Ports, resultingInferredPorts...) - } + ports := getContainerPorts(params.Log, params.OtelCol.Spec.Config, params.OtelCol.Spec.Ports) // if we have no ports, we don't need a service if len(ports) == 0 { @@ -126,11 +107,23 @@ func Service(params manifests.Params) (*corev1.Service, error) { InternalTrafficPolicy: &trafficPolicy, Selector: manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, ComponentAmazonCloudWatchAgent), ClusterIP: "", - Ports: ports, + Ports: containerPortsToServicePortList(ports), }, }, nil } +func containerPortsToServicePortList(portMap map[string]corev1.ContainerPort) []corev1.ServicePort { + var ports []corev1.ServicePort + for _, p := range portMap { + ports = append(ports, corev1.ServicePort{ + Name: p.Name, + Port: p.ContainerPort, + Protocol: p.Protocol, + }) + } + return ports +} + func filterPort(logger logr.Logger, candidate corev1.ServicePort, portNumbers map[int32]bool, portNames map[string]bool) *corev1.ServicePort { if portNumbers[candidate.Port] { return nil From 27d6e24039e98cf688f6951b9c49d50709b36c50 Mon Sep 17 00:00:00 2001 From: Luca Bruno Date: Thu, 6 Jun 2024 19:36:40 +0100 Subject: [PATCH 4/4] Support both EMF TCP and UDP ports --- apis/v1alpha1/zz_generated.deepcopy.go | 4 +- apis/v1alpha2/zz_generated.deepcopy.go | 5 +- internal/manifests/collector/ports.go | 54 +++++++++++++--------- internal/manifests/collector/ports_test.go | 18 +++++--- 4 files changed, 49 insertions(+), 32 deletions(-) diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index aa0d4b2fc..f11b337ae 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -7,9 +7,9 @@ package v1alpha1 import ( - "k8s.io/api/autoscaling/v2" + v2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" - "k8s.io/api/networking/v1" + v1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" ) diff --git a/apis/v1alpha2/zz_generated.deepcopy.go b/apis/v1alpha2/zz_generated.deepcopy.go index e840621e9..2833a1325 100644 --- a/apis/v1alpha2/zz_generated.deepcopy.go +++ b/apis/v1alpha2/zz_generated.deepcopy.go @@ -7,9 +7,10 @@ package v1alpha2 import ( - "github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" + + "github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. diff --git a/internal/manifests/collector/ports.go b/internal/manifests/collector/ports.go index 48cb99102..eeb80250e 100644 --- a/internal/manifests/collector/ports.go +++ b/internal/manifests/collector/ports.go @@ -31,6 +31,8 @@ const ( AppSignalsHttp = "appsignals-http" AppSignalsProxy = "appsignals-xray" EMF = "emf" + EMFTcp = "emf-tcp" + EMFUdp = "emf-udp" CWA = "cwa-" ) @@ -43,25 +45,27 @@ var receiverDefaultPortsMap = map[string]int32{ EMF: 25888, } -var AppSignalsPortToServicePortMap = map[int32]corev1.ServicePort{ - 4315: { +var AppSignalsPortToServicePortMap = map[int32][]corev1.ServicePort{ + 4315: {{ Name: AppSignalsGrpc, Port: 4315, - }, - 4316: { + }}, + 4316: {{ Name: AppSignalsHttp, Port: 4316, - }, - 2000: { + }}, + 2000: {{ Name: AppSignalsProxy, Port: 2000, - }, + }}, } -func PortMapToServicePortList(portMap map[int32]corev1.ServicePort) []corev1.ServicePort { +func PortMapToServicePortList(portMap map[int32][]corev1.ServicePort) []corev1.ServicePort { ports := make([]corev1.ServicePort, 0, len(portMap)) - for _, p := range portMap { - ports = append(ports, p) + for _, plist := range portMap { + for _, p := range plist { + ports = append(ports, p) + } } sort.Slice(ports, func(i, j int) bool { return ports[i].Name < ports[j].Name @@ -118,7 +122,7 @@ func getServicePortsFromCWAgentConfig(logger logr.Logger, config *adapters.CwaCo return PortMapToServicePortList(servicePortsMap) } -func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) { +func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) { if config.Metrics == nil || config.Metrics.MetricsCollected == nil { return } @@ -132,7 +136,7 @@ func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaCon } } -func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverName string, protocol corev1.Protocol, servicePortsMap map[int32]corev1.ServicePort) { +func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverName string, protocol corev1.Protocol, servicePortsMap map[int32][]corev1.ServicePort) { if serviceAddress != "" { port, err := portFromEndpoint(serviceAddress) if err != nil { @@ -146,7 +150,7 @@ func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverN Port: port, Protocol: protocol, } - servicePortsMap[port] = sp + servicePortsMap[port] = []corev1.ServicePort{sp} } } } else { @@ -158,27 +162,33 @@ func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverN Port: receiverDefaultPortsMap[receiverName], Protocol: protocol, } - servicePortsMap[receiverDefaultPortsMap[receiverName]] = sp + servicePortsMap[receiverDefaultPortsMap[receiverName]] = []corev1.ServicePort{sp} } } } -func getLogsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) { +func getLogsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) { //EMF - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html if config.Logs != nil && config.Logs.LogMetricsCollected != nil && config.Logs.LogMetricsCollected.EMF != nil { if _, ok := servicePortsMap[receiverDefaultPortsMap[EMF]]; ok { logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", receiverDefaultPortsMap[EMF])) } else { - sp := corev1.ServicePort{ - Name: EMF, - Port: receiverDefaultPortsMap[EMF], + tcp := corev1.ServicePort{ + Name: EMFTcp, + Port: receiverDefaultPortsMap[EMF], + Protocol: corev1.ProtocolTCP, + } + udp := corev1.ServicePort{ + Name: EMFUdp, + Port: receiverDefaultPortsMap[EMF], + Protocol: corev1.ProtocolUDP, } - servicePortsMap[receiverDefaultPortsMap[EMF]] = sp + servicePortsMap[receiverDefaultPortsMap[EMF]] = []corev1.ServicePort{tcp, udp} } } } -func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) []corev1.ServicePort { +func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) []corev1.ServicePort { var tracesPorts []corev1.ServicePort if config.Traces == nil || config.Traces.TracesCollected == nil { @@ -203,8 +213,8 @@ func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConf return tracesPorts } -func getAppSignalsServicePortsMap() map[int32]corev1.ServicePort { - servicePortMap := make(map[int32]corev1.ServicePort) +func getAppSignalsServicePortsMap() map[int32][]corev1.ServicePort { + servicePortMap := make(map[int32][]corev1.ServicePort) for k, v := range AppSignalsPortToServicePortMap { servicePortMap[k] = v } diff --git a/internal/manifests/collector/ports_test.go b/internal/manifests/collector/ports_test.go index 16a4fd0de..830d5ee58 100644 --- a/internal/manifests/collector/ports_test.go +++ b/internal/manifests/collector/ports_test.go @@ -75,15 +75,18 @@ func TestDefaultCollectDGetContainerPorts(t *testing.T) { func TestEMFGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/emfAgentConfig.json") containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 4, len(containerPorts)) + assert.Equal(t, 5, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name) assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort) assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name) - assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort) - assert.Equal(t, EMF, containerPorts[EMF].Name) + assert.Equal(t, int32(25888), containerPorts[EMFTcp].ContainerPort) + assert.Equal(t, EMFTcp, containerPorts[EMFTcp].Name) + assert.Equal(t, int32(25888), containerPorts[EMFUdp].ContainerPort) + assert.Equal(t, EMFUdp, containerPorts[EMFUdp].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[EMFUdp].Protocol) } func TestXrayAndOTLPGetContainerPorts(t *testing.T) { @@ -188,7 +191,7 @@ func TestMultipleReceiversGetContainerPorts(t *testing.T) { cfg := getJSONStringFromFile("./test-resources/multipleReceiversAgentConfig.json") strings.Replace(cfg, "2900", "2000", 1) containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{}) - assert.Equal(t, 10, len(containerPorts)) + assert.Equal(t, 11, len(containerPorts)) assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort) assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name) assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort) @@ -201,8 +204,11 @@ func TestMultipleReceiversGetContainerPorts(t *testing.T) { assert.Equal(t, int32(25936), containerPorts[CWA+CollectD].ContainerPort) assert.Equal(t, CWA+CollectD, containerPorts[CWA+CollectD].Name) assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+CollectD].Protocol) - assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort) - assert.Equal(t, EMF, containerPorts[EMF].Name) + assert.Equal(t, int32(25888), containerPorts[EMFTcp].ContainerPort) + assert.Equal(t, EMFTcp, containerPorts[EMFTcp].Name) + assert.Equal(t, int32(25888), containerPorts[EMFUdp].ContainerPort) + assert.Equal(t, EMFUdp, containerPorts[EMFUdp].Name) + assert.Equal(t, corev1.ProtocolUDP, containerPorts[EMFUdp].Protocol) assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort) assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name) assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol)