Skip to content

Expose EMF TCP and UDP ports, and others, in the service #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions apis/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 32 additions & 22 deletions internal/manifests/collector/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
AppSignalsHttp = "appsignals-http"
AppSignalsProxy = "appsignals-xray"
EMF = "emf"
EMFTcp = "emf-tcp"
EMFUdp = "emf-udp"
CWA = "cwa-"
)

Expand All @@ -43,25 +45,27 @@ var receiverDefaultPortsMap = map[string]int32{
EMF: 25888,
}

var AppSignalsPortToServicePortMap = map[int32]corev1.ServicePort{
4315: {
var AppSignalsPortToServicePortMap = map[int32][]corev1.ServicePort{
4315: {{
Name: AppSignalsGrpc,
Port: 4315,
},
4316: {
}},
4316: {{
Name: AppSignalsHttp,
Port: 4316,
},
2000: {
}},
2000: {{
Name: AppSignalsProxy,
Port: 2000,
},
}},
}

func PortMapToServicePortList(portMap map[int32]corev1.ServicePort) []corev1.ServicePort {
func PortMapToServicePortList(portMap map[int32][]corev1.ServicePort) []corev1.ServicePort {
ports := make([]corev1.ServicePort, 0, len(portMap))
for _, p := range portMap {
ports = append(ports, p)
for _, plist := range portMap {
for _, p := range plist {
ports = append(ports, p)
}
}
sort.Slice(ports, func(i, j int) bool {
return ports[i].Name < ports[j].Name
Expand Down Expand Up @@ -118,7 +122,7 @@ func getServicePortsFromCWAgentConfig(logger logr.Logger, config *adapters.CwaCo
return PortMapToServicePortList(servicePortsMap)
}

func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) {
func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
if config.Metrics == nil || config.Metrics.MetricsCollected == nil {
return
}
Expand All @@ -132,7 +136,7 @@ func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaCon
}
}

func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverName string, protocol corev1.Protocol, servicePortsMap map[int32]corev1.ServicePort) {
func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverName string, protocol corev1.Protocol, servicePortsMap map[int32][]corev1.ServicePort) {
if serviceAddress != "" {
port, err := portFromEndpoint(serviceAddress)
if err != nil {
Expand All @@ -146,7 +150,7 @@ func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverN
Port: port,
Protocol: protocol,
}
servicePortsMap[port] = sp
servicePortsMap[port] = []corev1.ServicePort{sp}
}
}
} else {
Expand All @@ -158,27 +162,33 @@ func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverN
Port: receiverDefaultPortsMap[receiverName],
Protocol: protocol,
}
servicePortsMap[receiverDefaultPortsMap[receiverName]] = sp
servicePortsMap[receiverDefaultPortsMap[receiverName]] = []corev1.ServicePort{sp}
}
}
}

func getLogsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) {
func getLogsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
//EMF - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html
if config.Logs != nil && config.Logs.LogMetricsCollected != nil && config.Logs.LogMetricsCollected.EMF != nil {
if _, ok := servicePortsMap[receiverDefaultPortsMap[EMF]]; ok {
logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", receiverDefaultPortsMap[EMF]))
} else {
sp := corev1.ServicePort{
Name: EMF,
Port: receiverDefaultPortsMap[EMF],
tcp := corev1.ServicePort{
Name: EMFTcp,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the port Name need to correspond to anything else? Docs say "When considering the endpoints for a Service, this must match the 'name' field in the EndpointPort." Does that apply for us?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spoke with @lisguo offline, we do not create Endpoints in the operator, so there's nothing to worry about here.

Port: receiverDefaultPortsMap[EMF],
Protocol: corev1.ProtocolTCP,
}
udp := corev1.ServicePort{
Name: EMFUdp,
Port: receiverDefaultPortsMap[EMF],
Protocol: corev1.ProtocolUDP,
}
servicePortsMap[receiverDefaultPortsMap[EMF]] = sp
servicePortsMap[receiverDefaultPortsMap[EMF]] = []corev1.ServicePort{tcp, udp}
}
}
}

func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32]corev1.ServicePort) []corev1.ServicePort {
func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) []corev1.ServicePort {
var tracesPorts []corev1.ServicePort

if config.Traces == nil || config.Traces.TracesCollected == nil {
Expand All @@ -203,8 +213,8 @@ func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConf
return tracesPorts
}

func getAppSignalsServicePortsMap() map[int32]corev1.ServicePort {
servicePortMap := make(map[int32]corev1.ServicePort)
func getAppSignalsServicePortsMap() map[int32][]corev1.ServicePort {
servicePortMap := make(map[int32][]corev1.ServicePort)
for k, v := range AppSignalsPortToServicePortMap {
servicePortMap[k] = v
}
Expand Down
18 changes: 12 additions & 6 deletions internal/manifests/collector/ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,18 @@ func TestDefaultCollectDGetContainerPorts(t *testing.T) {
func TestEMFGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/emfAgentConfig.json")
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 4, len(containerPorts))
assert.Equal(t, 5, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
assert.Equal(t, AppSignalsHttp, containerPorts[AppSignalsHttp].Name)
assert.Equal(t, int32(2000), containerPorts[AppSignalsProxy].ContainerPort)
assert.Equal(t, AppSignalsProxy, containerPorts[AppSignalsProxy].Name)
assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort)
assert.Equal(t, EMF, containerPorts[EMF].Name)
assert.Equal(t, int32(25888), containerPorts[EMFTcp].ContainerPort)
assert.Equal(t, EMFTcp, containerPorts[EMFTcp].Name)
assert.Equal(t, int32(25888), containerPorts[EMFUdp].ContainerPort)
assert.Equal(t, EMFUdp, containerPorts[EMFUdp].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[EMFUdp].Protocol)
}

func TestXrayAndOTLPGetContainerPorts(t *testing.T) {
Expand Down Expand Up @@ -188,7 +191,7 @@ func TestMultipleReceiversGetContainerPorts(t *testing.T) {
cfg := getJSONStringFromFile("./test-resources/multipleReceiversAgentConfig.json")
strings.Replace(cfg, "2900", "2000", 1)
containerPorts := getContainerPorts(logger, cfg, []corev1.ServicePort{})
assert.Equal(t, 10, len(containerPorts))
assert.Equal(t, 11, len(containerPorts))
assert.Equal(t, int32(4315), containerPorts[AppSignalsGrpc].ContainerPort)
assert.Equal(t, AppSignalsGrpc, containerPorts[AppSignalsGrpc].Name)
assert.Equal(t, int32(4316), containerPorts[AppSignalsHttp].ContainerPort)
Expand All @@ -201,8 +204,11 @@ func TestMultipleReceiversGetContainerPorts(t *testing.T) {
assert.Equal(t, int32(25936), containerPorts[CWA+CollectD].ContainerPort)
assert.Equal(t, CWA+CollectD, containerPorts[CWA+CollectD].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+CollectD].Protocol)
assert.Equal(t, int32(25888), containerPorts[EMF].ContainerPort)
assert.Equal(t, EMF, containerPorts[EMF].Name)
assert.Equal(t, int32(25888), containerPorts[EMFTcp].ContainerPort)
assert.Equal(t, EMFTcp, containerPorts[EMFTcp].Name)
assert.Equal(t, int32(25888), containerPorts[EMFUdp].ContainerPort)
assert.Equal(t, EMFUdp, containerPorts[EMFUdp].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[EMFUdp].Protocol)
assert.Equal(t, int32(2800), containerPorts[CWA+XrayTraces].ContainerPort)
assert.Equal(t, CWA+XrayTraces, containerPorts[CWA+XrayTraces].Name)
assert.Equal(t, corev1.ProtocolUDP, containerPorts[CWA+XrayTraces].Protocol)
Expand Down
Loading