Skip to content

Commit 6a7ca28

Browse files
Merge pull request #296 from CybercentreCanada/persistent-service-update
Persistent service update
2 parents 0d01b05 + 02c00f7 commit 6a7ca28

File tree

2 files changed

+32
-12
lines changed

2 files changed

+32
-12
lines changed

assemblyline_core/scaler/scaler_server.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class ServiceProfile:
109109
This includes how the service should be run, and conditions related to the scaling of the service.
110110
"""
111111

112-
def __init__(self, name: str, container_config: DockerConfig, config_hash:int=0, min_instances:int=0, max_instances:int=None,
112+
def __init__(self, name: str, container_config: DockerConfig, config_blob:str='', min_instances:int=0, max_instances:int=None,
113113
growth: float = 600, shrink: Optional[float] = None, backlog:int=500, queue=None, shutdown_seconds:int=30):
114114
"""
115115
:param name: Name of the service to manage
@@ -127,7 +127,7 @@ def __init__(self, name: str, container_config: DockerConfig, config_hash:int=0,
127127
self.high_duty_cycle = 0.7
128128
self.low_duty_cycle = 0.5
129129
self.shutdown_seconds = shutdown_seconds
130-
self.config_hash = config_hash
130+
self.config_blob = config_blob
131131

132132
# How many instances we want, and can have
133133
self.min_instances: int = max(0, int(min_instances))
@@ -211,7 +211,7 @@ def __deepcopy__(self, memodict=None):
211211
prof = ServiceProfile(
212212
name=self.name,
213213
container_config=DockerConfig(self.container_config.as_primitives()),
214-
config_hash=self.config_hash,
214+
config_blob=self.config_blob,
215215
min_instances=self.min_instances,
216216
max_instances=self.max_instances,
217217
growth=self.growth_threshold,
@@ -378,7 +378,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
378378
# noinspection PyBroadException
379379
try:
380380
if service.enabled and (stage == ServiceStage.Off or name not in self.profiles):
381-
# Enable this service's dependencies
381+
# Enable this service's dependencies before trying to launch the service containers
382382
self.controller.prepare_network(service.name, service.docker_config.allow_internet_access)
383383
for _n, dependency in service.dependencies.items():
384384
dependency.container = prepare_container(dependency.container)
@@ -401,14 +401,22 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
401401

402402
# Check that all enabled services are enabled
403403
if service.enabled and stage == ServiceStage.Running:
404-
# Compute a hash of service properties not include in the docker config, that
404+
# Compute a blob of service properties not include in the docker config, that
405405
# should still result in a service being restarted when changed
406-
config_hash = hash(str(sorted(service.config.items())))
407-
config_hash = hash((config_hash, str(service.submission_params)))
406+
config_blob = str(sorted(service.config.items()))
407+
config_blob += str(service.submission_params)
408408

409409
# Build the docker config for the service, we are going to either create it or
410410
# update it so we need to know what the current configuration is either way
411411
docker_config = prepare_container(service.docker_config)
412+
config_blob += str(docker_config)
413+
414+
# Build the docker config for the dependencies.
415+
dependency_config = {}
416+
for _n, dependency in service.dependencies.items():
417+
dependency.container = prepare_container(dependency.container)
418+
dependency_config[_n] = dependency
419+
config_blob += str(sorted(dependency_config.items()))
412420

413421
# Add the service to the list of services being scaled
414422
with self.profiles_lock:
@@ -419,7 +427,7 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
419427
min_instances=default_settings.min_instances,
420428
growth=default_settings.growth,
421429
shrink=default_settings.shrink,
422-
config_hash=config_hash,
430+
config_blob=config_blob,
423431
backlog=default_settings.backlog,
424432
max_instances=service.licence_count,
425433
container_config=docker_config,
@@ -436,13 +444,25 @@ def prepare_container(docker_config: DockerConfig) -> DockerConfig:
436444
else:
437445
profile._max_instances = service.licence_count
438446

439-
if profile.container_config != docker_config or profile.config_hash != config_hash:
447+
if profile.config_blob != config_blob:
440448
self.log.info(f"Updating deployment information for {name}")
449+
# Update the dependencies. Should do nothing if container spec is the same.
450+
# let kubernetes decide if anything needs to change though.
451+
for _n, dependency in dependency_config.items():
452+
self.controller.start_stateful_container(
453+
service_name=service.name,
454+
container_name=_n,
455+
spec=dependency,
456+
labels={'dependency_for': service.name}
457+
)
458+
459+
# Update the service itself
441460
profile.container_config = docker_config
442-
profile.config_hash = config_hash
461+
profile.config_blob = config_blob
443462
self.controller.restart(profile)
444463
self.log.info(f"Deployment information for {name} replaced")
445464

465+
446466
except Exception:
447467
self.log.exception(f"Error applying service settings from: {service.name}")
448468
self.handle_service_error(service.name)

assemblyline_core/server_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import sys
1313
import io
1414
import os
15-
from typing import cast, Callable, TYPE_CHECKING
15+
from typing import Callable, TYPE_CHECKING
1616

1717
from assemblyline.remote.datatypes import get_client
1818
from assemblyline.remote.datatypes.hash import Hash
@@ -208,7 +208,7 @@ def __init__(self, component_name: str, logger: logging.Logger = None,
208208
)
209209

210210
# Create a cached service data object, and access to the service status
211-
self.service_info = cast(dict[str, Service], forge.CachedObject(self._get_services))
211+
self.service_info: dict[str, Service] = forge.CachedObject(self._get_services)
212212
self._service_stage_hash = get_service_stage_hash(self.redis)
213213

214214
def _get_services(self):

0 commit comments

Comments
 (0)