diff --git a/.dockerignore b/.dockerignore
index dc23bcb7..96db15ff 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -19,3 +19,5 @@ htmlcov/
 .cache/
 Dockerfile
 docker-compose.yml
+*.charm
+*.rock
diff --git a/.github/workflows/playwright.yml b/.github/workflows/playwright.yml
index ef27afa4..696637fa 100644
--- a/.github/workflows/playwright.yml
+++ b/.github/workflows/playwright.yml
@@ -60,6 +60,8 @@ jobs:
           docker run \
             -p 8104:8104 \
             -e SECRET_KEY="$SECRET_KEY" \
+            -e REDIS_HOST=localhost \
+            -e REDIS_PORT=6379 \
             -e GH_TOKEN="$GH_TOKEN" \
             -e REPO_ORG="$REPO_ORG" \
             -e DATABASE_URL="$DATABASE_URL" \
diff --git a/bin/act b/bin/act
deleted file mode 100755
index 62aaa450..00000000
Binary files a/bin/act and /dev/null differ
diff --git a/entrypoint b/entrypoint
old mode 100644
new mode 100755
index db49bea3..df89abbd
--- a/entrypoint
+++ b/entrypoint
@@ -17,6 +17,10 @@ activate() {
 
 activate
 
+# Provision database
+# ===
+flask --app webapp.app db upgrade
+
 RUN_COMMAND="gunicorn webapp.app:app --name $(hostname) --workers=2 --bind $1"
 
 if [ -z ${FLASK_DEBUG+x} ]; then
diff --git a/webapp/__init__.py b/webapp/__init__.py
index 3d175ba3..4d903e15 100644
--- a/webapp/__init__.py
+++ b/webapp/__init__.py
@@ -32,15 +32,15 @@ def create_app():
 
     app.context_processor(base_context)
 
+    # Initialize cache
+    init_cache(app)
+
     # Initialize database
     init_db(app)
 
     # Initialize SSO
     init_sso(app)
 
-    # Initialize cache
-    init_cache(app)
-
     # Initialize JIRA
     init_jira(app)
diff --git a/webapp/app.py b/webapp/app.py
index 8ab6d939..d6016603 100644
--- a/webapp/app.py
+++ b/webapp/app.py
@@ -17,7 +17,7 @@
 celery_app = init_celery(app)
 
 # Initialize scheduled tasks
-init_scheduled_tasks()
+init_scheduled_tasks(app)
 
 # Server-side routes
diff --git a/webapp/cache.py b/webapp/cache.py
index 99e101f0..b686d89e 100644
--- a/webapp/cache.py
+++ b/webapp/cache.py
@@ -47,15 +47,12 @@ def connect(self, app):
         Return an instance of the redis cache. If not available, throw a
         ConnectionError.
         """
-        self.logger.info("Connecting to Redis cache.")
-        if url := os.environ.get("REDIS_DB_CONNECT_STRING"):
+        if app.config.get("REDIS_HOST"):
+            self.logger.info("Connecting to Redis cache.")
+            url = app.config.get("REDIS_DB_CONNECT_STRING")
             r = redis.from_url(url)
-        else:
-            host = app.config["REDIS_HOST"]
-            port = app.config["REDIS_PORT"]
-            r = redis.Redis(host=host, port=port, db=0)
-        r.ping()
-        return r
+            r.ping()
+            return r
 
     def __get_prefixed_key__(self, key: str):
         return f"{self.CACHE_PREFIX}_{key}"
@@ -152,12 +149,13 @@ def load_from_file(self, key: str):
         """
         Load the JSON data from a file and return the python object.
         """
+        file_path = Path(f"{self.cache_path}/{key}")
         # Check if the file exists
-        if not Path(self.cache_path + "/" + key).exists():
+        if not file_path.exists():
             return None
-        with open(self.cache_path + "/" + key) as f:
+        with file_path.open("r") as f:
             data = f.read()
-            return json.loads(data)
+        return json.loads(data)
 
     def __get_prefixed_key__(self, key: str):
         return f"{self.CACHE_PREFIX}_{key}"
@@ -180,11 +178,11 @@ def onerror(*args, **kwargs):
 
 
 def init_cache(app: Flask) -> Cache:
-    try:
+    if app.config.get("REDIS_HOST"):
        cache = RedisCache(app)
-    except Exception as e:
+    else:
         cache = FileCache(app)
-        msg = f"Error: {e} Redis cache is not available."
+        msg = "Redis cache is not available. Using FileCache instead."
         app.logger.info(msg)
 
     app.config["CACHE"] = cache
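Note: cache selection is now config-driven rather than failure-driven: `init_cache` builds a `RedisCache` only when `REDIS_HOST` is set and otherwise falls back to `FileCache`, so the old `try/except` probe disappears. A minimal sketch of the fallback path, assuming `FileCache` needs no configuration beyond Flask defaults, and relying only on the `get`/`set` interface that the lock helpers in `webapp/context.py` also use:

```python
from flask import Flask

from webapp.cache import init_cache

app = Flask(__name__)

# REDIS_HOST is not in app.config, so init_cache returns a FileCache.
cache = init_cache(app)

# Both backends expose the same get/set surface, so callers are unaffected.
cache.set("example_key", 1)
assert cache.get("example_key") == 1
```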
diff --git a/webapp/celery.py b/webapp/celery.py
index b337626b..4b3f4a6d 100644
--- a/webapp/celery.py
+++ b/webapp/celery.py
@@ -3,7 +3,6 @@
 
 from celery import Celery, Task
 from celery.app import Proxy
-from celery.schedules import crontab
 from celery.utils.log import get_task_logger
 from flask import Flask
 
@@ -20,65 +19,57 @@ def register_celery_task(
     fn: Callable | None,
     celery_app: Proxy,
 ) -> CeleryTask:
-    """
-    Register a celery task.
-    """
+    """Register a celery task."""
     fn = celery_app.task()(fn)
     return fn
 
 
 def run_celery_task(
-    fn: Callable | None,
+    fn: Callable,
     delay: int | None,
     celery_app: Proxy,
     args: tuple,
     kwargs: dict,
-) -> CeleryTask:
-    """
-    Run a registered celery task.
-    """
-    fn = register_celery_task(fn, celery_app)
-
-    def _setup_periodic_tasks(sender: Celery, **snkwargs: dict) -> None:
-        sender.add_periodic_task(
-            crontab(minute=str(delay)),
-            fn.s(*args, **kwargs),
-            name=f"{fn.__name__} every {delay}",
-            **snkwargs,
-        )
-
+) -> CeleryTask | LocalTask:
+    """Run a registered celery task."""
     if delay:
-        celery_app.on_after_configure.connect(_setup_periodic_tasks)
+        # Celery doesn't allow us to add tasks to the beat schedule
+        # at runtime, so we'll use the non-celery asynchronous
+        # task decorator to handle periodic tasks
+        func = LocalTask(
+            fn=fn,
+            delay=delay,
+        )
+    else:
+        func = register_celery_task(fn, celery_app)
 
-    return fn
+    return func
 
 
-def init_celery(app: Flask) -> Celery:
+def init_celery(app: Flask) -> Celery | None:
     class FlaskTask(Task):
         def __call__(self, *args: object, **kwargs: object) -> object:
             with app.app_context():
                 return self.run(*args, **kwargs)
 
-    celery_app = Celery(app.name, task_cls=FlaskTask)
-
     # Use redis if available
     if os.getenv("REDIS_HOST"):
+        celery_app = Celery(app.name, task_cls=FlaskTask)
         broker_url = app.config.get("REDIS_DB_CONNECT_STRING")
-    # Otherwise, use default broker
-    else:
-        app.logger.error(
-            "No Redis host found, celery tasks will not be available.",
+        app.config.from_mapping(
+            CELERY={
+                "broker_url": broker_url,
+                "result_backend": broker_url,
+                "task_ignore_result": True,
+            },
         )
-        return None
+        celery_app.config_from_object(app.config["CELERY"])
+        celery_app.set_default()
+        app.extensions["celery"] = celery_app
+        return celery_app
 
-    app.config.from_mapping(
-        CELERY={
-            "broker_url": broker_url,
-            "result_backend": broker_url,
-            "task_ignore_result": True,
-        },
-    )
-    celery_app.config_from_object(app.config["CELERY"])
-    celery_app.set_default()
-    app.extensions["celery"] = celery_app
-    return celery_app
+    app.logger.error(
+        "No Redis host found, celery tasks will not be available.",
+    )
+    return None
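The comment in `run_celery_task` is the crux of this change: beat schedule entries cannot be added after Celery is configured, so periodic work is diverted to `LocalTask` while one-off work still goes through Celery. A hypothetical caller (the task body is illustrative; `celery_app` is assumed to be the instance created in `webapp/app.py`):

```python
from webapp.app import celery_app
from webapp.celery import run_celery_task

def refresh_tree() -> None:  # hypothetical task body
    ...

# delay set: wrapped in a LocalTask and scheduled outside Celery.
periodic = run_celery_task(refresh_tree, 5, celery_app, (), {})

# delay unset: registered as an ordinary Celery task on the Redis broker.
one_off = run_celery_task(refresh_tree, None, celery_app, (), {})
```

Note that `args`/`kwargs` are no longer consumed here; with the `tasklib` changes below, arguments are supplied when the returned task is started.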
diff --git a/webapp/context.py b/webapp/context.py
index 34fead6d..8d3607b5 100644
--- a/webapp/context.py
+++ b/webapp/context.py
@@ -12,8 +12,7 @@
 
 
 def versioned_static(filename):
-    """
-    Template function for generating URLs to static assets:
+    """Template function for generating URLs to static assets:
     Given the path for a static file, output a url path
     with a hex hash as a query string for versioning
     """
@@ -40,11 +39,9 @@ def base_context():
 
 
 def clear_trailing_slash():
-    """
-    Remove trailing slashes from all routes
+    """Remove trailing slashes from all routes
 
     We like our URLs without slashes
     """
-
     parsed_url = urlparse(unquote(request.url))
     path = parsed_url.path
 
@@ -58,9 +55,43 @@ def clear_trailing_slash():
 
 
 @contextmanager
-def database_lock() -> Generator:
+def site_cloning_lock(site_name: str) -> Generator:
+    """A context manager for acquiring a lock to control access
+    to site cloning operations.
+
+    This function creates a distributed lock using the available Cache to
+    ensure only one process can clone a specific site at a time. If the
+    lock is already acquired by another process, this will poll every 2 seconds
+    until the lock is released.
+
+    Args:
+        site_name: The name of the site to acquire a lock for
+
+    Yields:
+        The current lock status from the cache
+
+    Example:
+        with site_cloning_lock("ubuntu.com"):
+            # Site cloning operations here
+            ...
+
     """
-    A context manager for acquiring a file-based lock to control access
+    cache = init_cache(current_app)
+    lock_name = f"{site_name}_lock"
+    locked = cache.get(lock_name)
+    while locked:
+        sleep(2)
+        locked = cache.get(lock_name)
+    try:
+        cache.set(lock_name, 1)
+        yield cache.get(lock_name)
+    finally:
+        cache.set(lock_name, 0)
+
+
+@contextmanager
+def database_lock() -> Generator:
+    """A context manager for acquiring a lock to control access
     to a shared db.
 
     This function creates a distributed lock using the available Cache to
@@ -74,6 +105,7 @@ def database_lock() -> Generator:
 
     Example:
         with database_lock():
            . . .
+
     """
     cache = init_cache(current_app)
     locked = cache.get(DB_LOCK_NAME)
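A sketch of how a worker might use the new lock (`clone_site` is hypothetical; the context manager is the one added above):

```python
from webapp.context import site_cloning_lock

def clone_site(site_name: str) -> None:  # hypothetical caller
    with site_cloning_lock(site_name):
        # Only one process per site reaches this block; competitors poll
        # the cache every 2 seconds until the holder resets the key to 0.
        ...
```

One design note: the `get` followed by `set` is not atomic, so two processes that poll at the same moment could both acquire the lock; an atomic primitive such as Redis `SET NX` would close that window.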
diff --git a/webapp/migrate.py b/webapp/migrate.py
new file mode 100644
index 00000000..f4f9f22b
--- /dev/null
+++ b/webapp/migrate.py
@@ -0,0 +1,15 @@
+from flask_migrate import upgrade
+
+from webapp.app import app
+from webapp.context import database_lock
+
+
+def migrate() -> None:
+    # Use lock to prevent multiple concurrent migrations on startup
+    # Automatically upgrade to head revision
+    with app.app_context(), database_lock():
+        upgrade()
+
+
+if __name__ == "__main__":
+    migrate()
diff --git a/webapp/models.py b/webapp/models.py
index f1dd745b..ac505c50 100644
--- a/webapp/models.py
+++ b/webapp/models.py
@@ -3,7 +3,7 @@
 import yaml
 from flask import Flask
-from flask_migrate import Migrate, upgrade
+from flask_migrate import Migrate
 from flask_sqlalchemy import SQLAlchemy
 from sqlalchemy import (
     Column,
@@ -24,8 +24,6 @@
 )
 from sqlalchemy.orm.session import Session
 
-from webapp.context import database_lock
-
 with open("data/data.yaml") as file:
     data = yaml.load(file, Loader=yaml.FullLoader)
@@ -38,9 +36,8 @@ class Base(DeclarativeBase):
 
 
 def get_or_create(session: Session, model: Base, commit=True, **kwargs):
-    """
-    Return an instance of the specified model if it exists, otherwise create a
-    new instance.
+    """Return an instance of the specified model if it exists, otherwise create
+    a new instance.
 
     :param session: The database session to use for querying and committing
         changes.
@@ -202,13 +199,9 @@ class WebpageProduct(db.Model, DateTimeMixin):
 
 def init_db(app: Flask):
     engine = create_engine(app.config["SQLALCHEMY_DATABASE_URI"])
     session_factory = sessionmaker(bind=engine)
-    Migrate(app, db)
     db.init_app(app)
-    # Use lock to prevent multiple concurrent migrations on startup
-    # Automatically upgrade to head revision
-    with app.app_context(), database_lock():
-        upgrade()
+    Migrate(app, db)
 
     @app.before_request
     def before_request():
diff --git a/webapp/scheduled_tasks.py b/webapp/scheduled_tasks.py
index 5a3adebc..1acad84e 100644
--- a/webapp/scheduled_tasks.py
+++ b/webapp/scheduled_tasks.py
@@ -1,50 +1,54 @@
+import logging
 import os
+from datetime import datetime
 from pathlib import Path
 
 import yaml
+from flask import Flask
 
 from webapp import create_app
+from webapp.context import site_cloning_lock
 from webapp.models import JiraTask, Project, Webpage, db
 from webapp.settings import BASE_DIR
 from webapp.site_repository import SiteRepository
 from webapp.tasks import register_task
 
+logger = logging.getLogger(__name__)
+
 # Default delay between runs for updating the tree
-TASK_DELAY = int(os.getenv("TASK_DELAY", "5"))
+TASK_DELAY = int(os.getenv("TASK_DELAY", "30"))
 # Default delay between runs for updating Jira task statuses
 UPDATE_STATUS_DELAY = int(os.getenv("UPDATE_STATUS_DELAY", "5"))
 
 
 @register_task(delay=TASK_DELAY)
 def load_site_trees() -> None:
-    """
-    Load the site trees from the queue.
-    """
+    """Load the site trees from the queue."""
     app = create_app()
     yaml_path = Path(BASE_DIR) / "data/sites.yaml"
     with app.app_context(), yaml_path.open("r") as f:
         data = yaml.safe_load(f)
         for site in data["sites"]:
-            retries = 3
-            while retries > 0:
+            logger.info(f"Loading site tree for {site}")
+            # Prevent overlapping cloning operations
+            with site_cloning_lock(site):
                 try:
                     # Enqueue the sites for setup
                     site_repository = SiteRepository(site, app, db=db)
                     # build the tree from GH source without using cache
                     site_repository.get_tree(no_cache=True)
-                    retries = 0
-                except Exception:
-                    retries -= 1
+                except Exception as e:
+                    logger.error(e, exc_info=True)
 
 
 @register_task(delay=UPDATE_STATUS_DELAY)
 def update_jira_statuses() -> None:
-    """
-    Get the status of a Jira task and update it if it changed.
+    """Get the status of a Jira task and update it if it changed.
 
     Args:
         app (Flask): The Flask application instance.
+
     """
     app = create_app()
     with app.app_context():
@@ -66,7 +70,7 @@ def update_jira_statuses() -> None:
             webpage = Webpage.query.filter_by(
                 id=task.webpage_id,
             ).first()
-            if webpage.project_id not in project_ids:
+            if webpage and webpage.project_id not in project_ids:
                 project_ids.append(webpage.project_id)
 
         db.session.commit()
@@ -74,11 +78,27 @@ def update_jira_statuses() -> None:
         # have changed status
         for project_id in project_ids:
             project = Project.query.filter_by(id=project_id).first()
-            site_repository = SiteRepository(project.name, app)
-            # clean the cache for a new Jira task to appear in the tree
-            site_repository.invalidate_cache()
+            if project:
+                site_repository = SiteRepository(project.name, app)
+                # clean the cache for a new Jira task to appear in the tree
+                site_repository.invalidate_cache()
+
+
+@register_task(delay=1)
+def scheduled_tasks_alert() -> None:
+    """Run every minute to test the task scheduler."""
+    app = create_app()
+    with app.app_context():
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        message = f"[ALERT][{timestamp}] Scheduled tasks running successfully."
+        logger.debug(message)
 
 
-def init_scheduled_tasks() -> None:
-    load_site_trees()
-    update_jira_statuses()
+def init_scheduled_tasks(app: Flask) -> None:
+    @app.before_request
+    def start_tasks():
+        # only run this task once
+        app.before_request_funcs[None].remove(start_tasks)
+        update_jira_statuses()
+        load_site_trees()
+        scheduled_tasks_alert()
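`init_scheduled_tasks` now defers startup to the first request and unregisters its own hook. The pattern in isolation (a standalone sketch, not project code):

```python
from flask import Flask

app = Flask(__name__)

@app.before_request
def start_tasks() -> None:
    # Removing the hook on its first run turns a per-request callback
    # into a run-once-at-first-request callback.
    app.before_request_funcs[None].remove(start_tasks)
    ...  # kick off background tasks here
```

Keep in mind this runs once per worker process, so with `--workers=2` in the entrypoint each Gunicorn worker starts its own set of tasks.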
diff --git a/webapp/site_repository.py b/webapp/site_repository.py
index f3e3bbf6..7f0a1621 100644
--- a/webapp/site_repository.py
+++ b/webapp/site_repository.py
@@ -171,6 +171,7 @@ def get_tree_from_disk(self):
         self.setup_site_repository()
 
         templates_folder = Path(self.repo_path + "/templates")
+        templates_folder.mkdir(parents=True, exist_ok=True)
 
         # Check if a background task is active. if so, wait until it completes
         # with a timeout of 30s
diff --git a/webapp/tasklib.py b/webapp/tasklib.py
index 5a28998f..47561af5 100644
--- a/webapp/tasklib.py
+++ b/webapp/tasklib.py
@@ -1,6 +1,5 @@
 import atexit
 import contextlib
-import functools
 import logging
 import os
 import signal
@@ -21,54 +20,40 @@ class Task:
     def __init__(
         self,
         fn: Callable,
-        args: tuple,
-        kwargs: dict,
         delay: int | None = None,
     ) -> None:
         def _start_process(
             *fn_args: tuple,
             **fn_kwargs: dict,
         ) -> None:
-            process = Process(
-                target=fn,
-                args=fn_args,
-                kwargs=fn_kwargs,
-            )
-            process.start()
-            task_queue.put(process.pid)
             if delay:
                 p = Process(
                     target=scheduled_process,
-                    args=(fn, delay, *args),
-                    kwargs={**kwargs},
+                    args=(fn, delay, *fn_args),
+                    kwargs={**fn_kwargs},
                 )
             else:
                 p = Process(
                     target=local_process,
-                    args=(fn, *args),
-                    kwargs=kwargs,
+                    args=(fn, *fn_args),
+                    kwargs=fn_kwargs,
                 )
             task_queue.put(p.pid)
             p.start()
 
-        self.delay = functools.partial(_start_process, *args, **kwargs)
-        self.s = functools.partial(_start_process, *args, **kwargs)
-        self.run = functools.partial(_start_process, *args, **kwargs)
+        self.delay = _start_process
+        self.s = _start_process
+        self.run = _start_process
 
 
-def scheduled_process(
-    func: Callable,
-    schedule: int = 1,
-    *args: tuple,
-    **kwargs: dict,
-) -> None:
+def local_process(func: Callable, *args: tuple, **kwargs: dict) -> None:
     """
     Wrapper for tasks that are added to the task queue.
 
     Args:
         func (function): The function to be added to the queue.
     """
-    while True:
+    try:
         if len(args) > 0 and len(kwargs) > 0:
             func(*args, **kwargs)
         elif len(args) > 0 and len(kwargs) == 0:
@@ -77,61 +62,56 @@
             func(*args)
         elif len(args) == 0 and len(kwargs) > 0:
             func(**kwargs)
         else:
             func()
-        time.sleep(schedule)
+    except Exception:
+        logger.info(
+            f"Error in local process: {func.__name__} args:{args}, "
+            f"kwargs:{kwargs}",
+        )
+        raise
 
 
-def local_process(func: Callable, *args: tuple, **kwargs: dict) -> None:
+def scheduled_process(
+    func: Callable,
+    schedule: int = 1,
+    *args: tuple,
+    **kwargs: dict,
+) -> None:
     """
     Wrapper for tasks that are added to the task queue.
 
     Args:
         func (function): The function to be added to the queue.
     """
-    try:
-        if len(args) > 0 and len(kwargs) > 0:
-            func(*args, **kwargs)
-        elif len(args) > 0 and len(kwargs) == 0:
-            func(*args)
-        elif len(args) == 0 and len(kwargs) > 0:
-            func(**kwargs)
-        else:
-            func()
-
-    except Exception:
-        print(
-            f"Error in local process: {func.__name__} args:{args}, "
-            f"kwargs:{kwargs}",
-        )
-        raise
+    while True:
+        local_process(func, *args, **kwargs)
+        time.sleep(schedule * 60)  # Schedule is given in minutes
 
 
 def close_background_tasks() -> None:
     """
     Close all background tasks.
     """
-    print("Closing background tasks...")
-    print(traceback.format_exc())
+    logger.info("Closing background tasks...")
+    final_trace = traceback.format_exc()
+    if final_trace:
+        logger.error(final_trace)
     with contextlib.suppress(Exception):
         for pid in task_queue.get(block=False):
-            print(f"Task pid: {pid}")
+            logger.info(f"Task pid: {pid}")
             os.kill(pid, signal.SIGTERM)
 
 
 def register_local_task(
     func: Callable,
     delay: int | None,
-    args: tuple,
-    kwargs: dict,
 ) -> Task:
     """
     Register a local task.
     """
-    print("INFO [Registered task]", func.__name__)
+    logger.info("Registered task: %s", func.__name__)
     return Task(
         fn=func,
-        args=args,
-        kwargs=kwargs,
         delay=delay,
     )
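With `args`/`kwargs` dropped from `Task`, call-time arguments now flow through `_start_process` into the spawned process instead of being frozen at registration. A hypothetical end-to-end use (`ping` and its argument are illustrative):

```python
from webapp.tasklib import register_local_task

def ping(host: str) -> None:  # hypothetical task
    print(f"pinging {host}")

task = register_local_task(ping, delay=5)

# Arguments are supplied at start time; with delay set, this spawns a
# process that re-runs ping("example.com") every 5 minutes.
task.run("example.com")
```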
""" - print("Closing background tasks...") - print(traceback.format_exc()) + logger.info("Closing background tasks...") + final_trace = traceback.format_exc() + if final_trace: + logger.error(traceback.format_exc()) with contextlib.suppress(Exception): for pid in task_queue.get(block=False): - print(f"Task pid: {pid}") + logger.info(f"Task pid: {pid}") os.kill(pid, signal.SIGTERM) def register_local_task( func: Callable, delay: int | None, - args: tuple, - kwargs: dict, ) -> Task: """ Register a local task. """ - print("INFO [Registered task]", func.__name__) + logger.info("INFO [Registered task]", func.__name__) return Task( fn=func, - args=args, - kwargs=kwargs, delay=delay, ) diff --git a/webapp/tasks.py b/webapp/tasks.py index e48c167c..9be1a705 100644 --- a/webapp/tasks.py +++ b/webapp/tasks.py @@ -36,8 +36,6 @@ def wrapper(*args: tuple, **kwargs: dict) -> Any: task = register_local_task( func, delay=delay, - args=args, - kwargs=kwargs, ) # Start task