Skip to content

Add function to wait until the schema has converged #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/+0e61a54f.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added InfrahubClient.schema.wait_until_converged() which allowes you to wait until the schema has converged across all Infrahub workers before proceeding with an operation. The InfrahubClient.schema.load() method has also been updated with a new parameter "wait_until_converged".
3 changes: 3 additions & 0 deletions infrahub_sdk/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class ConfigBase(BaseSettings):
pagination_size: int = Field(default=50, description="Page size for queries to the server")
retry_delay: int = Field(default=5, description="Number of seconds to wait until attempting a retry.")
retry_on_failure: bool = Field(default=False, description="Retry operation in case of failure")
schema_converge_timeout: int = Field(
default=60, description="Number of seconds to wait for schema to have converged"
)
timeout: int = Field(default=60, description="Default connection timeout in seconds")
transport: RequesterTransport = Field(
default=RequesterTransport.HTTPX, description="Set an alternate transport using a predefined option"
Expand Down
49 changes: 47 additions & 2 deletions infrahub_sdk/schema.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

import asyncio
from collections import defaultdict
from collections.abc import MutableMapping
from enum import Enum
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, Any, Optional, TypedDict, TypeVar, Union
from urllib.parse import urlencode

Expand All @@ -22,6 +24,7 @@
)
from .generator import InfrahubGenerator
from .graphql import Mutation
from .queries import SCHEMA_HASH_SYNC_STATUS
from .transforms import InfrahubTransform
from .utils import duplicates

Expand Down Expand Up @@ -616,15 +619,36 @@

return self.cache[branch]

async def load(self, schemas: list[dict], branch: Optional[str] = None) -> SchemaLoadResponse:
async def load(
self, schemas: list[dict], branch: Optional[str] = None, wait_until_converged: bool = False
) -> SchemaLoadResponse:
branch = branch or self.client.default_branch
url = f"{self.client.address}/api/schema/load?branch={branch}"
response = await self.client._post(
url=url, timeout=max(120, self.client.default_timeout), payload={"schemas": schemas}
)

if wait_until_converged:
await self.wait_until_converged(branch=branch)

Check warning on line 632 in infrahub_sdk/schema.py

View check run for this annotation

Codecov / codecov/patch

infrahub_sdk/schema.py#L632

Added line #L632 was not covered by tests

return self._validate_load_schema_response(response=response)

async def wait_until_converged(self, branch: Optional[str] = None) -> None:
"""Wait until the schema has converged on the selected branch or the timeout has been reached"""
waited = 0
while True:
status = await self.client.execute_graphql(query=SCHEMA_HASH_SYNC_STATUS, branch_name=branch)
if status["InfrahubStatus"]["summary"]["schema_hash_synced"]:
self.client.log.info(f"Schema successfully converged after {waited} seconds")
return

if waited >= self.client.config.schema_converge_timeout:
self.client.log.warning(f"Schema not converged after {waited} seconds, proceeding regardless")
return

Check warning on line 647 in infrahub_sdk/schema.py

View check run for this annotation

Codecov / codecov/patch

infrahub_sdk/schema.py#L646-L647

Added lines #L646 - L647 were not covered by tests

waited += 1
await asyncio.sleep(delay=1)

async def check(self, schemas: list[dict], branch: Optional[str] = None) -> tuple[bool, Optional[dict]]:
branch = branch or self.client.default_branch
url = f"{self.client.address}/api/schema/check?branch={branch}"
Expand Down Expand Up @@ -999,15 +1023,36 @@

return nodes

def load(self, schemas: list[dict], branch: Optional[str] = None) -> SchemaLoadResponse:
def load(
self, schemas: list[dict], branch: Optional[str] = None, wait_until_converged: bool = False
) -> SchemaLoadResponse:
branch = branch or self.client.default_branch
url = f"{self.client.address}/api/schema/load?branch={branch}"
response = self.client._post(
url=url, timeout=max(120, self.client.default_timeout), payload={"schemas": schemas}
)

if wait_until_converged:
self.wait_until_converged(branch=branch)

Check warning on line 1036 in infrahub_sdk/schema.py

View check run for this annotation

Codecov / codecov/patch

infrahub_sdk/schema.py#L1036

Added line #L1036 was not covered by tests

return self._validate_load_schema_response(response=response)

def wait_until_converged(self, branch: Optional[str] = None) -> None:
"""Wait until the schema has converged on the selected branch or the timeout has been reached"""
waited = 0
while True:
status = self.client.execute_graphql(query=SCHEMA_HASH_SYNC_STATUS, branch_name=branch)
if status["InfrahubStatus"]["summary"]["schema_hash_synced"]:
self.client.log.info(f"Schema successfully converged after {waited} seconds")
return

if waited >= self.client.config.schema_converge_timeout:
self.client.log.warning(f"Schema not converged after {waited} seconds, proceeding regardless")
return

Check warning on line 1051 in infrahub_sdk/schema.py

View check run for this annotation

Codecov / codecov/patch

infrahub_sdk/schema.py#L1050-L1051

Added lines #L1050 - L1051 were not covered by tests

waited += 1
sleep(1)

def check(self, schemas: list[dict], branch: Optional[str] = None) -> tuple[bool, Optional[dict]]:
branch = branch or self.client.default_branch
url = f"{self.client.address}/api/schema/check?branch={branch}"
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/sdk/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest import mock

import pytest
from pytest_httpx import HTTPXMock
from rich.console import Console

from infrahub_sdk import Config, InfrahubClient, InfrahubClientSync
Expand All @@ -18,6 +19,7 @@
InfrahubSchemaSync,
NodeSchema,
)
from tests.unit.sdk.conftest import BothClients

async_schema_methods = [method for method in dir(InfrahubSchema) if not method.startswith("_")]
sync_schema_methods = [method for method in dir(InfrahubSchemaSync) if not method.startswith("_")]
Expand Down Expand Up @@ -170,6 +172,27 @@ async def test_remove_enum_option_raises(clients, client_type, mock_schema_query
clients.sync.schema.add_enum_option("BuiltinTag", "attribute", "option")


@pytest.mark.parametrize("client_type", client_types)
async def test_schema_wait_happy_path(clients: BothClients, client_type: list[str], httpx_mock: HTTPXMock) -> None:
"""Simplistic unittest that can be removed once we have the integration tests running again."""
httpx_mock.add_response(
method="POST",
url="http://mock/graphql/branch1",
json={"data": {"InfrahubStatus": {"summary": {"schema_hash_synced": False}}}},
)
httpx_mock.add_response(
method="POST",
url="http://mock/graphql/branch1",
json={"data": {"InfrahubStatus": {"summary": {"schema_hash_synced": True}}}},
)
if client_type == "standard":
await clients.standard.schema.wait_until_converged(branch="branch1")
else:
clients.sync.schema.wait_until_converged(branch="branch1")

assert len(httpx_mock.get_requests()) == 2


async def test_infrahub_repository_config_getters():
repo_config = InfrahubRepositoryConfig(
jinja2_transforms=[
Expand Down