|
17 | 17 | V1ConfigMapVolumeSource, V1Secret, V1LocalObjectReference
|
18 | 18 | from kubernetes import client, config
|
19 | 19 | from kubernetes.client.rest import ApiException
|
| 20 | +from assemblyline.odm.messages.changes import Operation |
20 | 21 |
|
21 | 22 | from assemblyline.odm.models.service import DockerConfig
|
| 23 | +from assemblyline.remote.datatypes.events import EventSender |
22 | 24 | from assemblyline.remote.datatypes.hash import Hash
|
23 | 25 | from assemblyline_core.scaler.controllers.kubernetes_ctl import create_docker_auth_config
|
24 | 26 | from assemblyline_core.server_base import CoreBase
|
@@ -370,6 +372,7 @@ def __init__(self, redis_persist=None, redis=None, logger=None, datastore=None):
|
370 | 372 |
|
371 | 373 | self.container_update: Hash[dict[str, Any]] = Hash('container-update', self.redis_persist)
|
372 | 374 | self.latest_service_tags: Hash[dict[str, str]] = Hash('service-tags', self.redis_persist)
|
| 375 | + self.service_events = EventSender('changes.services', host=self.redis) |
373 | 376 |
|
374 | 377 | # Prepare a single threaded scheduler
|
375 | 378 | self.scheduler = sched.scheduler()
|
@@ -431,6 +434,10 @@ def container_updates(self):
|
431 | 434 | operations = [(self.datastore.service_delta.UPDATE_SET, 'version', latest_tag)]
|
432 | 435 | if self.datastore.service_delta.update(service_name, operations):
|
433 | 436 | # Update completed, cleanup
|
| 437 | + self.service_events.send(service_name, { |
| 438 | + 'operation': Operation.Modified, |
| 439 | + 'name': service_name |
| 440 | + }) |
434 | 441 | self.log.info(f"Service {service_name} update successful!")
|
435 | 442 | else:
|
436 | 443 | self.log.error(f"Service {service_name} has failed to update because it cannot set "
|
|
0 commit comments