Skip to content

Sync users using process cache, instead of IPC #382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .update_firewall_lists import update_firewall_lists
from ..api.http_api import ReportingApiHTTP
from ..service_config import ServiceConfig
from ..users import Users
from aikido_zen.storage.users import Users
from aikido_zen.storage.hostnames import Hostnames
from ..realtime.start_polling_for_changes import start_polling_for_changes
from ..statistics import Statistics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from aikido_zen.helpers.token import Token
from aikido_zen.background_process.api.http_api import ReportingApiHTTP
from aikido_zen.background_process.service_config import ServiceConfig
from aikido_zen.background_process.users import Users
from aikido_zen.storage.users import Users
from aikido_zen.storage.hostnames import Hostnames
from aikido_zen.ratelimiting.rate_limiter import RateLimiter
from aikido_zen.background_process.statistics import Statistics
Expand Down
2 changes: 0 additions & 2 deletions aikido_zen/background_process/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from aikido_zen.helpers.logging import logger
from .attack import process_attack
from .read_property import process_read_property
from .user import process_user
from .should_ratelimit import process_should_ratelimit
from .ping import process_ping
from .sync_data import process_sync_data
Expand All @@ -12,7 +11,6 @@
# This maps to a tuple : (function, returns_data?)
# Commands that don't return data :
"ATTACK": (process_attack, False),
"USER": (process_user, False),
# Commands that return data :
"SYNC_DATA": (process_sync_data, True),
"READ_PROPERTY": (process_read_property, True),
Expand Down
4 changes: 4 additions & 0 deletions aikido_zen/background_process/commands/sync_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
hostnames_entry["hits"],
)

# Sync users
for user_entry in data.get("users", list()):
connection_manager.users.add_user_from_entry(user_entry)

Check warning on line 45 in aikido_zen/background_process/commands/sync_data.py

View check run for this annotation

Codecov / codecov/patch

aikido_zen/background_process/commands/sync_data.py#L45

Added line #L45 was not covered by tests

if connection_manager.conf.last_updated_at > 0:
# Only report data if the config has been fetched.
return {
Expand Down
7 changes: 0 additions & 7 deletions aikido_zen/background_process/commands/user.py

This file was deleted.

30 changes: 0 additions & 30 deletions aikido_zen/background_process/commands/user_test.py

This file was deleted.

59 changes: 0 additions & 59 deletions aikido_zen/background_process/users.py

This file was deleted.

71 changes: 0 additions & 71 deletions aikido_zen/background_process/users_test.py

This file was deleted.

13 changes: 10 additions & 3 deletions aikido_zen/context/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
"""

from aikido_zen.helpers.logging import logger
from aikido_zen.background_process import get_comms
from . import get_current_context
import aikido_zen.thread.thread_cache as thread_cache
import aikido_zen.helpers.get_current_unixtime_ms as t


def set_user(user):
Expand All @@ -30,8 +31,14 @@ def set_user(user):
context.user = validated_user

# Send validated_user object to background process :
if get_comms():
get_comms().send_data_to_bg_process("USER", validated_user)
cache = thread_cache.get_cache()
if cache:
cache.users.add_user(
user_id=validated_user["id"],
user_name=validated_user["name"],
user_ip=validated_user["lastIpAddress"],
current_time=t.get_unixtime_ms(),
)


def validate_user(user):
Expand Down
53 changes: 53 additions & 0 deletions aikido_zen/context/users_test.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import time

import pytest

from . import current_context, Context
from .users import validate_user, set_user
from .. import should_block_request
from ..thread.thread_cache import get_cache


@pytest.fixture(autouse=True)
def run_around_tests():
get_cache().reset()
yield
# Make sure to reset context and cache after every test so it does not
# interfere with other tests
current_context.set(None)
get_cache().reset()


def set_context_and_lifecycle():
Expand Down Expand Up @@ -115,6 +120,13 @@ def test_set_valid_user():
"lastIpAddress": "198.51.100.23",
}

assert len(get_cache().users.as_array()) == 1
user_1 = get_cache().users.as_array()[0]
assert user_1["id"] == "456"
assert user_1["lastIpAddress"] == "198.51.100.23"
assert user_1["name"] == "Bob"
assert user_1["firstSeenAt"] == user_1["lastSeenAt"]


def test_re_set_valid_user():
context1 = set_context_and_lifecycle()
Expand All @@ -128,6 +140,12 @@ def test_re_set_valid_user():
"name": "Bob",
"lastIpAddress": "198.51.100.23",
}
assert len(get_cache().users.as_array()) == 1
user_1 = get_cache().users.as_array()[0]
assert user_1["id"] == "456"
assert user_1["lastIpAddress"] == "198.51.100.23"
assert user_1["name"] == "Bob"
assert user_1["firstSeenAt"] == user_1["lastSeenAt"]

user = {"id": "1000", "name": "Alice"}
set_user(user)
Expand All @@ -138,6 +156,14 @@ def test_re_set_valid_user():
"lastIpAddress": "198.51.100.23",
}

assert len(get_cache().users.as_array()) == 2
assert get_cache().users.as_array()[0] == user_1
user_2 = get_cache().users.as_array()[1]
assert user_2["id"] == "1000"
assert user_2["lastIpAddress"] == "198.51.100.23"
assert user_2["name"] == "Alice"
assert user_2["firstSeenAt"] == user_1["lastSeenAt"]


def test_after_middleware(caplog):
context1 = set_context_and_lifecycle()
Expand All @@ -154,3 +180,30 @@ def test_after_middleware(caplog):
"name": "Bob",
"lastIpAddress": "198.51.100.23",
}


def test_set_valid_user_twice():
context1 = set_context_and_lifecycle()
assert context1.user is None

user = {"id": 456, "name": "Bob"}
set_user(user)

assert context1.user == {
"id": "456",
"name": "Bob",
"lastIpAddress": "198.51.100.23",
}
assert len(get_cache().users.as_array()) == 1
first_seen_at = int(get_cache().users.as_array()[0]["firstSeenAt"])

time.sleep(1)
set_user(user) # 2nd time

assert len(get_cache().users.as_array()) == 1
user_1 = get_cache().users.as_array()[0]
assert user_1["id"] == "456"
assert user_1["lastIpAddress"] == "198.51.100.23"
assert user_1["name"] == "Bob"
assert user_1["firstSeenAt"] != user_1["lastSeenAt"]
assert user_1["firstSeenAt"] == first_seen_at
42 changes: 42 additions & 0 deletions aikido_zen/storage/users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
class Users:
def __init__(self, max_entries=1000):
self.max_entries = max_entries
self.users = {}

def add_user(self, user_id, user_name, user_ip, current_time):
self.ensure_max_entries()

first_seen_at = current_time
if self.users.get(user_id):
# Use the first_seen_at timestamp of the existing user
first_seen_at = self.users.get(user_id).get("firstSeenAt")

self.users[user_id] = {
"id": user_id,
"name": user_name,
"lastIpAddress": user_ip,
"firstSeenAt": first_seen_at,
"lastSeenAt": current_time,
}

def add_user_from_entry(self, user_entry):
self.ensure_max_entries()

existing_user = self.users.get(user_entry["id"])
if existing_user:
# Use the firstSeenAt timestamp of the existing user
user_entry["firstSeenAt"] = existing_user["firstSeenAt"]

self.users[user_entry["id"]] = user_entry

def ensure_max_entries(self):
if len(self.users) >= self.max_entries:
# Remove the first added user (FIFO)
first_added_key = next(iter(self.users))
del self.users[first_added_key]

def as_array(self):
return [dict(user) for user in self.users.values()]

def clear(self):
self.users.clear()
Loading
Loading