Skip to content

Cleanup and sync statistics over IPC calls #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions aikido_zen/background_process/cloud_connection_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from aikido_zen.storage.users import Users
from aikido_zen.storage.hostnames import Hostnames
from ..realtime.start_polling_for_changes import start_polling_for_changes
from ..statistics import Statistics
from ...storage.statistics import Statistics

# Import functions :
from .on_detected_attack import on_detected_attack
Expand Down Expand Up @@ -45,9 +45,7 @@
)
self.users = Users(1000)
self.packages = {}
self.statistics = Statistics(
max_perf_samples_in_mem=5000, max_compressed_stats_in_mem=100
)
self.statistics = Statistics()
self.middleware_installed = False

if isinstance(serverless, str) and len(serverless) == 0:
Expand All @@ -71,12 +69,8 @@
This is run 1m after startup, and checks if we should send out
a preliminary heartbeat with some stats.
"""
data_is_available = not (
self.statistics.is_empty() and len(self.routes.routes) <= 0
)
should_report_initial_stats = (
data_is_available and not self.conf.received_any_stats
)
data_present = not self.statistics.empty() or len(self.routes.routes) > 0
should_report_initial_stats = data_present and not self.conf.received_any_stats

Check warning on line 73 in aikido_zen/background_process/cloud_connection_manager/__init__.py

View check run for this annotation

Codecov / codecov/patch

aikido_zen/background_process/cloud_connection_manager/__init__.py#L72-L73

Added lines #L72 - L73 were not covered by tests
if should_report_initial_stats:
self.send_heartbeat()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from aikido_zen.storage.users import Users
from aikido_zen.storage.hostnames import Hostnames
from aikido_zen.ratelimiting.rate_limiter import RateLimiter
from aikido_zen.background_process.statistics import Statistics
from aikido_zen.storage.statistics import Statistics
from . import CloudConnectionManager


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
if not connection_manager.token:
return
logger.debug("Aikido CloudConnectionManager : Sending out heartbeat")
stats = connection_manager.statistics.get_stats()
stats = connection_manager.statistics.get_record()

Check warning on line 14 in aikido_zen/background_process/cloud_connection_manager/send_heartbeat.py

View check run for this annotation

Codecov / codecov/patch

aikido_zen/background_process/cloud_connection_manager/send_heartbeat.py#L14

Added line #L14 was not covered by tests
users = connection_manager.users.as_array()
routes = list(connection_manager.routes)
outgoing_domains = connection_manager.hostnames.as_array()

connection_manager.statistics.reset()
connection_manager.statistics.clear()

Check warning on line 19 in aikido_zen/background_process/cloud_connection_manager/send_heartbeat.py

View check run for this annotation

Codecov / codecov/patch

aikido_zen/background_process/cloud_connection_manager/send_heartbeat.py#L19

Added line #L19 was not covered by tests
connection_manager.users.clear()
connection_manager.routes.clear()
connection_manager.hostnames.clear()
Expand Down
2 changes: 0 additions & 2 deletions aikido_zen/background_process/commands/attack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,3 @@ def process_attack(connection_manager, data, queue):
Expected data object : [injection_results, context, blocked_or_not, stacktrace]
"""
queue.put(data)
if connection_manager and connection_manager.statistics:
connection_manager.statistics.on_detected_attack(blocked=data[2])
12 changes: 0 additions & 12 deletions aikido_zen/background_process/commands/attack_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,6 @@ def test_process_attack_adds_data_to_queue():
assert queue.get() == data


def test_process_attack_statistics_called_when_enabled():
queue = Queue()
connection_manager = MockCloudConnectionManager()
data = ("injection_results", "context", True, "stacktrace") # Example data
process_attack(connection_manager, data, queue)

# Check if on_detected_attack was called
connection_manager.statistics.on_detected_attack.assert_called_once_with(
blocked=True
)


def test_process_attack_statistics_not_called_when_disabled():
queue = Queue()
connection_manager = MockCloudConnectionManager()
Expand Down
12 changes: 7 additions & 5 deletions aikido_zen/background_process/commands/sync_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ def process_sync_data(connection_manager, data, conn, queue=None):
"""
Synchronizes data between the thread-local cache (with a TTL of usually 1 minute) and the
background thread. Which data gets synced?
Thread -> BG Process : Hits, request statistics, api specs, hostnames
BG Process -> Thread : Routes, endpoints, bypasssed ip's, blocked users
Thread -> BG Process : Routes, Hostnames, Users, Stats & middleware installed
BG Process -> Thread : Routes and config
"""

# Sync routes
routes = connection_manager.routes
for route in data.get("current_routes", {}).values():
route_metadata = {"method": route["method"], "route": route["path"]}
Expand All @@ -25,9 +27,6 @@ def process_sync_data(connection_manager, data, conn, queue=None):
# Update API Spec :
update_route_info(route["apispec"], existing_route)

# Save request data :
connection_manager.statistics.requests["total"] += data.get("reqs", 0)

# Save middleware installed :
if data.get("middleware_installed", False):
connection_manager.middleware_installed = True
Expand All @@ -44,6 +43,9 @@ def process_sync_data(connection_manager, data, conn, queue=None):
for user_entry in data.get("users", list()):
connection_manager.users.add_user_from_entry(user_entry)

# Sync stats
connection_manager.statistics.import_from_record(data.get("stats", {}))

if connection_manager.conf.last_updated_at > 0:
# Only report data if the config has been fetched.
return {
Expand Down
73 changes: 65 additions & 8 deletions aikido_zen/background_process/commands/sync_data_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from aikido_zen.background_process.routes import Routes
from aikido_zen.helpers.iplist import IPList
from ...storage.hostnames import Hostnames
from ...storage.statistics import Statistics


@pytest.fixture
Expand All @@ -20,7 +21,7 @@ def setup_connection_manager():
connection_manager.conf.bypassed_ips.add("192.168.1.1")
connection_manager.conf.blocked_uids = ["user1", "user2"]
connection_manager.conf.last_updated_at = 200
connection_manager.statistics.requests = {"total": 0} # Initialize total requests
connection_manager.statistics = Statistics()
connection_manager.middleware_installed = False
return connection_manager

Expand All @@ -47,7 +48,18 @@ def test_process_sync_data_initialization(setup_connection_manager):
"apispec": {"info": "API spec for resource"},
},
},
"reqs": 10, # Total requests to be added
"stats": {
"startedAt": 1,
"endedAt": 1,
"requests": {
"total": 10,
"aborted": 0,
"attacksDetected": {
"total": 5,
"blocked": 0,
},
},
},
"middleware_installed": False,
"hostnames": test_hostnames.as_array(),
}
Expand All @@ -70,7 +82,11 @@ def test_process_sync_data_initialization(setup_connection_manager):
)

# Check that the total requests were updated
assert connection_manager.statistics.requests["total"] == 10
assert connection_manager.statistics.get_record()["requests"] == {
"aborted": 0,
"attacksDetected": {"blocked": 0, "total": 5},
"total": 10,
}

# Check that the return value is correct
assert result["routes"] == dict(connection_manager.routes.routes)
Expand Down Expand Up @@ -101,7 +117,18 @@ def test_process_sync_data_with_last_updated_at_below_zero(setup_connection_mana
"apispec": {"info": "API spec for resource"},
},
},
"reqs": 10, # Total requests to be added
"stats": {
"startedAt": 1,
"endedAt": 1,
"requests": {
"total": 10,
"aborted": 0,
"attacksDetected": {
"total": 5,
"blocked": 0,
},
},
},
"middleware_installed": True,
}

Expand All @@ -123,7 +150,11 @@ def test_process_sync_data_with_last_updated_at_below_zero(setup_connection_mana
)

# Check that the total requests were updated
assert connection_manager.statistics.requests["total"] == 10
assert connection_manager.statistics.get_record()["requests"] == {
"aborted": 0,
"attacksDetected": {"blocked": 0, "total": 5},
"total": 10,
}
assert connection_manager.middleware_installed == True
assert len(connection_manager.hostnames.as_array()) == 0
# Check that the return value is correct
Expand All @@ -150,7 +181,18 @@ def test_process_sync_data_existing_route_and_hostnames(setup_connection_manager
"apispec": {"info": "API spec for resource"},
}
},
"reqs": 5, # Total requests to be added
"stats": {
"startedAt": 1,
"endedAt": 1,
"requests": {
"total": 5,
"aborted": 0,
"attacksDetected": {
"total": 5,
"blocked": 0,
},
},
},
"hostnames": hostnames_sync.as_array(),
}

Expand All @@ -167,7 +209,18 @@ def test_process_sync_data_existing_route_and_hostnames(setup_connection_manager
"apispec": {"info": "Updated API spec for resource"},
}
},
"reqs": 15, # Additional requests to be added
"stats": {
"startedAt": 1,
"endedAt": 1,
"requests": {
"total": 15,
"aborted": 0,
"attacksDetected": {
"total": 5,
"blocked": 0,
},
},
},
}

result = process_sync_data(connection_manager, data_update, None)
Expand All @@ -181,7 +234,11 @@ def test_process_sync_data_existing_route_and_hostnames(setup_connection_manager
)

# Check that the total requests were updated
assert connection_manager.statistics.requests["total"] == 20 # 5 + 15
assert connection_manager.statistics.get_record()["requests"] == {
"aborted": 0,
"attacksDetected": {"blocked": 0, "total": 10},
"total": 20,
}
assert connection_manager.middleware_installed == False
assert connection_manager.hostnames.as_array() == [
{"hits": 215, "hostname": "example.com", "port": 443},
Expand Down
80 changes: 0 additions & 80 deletions aikido_zen/background_process/statistics/__init__.py

This file was deleted.

43 changes: 0 additions & 43 deletions aikido_zen/background_process/statistics/compress_perf_samples.py

This file was deleted.

Loading