Skip to content

Commit 08f2762

Browse files
authored
Merge pull request #155 from opsmill/pog-wait-for-schema-converge
Add function to wait until the schema has converged
2 parents beda19e + 0b00a39 commit 08f2762

File tree

4 files changed

+74
-2
lines changed

4 files changed

+74
-2
lines changed

changelog/+0e61a54f.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added InfrahubClient.schema.wait_until_converged() which allowes you to wait until the schema has converged across all Infrahub workers before proceeding with an operation. The InfrahubClient.schema.load() method has also been updated with a new parameter "wait_until_converged".

infrahub_sdk/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class ConfigBase(BaseSettings):
5454
pagination_size: int = Field(default=50, description="Page size for queries to the server")
5555
retry_delay: int = Field(default=5, description="Number of seconds to wait until attempting a retry.")
5656
retry_on_failure: bool = Field(default=False, description="Retry operation in case of failure")
57+
schema_converge_timeout: int = Field(
58+
default=60, description="Number of seconds to wait for schema to have converged"
59+
)
5760
timeout: int = Field(default=60, description="Default connection timeout in seconds")
5861
transport: RequesterTransport = Field(
5962
default=RequesterTransport.HTTPX, description="Set an alternate transport using a predefined option"

infrahub_sdk/schema.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from collections import defaultdict
45
from collections.abc import MutableMapping
56
from enum import Enum
67
from pathlib import Path
8+
from time import sleep
79
from typing import TYPE_CHECKING, Any, Optional, TypedDict, TypeVar, Union
810
from urllib.parse import urlencode
911

@@ -22,6 +24,7 @@
2224
)
2325
from .generator import InfrahubGenerator
2426
from .graphql import Mutation
27+
from .queries import SCHEMA_HASH_SYNC_STATUS
2528
from .transforms import InfrahubTransform
2629
from .utils import duplicates
2730

@@ -616,15 +619,36 @@ async def all(
616619

617620
return self.cache[branch]
618621

619-
async def load(self, schemas: list[dict], branch: Optional[str] = None) -> SchemaLoadResponse:
622+
async def load(
623+
self, schemas: list[dict], branch: Optional[str] = None, wait_until_converged: bool = False
624+
) -> SchemaLoadResponse:
620625
branch = branch or self.client.default_branch
621626
url = f"{self.client.address}/api/schema/load?branch={branch}"
622627
response = await self.client._post(
623628
url=url, timeout=max(120, self.client.default_timeout), payload={"schemas": schemas}
624629
)
625630

631+
if wait_until_converged:
632+
await self.wait_until_converged(branch=branch)
633+
626634
return self._validate_load_schema_response(response=response)
627635

636+
async def wait_until_converged(self, branch: Optional[str] = None) -> None:
637+
"""Wait until the schema has converged on the selected branch or the timeout has been reached"""
638+
waited = 0
639+
while True:
640+
status = await self.client.execute_graphql(query=SCHEMA_HASH_SYNC_STATUS, branch_name=branch)
641+
if status["InfrahubStatus"]["summary"]["schema_hash_synced"]:
642+
self.client.log.info(f"Schema successfully converged after {waited} seconds")
643+
return
644+
645+
if waited >= self.client.config.schema_converge_timeout:
646+
self.client.log.warning(f"Schema not converged after {waited} seconds, proceeding regardless")
647+
return
648+
649+
waited += 1
650+
await asyncio.sleep(delay=1)
651+
628652
async def check(self, schemas: list[dict], branch: Optional[str] = None) -> tuple[bool, Optional[dict]]:
629653
branch = branch or self.client.default_branch
630654
url = f"{self.client.address}/api/schema/check?branch={branch}"
@@ -999,15 +1023,36 @@ def fetch(
9991023

10001024
return nodes
10011025

1002-
def load(self, schemas: list[dict], branch: Optional[str] = None) -> SchemaLoadResponse:
1026+
def load(
1027+
self, schemas: list[dict], branch: Optional[str] = None, wait_until_converged: bool = False
1028+
) -> SchemaLoadResponse:
10031029
branch = branch or self.client.default_branch
10041030
url = f"{self.client.address}/api/schema/load?branch={branch}"
10051031
response = self.client._post(
10061032
url=url, timeout=max(120, self.client.default_timeout), payload={"schemas": schemas}
10071033
)
10081034

1035+
if wait_until_converged:
1036+
self.wait_until_converged(branch=branch)
1037+
10091038
return self._validate_load_schema_response(response=response)
10101039

1040+
def wait_until_converged(self, branch: Optional[str] = None) -> None:
1041+
"""Wait until the schema has converged on the selected branch or the timeout has been reached"""
1042+
waited = 0
1043+
while True:
1044+
status = self.client.execute_graphql(query=SCHEMA_HASH_SYNC_STATUS, branch_name=branch)
1045+
if status["InfrahubStatus"]["summary"]["schema_hash_synced"]:
1046+
self.client.log.info(f"Schema successfully converged after {waited} seconds")
1047+
return
1048+
1049+
if waited >= self.client.config.schema_converge_timeout:
1050+
self.client.log.warning(f"Schema not converged after {waited} seconds, proceeding regardless")
1051+
return
1052+
1053+
waited += 1
1054+
sleep(1)
1055+
10111056
def check(self, schemas: list[dict], branch: Optional[str] = None) -> tuple[bool, Optional[dict]]:
10121057
branch = branch or self.client.default_branch
10131058
url = f"{self.client.address}/api/schema/check?branch={branch}"

tests/unit/sdk/test_schema.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from unittest import mock
44

55
import pytest
6+
from pytest_httpx import HTTPXMock
67
from rich.console import Console
78

89
from infrahub_sdk import Config, InfrahubClient, InfrahubClientSync
@@ -18,6 +19,7 @@
1819
InfrahubSchemaSync,
1920
NodeSchema,
2021
)
22+
from tests.unit.sdk.conftest import BothClients
2123

2224
async_schema_methods = [method for method in dir(InfrahubSchema) if not method.startswith("_")]
2325
sync_schema_methods = [method for method in dir(InfrahubSchemaSync) if not method.startswith("_")]
@@ -170,6 +172,27 @@ async def test_remove_enum_option_raises(clients, client_type, mock_schema_query
170172
clients.sync.schema.add_enum_option("BuiltinTag", "attribute", "option")
171173

172174

175+
@pytest.mark.parametrize("client_type", client_types)
176+
async def test_schema_wait_happy_path(clients: BothClients, client_type: list[str], httpx_mock: HTTPXMock) -> None:
177+
"""Simplistic unittest that can be removed once we have the integration tests running again."""
178+
httpx_mock.add_response(
179+
method="POST",
180+
url="http://mock/graphql/branch1",
181+
json={"data": {"InfrahubStatus": {"summary": {"schema_hash_synced": False}}}},
182+
)
183+
httpx_mock.add_response(
184+
method="POST",
185+
url="http://mock/graphql/branch1",
186+
json={"data": {"InfrahubStatus": {"summary": {"schema_hash_synced": True}}}},
187+
)
188+
if client_type == "standard":
189+
await clients.standard.schema.wait_until_converged(branch="branch1")
190+
else:
191+
clients.sync.schema.wait_until_converged(branch="branch1")
192+
193+
assert len(httpx_mock.get_requests()) == 2
194+
195+
173196
async def test_infrahub_repository_config_getters():
174197
repo_config = InfrahubRepositoryConfig(
175198
jinja2_transforms=[

0 commit comments

Comments
 (0)