Skip to content

Commit 81fd844

Browse files
Integration tests keep connections in batches (#2101)
1 parent f4a2d3c commit 81fd844

File tree

4 files changed

+166
-73
lines changed

4 files changed

+166
-73
lines changed

integration-tests/pkg/mock_sensor/expect_conn.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,48 @@ func (s *MockSensor) ExpectSameElementsConnections(t *testing.T, containerID str
113113
}
114114
}
115115

116+
func (s *MockSensor) ExpectSameElementsConnectionsScrapes(t *testing.T, containerID string, timeout time.Duration, expected []types.NetworkInfoBatch) bool {
117+
equal := func(c1, c2 types.NetworkInfoBatch) bool {
118+
if len(c1) != len(c2) {
119+
return false
120+
}
121+
122+
types.SortConnections(c1)
123+
types.SortConnections(c2)
124+
125+
for i := range c2 {
126+
if !c1[i].Equal(c2[i]) {
127+
return false
128+
}
129+
}
130+
131+
return true
132+
}
133+
134+
connections := s.GetConnectionsInBatches(containerID)
135+
if collectorAssert.ElementsMatchFunc(expected, connections, equal) {
136+
return true
137+
}
138+
139+
timer := time.After(timeout)
140+
141+
for {
142+
select {
143+
case <-timer:
144+
connections := s.GetConnectionsInBatches(containerID)
145+
return collectorAssert.AssertElementsMatchFunc(t, expected, connections, equal)
146+
case conn := <-s.LiveConnections():
147+
if conn.GetContainerId() != containerID {
148+
continue
149+
}
150+
connections := s.GetConnectionsInBatches(containerID)
151+
if collectorAssert.ElementsMatchFunc(expected, connections, equal) {
152+
return true
153+
}
154+
}
155+
}
156+
}
157+
116158
// ExpectEndpoints waits up to the timeout for the gRPC server to receive
117159
// the list of expected Endpoints. It will first check to see if the endpoints
118160
// have been received already, and then monitor the live feed of endpoints

integration-tests/pkg/mock_sensor/server.go

Lines changed: 71 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"time"
1212

1313
sensorAPI "github.com/stackrox/rox/generated/internalapi/sensor"
14-
utils "github.com/stackrox/rox/pkg/net"
1514

1615
"github.com/stackrox/rox/generated/storage"
1716
"google.golang.org/grpc"
@@ -47,7 +46,7 @@ type MockSensor struct {
4746
processLineages map[string]LineageMap
4847
processMutex sync.Mutex
4948

50-
connections map[string][]types.NetworkInfo
49+
connections map[string][]types.NetworkInfoBatch
5150
endpoints map[string]EndpointMap
5251
networkMutex sync.Mutex
5352

@@ -65,7 +64,7 @@ func NewMockSensor(test string) *MockSensor {
6564
testName: test,
6665
processes: make(map[string]ProcessMap),
6766
processLineages: make(map[string]LineageMap),
68-
connections: make(map[string][]types.NetworkInfo),
67+
connections: make(map[string][]types.NetworkInfoBatch),
6968
endpoints: make(map[string]EndpointMap),
7069
}
7170
}
@@ -150,26 +149,48 @@ func (m *MockSensor) LiveConnections() <-chan *sensorAPI.NetworkConnection {
150149

151150
// Connections returns a list of all connections that have been received for
152151
// a given container ID
153-
func (m *MockSensor) Connections(containerID string) []types.NetworkInfo {
152+
func (m *MockSensor) GetConnectionsInBatches(containerID string) []types.NetworkInfoBatch {
154153
m.networkMutex.Lock()
155154
defer m.networkMutex.Unlock()
156155

157156
if connections, ok := m.connections[containerID]; ok {
158-
conns := make([]types.NetworkInfo, len(connections))
157+
conns := make([]types.NetworkInfoBatch, len(connections))
159158
copy(conns, connections)
160-
types.SortConnections(conns)
159+
for _, conn := range conns {
160+
types.SortConnections(conn)
161+
}
162+
161163
return conns
162164
}
165+
return make([]types.NetworkInfoBatch, 0)
166+
}
167+
168+
// Connections returns a list of all connections that have been received for
169+
// a given container ID
170+
func (m *MockSensor) Connections(containerID string) []types.NetworkInfo {
171+
m.networkMutex.Lock()
172+
defer m.networkMutex.Unlock()
173+
174+
allConns := make([]types.NetworkInfo, 0)
175+
if connections, ok := m.connections[containerID]; ok {
176+
conns := make([]types.NetworkInfoBatch, len(connections))
177+
copy(conns, connections)
178+
for _, conn := range conns {
179+
allConns = append(allConns, conn...)
180+
}
181+
182+
types.SortConnections(allConns)
183+
184+
return allConns
185+
}
163186
return make([]types.NetworkInfo, 0)
164187
}
165188

166189
// HasConnection returns whether a given connection has been seen for a given
167190
// container ID
168191
func (m *MockSensor) HasConnection(containerID string, conn types.NetworkInfo) bool {
169-
m.networkMutex.Lock()
170-
defer m.networkMutex.Unlock()
171-
172-
if conns, ok := m.connections[containerID]; ok {
192+
conns := m.Connections(containerID)
193+
if len(conns) > 0 {
173194
return slices.ContainsFunc(conns, func(c types.NetworkInfo) bool {
174195
return c.Equal(conn)
175196
})
@@ -271,7 +292,7 @@ func (m *MockSensor) Stop() {
271292

272293
m.processes = make(map[string]ProcessMap)
273294
m.processLineages = make(map[string]LineageMap)
274-
m.connections = make(map[string][]types.NetworkInfo)
295+
m.connections = make(map[string][]types.NetworkInfoBatch)
275296
m.endpoints = make(map[string]EndpointMap)
276297

277298
m.processChannel.Stop()
@@ -327,6 +348,36 @@ func (m *MockSensor) PushSignals(stream sensorAPI.SignalService_PushSignalsServe
327348
}
328349
}
329350

351+
func (m *MockSensor) convertConnection(connection *sensorAPI.NetworkConnection) types.NetworkInfo {
352+
conn := types.NetworkInfo{
353+
LocalAddress: types.TranslateAddress(connection.LocalAddress),
354+
RemoteAddress: types.TranslateAddress(connection.RemoteAddress),
355+
Role: connection.GetRole().String(),
356+
SocketFamily: connection.GetSocketFamily().String(),
357+
CloseTimestamp: connection.GetCloseTimestamp().String(),
358+
}
359+
360+
m.logger.Printf("NetworkInfo: %s, %s\n", connection.GetContainerId(), conn)
361+
362+
return conn
363+
}
364+
365+
func (m *MockSensor) convertToContainerConnsMap(connections []*sensorAPI.NetworkConnection) map[string][]types.NetworkInfo {
366+
containerConnsMap := make(map[string][]types.NetworkInfo)
367+
for _, connection := range connections {
368+
conn := m.convertConnection(connection)
369+
containerID := connection.GetContainerId()
370+
371+
if c, ok := containerConnsMap[containerID]; ok {
372+
containerConnsMap[containerID] = append(c, conn)
373+
} else {
374+
containerConnsMap[containerID] = []types.NetworkInfo{conn}
375+
}
376+
}
377+
378+
return containerConnsMap
379+
}
380+
330381
// PushNetworkConnectionInfo conforms to the Sensor API. It is here that networking
331382
// events (connections and endpoints) are handled and stored/sent to the relevant channel
332383
func (m *MockSensor) PushNetworkConnectionInfo(stream sensorAPI.NetworkConnectionInfoService_PushNetworkConnectionInfoServer) error {
@@ -345,8 +396,9 @@ func (m *MockSensor) PushNetworkConnectionInfo(stream sensorAPI.NetworkConnectio
345396
m.endpointChannel.Write(endpoint)
346397
}
347398

399+
containerConnsMap := m.convertToContainerConnsMap(connections)
400+
m.pushConnections(containerConnsMap)
348401
for _, connection := range connections {
349-
m.pushConnection(connection.GetContainerId(), connection)
350402
m.connectionChannel.Write(connection)
351403
}
352404
}
@@ -410,32 +462,16 @@ func (m *MockSensor) pushLineage(containerID string, process *storage.ProcessSig
410462
}
411463
}
412464

413-
// pushConnection converts a connection event into the test's own structure
414-
// and stores it
415-
func (m *MockSensor) pushConnection(containerID string, connection *sensorAPI.NetworkConnection) {
465+
func (m *MockSensor) pushConnections(containerConnsMap map[string][]types.NetworkInfo) {
416466
m.networkMutex.Lock()
417467
defer m.networkMutex.Unlock()
418468

419-
m.logger.Printf("NetworkInfo: %s %s|%s|%s|%s|%s\n",
420-
connection.GetContainerId(),
421-
m.translateAddress(connection.GetLocalAddress()),
422-
m.translateAddress(connection.GetRemoteAddress()),
423-
connection.GetRole().String(),
424-
connection.GetSocketFamily().String(),
425-
connection.GetCloseTimestamp().String())
426-
427-
conn := types.NetworkInfo{
428-
LocalAddress: m.translateAddress(connection.LocalAddress),
429-
RemoteAddress: m.translateAddress(connection.RemoteAddress),
430-
Role: connection.GetRole().String(),
431-
SocketFamily: connection.GetSocketFamily().String(),
432-
CloseTimestamp: connection.GetCloseTimestamp().String(),
433-
}
434-
435-
if c, ok := m.connections[containerID]; ok {
436-
m.connections[containerID] = append(c, conn)
437-
} else {
438-
m.connections[containerID] = []types.NetworkInfo{conn}
469+
for containerID, connections := range containerConnsMap {
470+
if c, ok := m.connections[containerID]; ok {
471+
m.connections[containerID] = append(c, connections)
472+
} else {
473+
m.connections[containerID] = []types.NetworkInfoBatch{connections}
474+
}
439475
}
440476
}
441477

@@ -485,36 +521,6 @@ func (m *MockSensor) pushEndpoint(containerID string, endpoint *sensorAPI.Networ
485521
}
486522
}
487523

488-
// translateAddress is a helper function for converting binary representations
489-
// of network addresses (in the signals) to usable forms for testing
490-
func (m *MockSensor) translateAddress(addr *sensorAPI.NetworkAddress) string {
491-
peerId := utils.NetworkPeerID{Port: uint16(addr.GetPort())}
492-
addressData := addr.GetAddressData()
493-
if len(addressData) > 0 {
494-
peerId.Address = utils.IPFromBytes(addressData)
495-
return peerId.String()
496-
}
497-
498-
// If there is no address data, this is either the source address or
499-
// IpNetwork should be set and represent a CIDR block or external IP address.
500-
ipNetworkData := addr.GetIpNetwork()
501-
if len(ipNetworkData) == 0 {
502-
return peerId.String()
503-
}
504-
505-
ipNetwork := utils.IPNetworkFromCIDRBytes(ipNetworkData)
506-
prefixLen := ipNetwork.PrefixLen()
507-
// If this is IPv4 and the prefix length is 32 or this is IPv6 and the prefix length
508-
// is 128 this is a regular IP address and not a CIDR block
509-
if (ipNetwork.Family() == utils.IPv4 && prefixLen == byte(32)) ||
510-
(ipNetwork.Family() == utils.IPv6 && prefixLen == byte(128)) {
511-
peerId.Address = ipNetwork.IP()
512-
} else {
513-
peerId.IPNetwork = ipNetwork
514-
}
515-
return peerId.String()
516-
}
517-
518524
func (m *MockSensor) SetTestName(testName string) {
519525
m.testName = testName
520526
}

integration-tests/pkg/types/network.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package types
22

33
import (
4+
"fmt"
45
"sort"
6+
7+
sensorAPI "github.com/stackrox/rox/generated/internalapi/sensor"
8+
utils "github.com/stackrox/rox/pkg/net"
59
)
610

711
const (
@@ -16,6 +20,47 @@ type NetworkInfo struct {
1620
CloseTimestamp string
1721
}
1822

23+
type NetworkInfoBatch []NetworkInfo
24+
25+
// TranslateAddress is a helper function for converting binary representations
26+
// of network addresses (in the signals) to usable forms for testing
27+
func TranslateAddress(addr *sensorAPI.NetworkAddress) string {
28+
peerId := utils.NetworkPeerID{Port: uint16(addr.GetPort())}
29+
addressData := addr.GetAddressData()
30+
if len(addressData) > 0 {
31+
peerId.Address = utils.IPFromBytes(addressData)
32+
return peerId.String()
33+
}
34+
35+
// If there is no address data, this is either the source address or
36+
// IpNetwork should be set and represent a CIDR block or external IP address.
37+
ipNetworkData := addr.GetIpNetwork()
38+
if len(ipNetworkData) == 0 {
39+
return peerId.String()
40+
}
41+
42+
ipNetwork := utils.IPNetworkFromCIDRBytes(ipNetworkData)
43+
prefixLen := ipNetwork.PrefixLen()
44+
// If this is IPv4 and the prefix length is 32 or this is IPv6 and the prefix length
45+
// is 128 this is a regular IP address and not a CIDR block
46+
if (ipNetwork.Family() == utils.IPv4 && prefixLen == byte(32)) ||
47+
(ipNetwork.Family() == utils.IPv6 && prefixLen == byte(128)) {
48+
peerId.Address = ipNetwork.IP()
49+
} else {
50+
peerId.IPNetwork = ipNetwork
51+
}
52+
return peerId.String()
53+
}
54+
55+
func (n *NetworkInfo) String() string {
56+
return fmt.Sprintf("%s|%s|%s|%s|%s",
57+
n.LocalAddress,
58+
n.RemoteAddress,
59+
n.Role,
60+
n.SocketFamily,
61+
n.CloseTimestamp)
62+
}
63+
1964
func (n *NetworkInfo) IsActive() bool {
2065
// no close timestamp means the connection is open, and active
2166
return n.CloseTimestamp == NilTimestamp

integration-tests/suites/runtime_config_file.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,32 +116,32 @@ func (s *RuntimeConfigFileTestSuite) TestRuntimeConfigFileEnable() {
116116
// Default configuration is external IPs disabled.
117117
// We expect normalized connections.
118118
assert.AssertNoRuntimeConfig(s.T(), collectorIP)
119-
expectedConnections := []types.NetworkInfo{activeNormalizedConnection}
120-
connectionSuccess := s.Sensor().ExpectSameElementsConnections(s.T(), s.ClientContainer, 10*time.Second, expectedConnections...)
119+
expectedConnections := []types.NetworkInfoBatch{[]types.NetworkInfo{activeNormalizedConnection}}
120+
connectionSuccess := s.Sensor().ExpectSameElementsConnectionsScrapes(s.T(), s.ClientContainer, 10*time.Second, expectedConnections)
121121
s.Require().True(connectionSuccess)
122122

123123
// External IPs enabled.
124124
// Normalized connection must be reported as inactive
125125
// Unnormalized connection will now be reported.
126126
s.setExternalIpsEnabled(runtimeConfigFile, "ENABLED")
127127
assert.AssertExternalIps(s.T(), "ENABLED", collectorIP)
128-
expectedConnections = append(expectedConnections, activeUnnormalizedConnection, inactiveNormalizedConnection)
129-
connectionSuccess = s.Sensor().ExpectSameElementsConnections(s.T(), s.ClientContainer, 10*time.Second, expectedConnections...)
128+
expectedConnections = append(expectedConnections, []types.NetworkInfo{activeUnnormalizedConnection, inactiveNormalizedConnection})
129+
connectionSuccess = s.Sensor().ExpectSameElementsConnectionsScrapes(s.T(), s.ClientContainer, 10*time.Second, expectedConnections)
130130
s.Require().True(connectionSuccess)
131131

132132
// The runtime config file is deleted. This disables external IPs. The normalized connection should be active
133133
// and the unnormalized connection shoul be inactive.
134134
s.deleteFile(runtimeConfigFile)
135135
assert.AssertNoRuntimeConfig(s.T(), collectorIP)
136-
expectedConnections = append(expectedConnections, activeNormalizedConnection, inactiveUnnormalizedConnection)
137-
connectionSuccess = s.Sensor().ExpectSameElementsConnections(s.T(), s.ClientContainer, 10*time.Second, expectedConnections...)
136+
expectedConnections = append(expectedConnections, []types.NetworkInfo{activeNormalizedConnection, inactiveUnnormalizedConnection})
137+
connectionSuccess = s.Sensor().ExpectSameElementsConnectionsScrapes(s.T(), s.ClientContainer, 10*time.Second, expectedConnections)
138138
s.Require().True(connectionSuccess)
139139

140140
// Back to having external IPs enabled.
141141
s.setExternalIpsEnabled(runtimeConfigFile, "ENABLED")
142142
assert.AssertExternalIps(s.T(), "ENABLED", collectorIP)
143-
expectedConnections = append(expectedConnections, activeUnnormalizedConnection, inactiveNormalizedConnection)
144-
connectionSuccess = s.Sensor().ExpectSameElementsConnections(s.T(), s.ClientContainer, 10*time.Second, expectedConnections...)
143+
expectedConnections = append(expectedConnections, []types.NetworkInfo{activeUnnormalizedConnection, inactiveNormalizedConnection})
144+
connectionSuccess = s.Sensor().ExpectSameElementsConnectionsScrapes(s.T(), s.ClientContainer, 10*time.Second, expectedConnections)
145145
s.Require().True(connectionSuccess)
146146
}
147147

0 commit comments

Comments
 (0)