|
1 |
| -import asyncio |
2 | 1 | import re
|
3 | 2 | from typing import Any
|
4 | 3 |
|
5 | 4 | import jq
|
6 | 5 | import yaml
|
7 | 6 |
|
8 | 7 | from galaxy.core.resources import load_integration_resource
|
| 8 | +from galaxy.utils.concurrency import run_in_thread |
9 | 9 |
|
10 |
| -__all__ = ["Mapper"] |
| 10 | +__all__ = ["Mapper", "MapperError", "MapperNotFoundError", "MapperCompilationError"] |
11 | 11 |
|
12 | 12 |
|
13 | 13 | class Mapper:
|
| 14 | + MAPPINGS_FILE_PATH: str = ".rely/mappings.yaml" |
| 15 | + |
14 | 16 | def __init__(self, integration_name: str):
|
15 | 17 | self.integration_name = integration_name
|
16 | 18 | self.id_allowed_chars = "[^a-zA-Z0-9-]"
|
17 | 19 |
|
18 |
| - async def _load_mapping(self, mapping_kind: str) -> list[dict]: |
19 |
| - mappings = yaml.safe_load(load_integration_resource(self.integration_name, ".rely/mappings.yaml")) |
20 |
| - return [mapping for mapping in mappings.get("resources") if mapping["kind"] == mapping_kind] |
21 |
| - |
22 |
| - def _compile_mappings(self, mapping: dict) -> dict: |
23 |
| - compiled_mapping = {} |
24 |
| - for key, value in mapping.items(): |
25 |
| - if isinstance(value, dict): |
26 |
| - compiled_mapping[key] = self._compile_mappings(value) |
27 |
| - elif isinstance(value, list): |
28 |
| - compiled_mapping[key] = [ |
29 |
| - self._compile_mappings(item) if isinstance(item, dict) else item for item in value |
30 |
| - ] |
31 |
| - else: |
32 |
| - try: |
33 |
| - compiled_mapping[key] = jq.compile(value) if isinstance(value, str) else value |
34 |
| - except Exception as e: |
35 |
| - raise Exception(f"Error compiling maps for key {key} with expression {value}: {e}") |
| 20 | + self._mappings: dict[str, dict[str, Any]] | None = None |
| 21 | + self._compiled_mappings: dict[str, dict[str, Any]] = {} |
| 22 | + |
| 23 | + @property |
| 24 | + def mappings(self) -> dict[str, dict[str, Any]]: |
| 25 | + if self._mappings is None: |
| 26 | + mappings = yaml.safe_load(load_integration_resource(self.integration_name, self.MAPPINGS_FILE_PATH)) |
| 27 | + self._mappings = {mapping["kind"]: mapping["mappings"] for mapping in mappings.get("resources") or []} |
| 28 | + return self._mappings |
| 29 | + |
| 30 | + def get_compiled_mappings(self, mapping_kind: str) -> list[Any]: |
| 31 | + if mapping_kind not in self._compiled_mappings: |
| 32 | + try: |
| 33 | + self._compiled_mappings[mapping_kind] = self._compile_mappings(self.mappings.get(mapping_kind) or {}) |
| 34 | + except Exception as e: |
| 35 | + raise MapperCompilationError(mapping_kind) from e |
| 36 | + return self._compiled_mappings[mapping_kind] |
| 37 | + |
| 38 | + def _compile_mappings(self, item: Any) -> Any: |
| 39 | + if isinstance(item, dict): |
| 40 | + return {key: self._compile_mappings(value) for key, value in item.items()} |
| 41 | + if isinstance(item, list | tuple | set): |
| 42 | + return [self._compile_mappings(value) for value in item] |
| 43 | + if isinstance(item, str): |
| 44 | + try: |
| 45 | + return jq.compile(item) |
| 46 | + except Exception as e: |
| 47 | + raise Exception(f"Error compiling maps with expression {item}: {e}") from e |
| 48 | + return item |
| 49 | + |
| 50 | + def _map_data(self, compiled_mapping: Any, context: dict[str, Any]) -> Any: |
| 51 | + if isinstance(compiled_mapping, dict): |
| 52 | + return {key: self._map_data(value, context) for key, value in compiled_mapping.items()} |
| 53 | + if isinstance(compiled_mapping, list): |
| 54 | + return [self._map_data(item, context) for item in compiled_mapping] |
| 55 | + if isinstance(compiled_mapping, jq._Program): |
| 56 | + try: |
| 57 | + return compiled_mapping.input(context).first() |
| 58 | + except Exception as e: |
| 59 | + raise Exception(f"Error mapping with expression {compiled_mapping} and payload {compiled_mapping}: {e}") |
36 | 60 | return compiled_mapping
|
37 | 61 |
|
38 |
| - def _map_entity(self, compiled_mapping: dict, json_data: dict) -> dict: |
39 |
| - entity = {} |
40 |
| - |
41 |
| - for key, value in compiled_mapping.items(): |
42 |
| - if isinstance(value, dict): |
43 |
| - entity[key] = self._map_entity(value, json_data) |
44 |
| - elif isinstance(value, list): |
45 |
| - entity[key] = [self._map_entity(item, json_data) if isinstance(item, dict) else item for item in value] |
46 |
| - else: |
47 |
| - try: |
48 |
| - entity[key] = value.input(json_data).first() if isinstance(value, jq._Program) else value |
49 |
| - except Exception as e: |
50 |
| - raise Exception(f"Error mapping key {key} with expression {value} and payload {json_data}: {e}") |
51 |
| - |
52 |
| - return self._sanitize(entity) |
| 62 | + def _map_entity(self, compiled_mapping: dict, json_data: dict[str, Any]) -> dict: |
| 63 | + return self._sanitize(self._map_data(compiled_mapping, json_data)) |
53 | 64 |
|
54 | 65 | def _replace_non_matching_characters(self, input_string: str, regex_pattern: str) -> str:
|
55 | 66 | res = re.sub(regex_pattern, ".", input_string)
|
@@ -77,21 +88,31 @@ def _sanitize(self, entity: dict) -> dict:
|
77 | 88 |
|
78 | 89 | return entity
|
79 | 90 |
|
80 |
| - async def process(self, mapping_kind: str, json_data: list[dict], context=None) -> tuple[Any]: |
81 |
| - try: |
82 |
| - mappings = await self._load_mapping(mapping_kind) |
83 |
| - if not mappings: |
84 |
| - raise Exception(f"Unknown Mapper {mapping_kind}") |
85 |
| - compiled_mappings = self._compile_mappings(mappings[0]["mappings"]) |
| 91 | + def process_sync(self, mapping_kind: str, json_data: list[dict], context: Any | None = None) -> list[Any]: |
| 92 | + mappings = self.get_compiled_mappings(mapping_kind) |
| 93 | + if not mappings: |
| 94 | + raise MapperNotFoundError(mapping_kind) |
| 95 | + return [self._map_entity(mappings, {**each, "context": context}) for each in json_data] |
86 | 96 |
|
87 |
| - loop = asyncio.get_running_loop() |
| 97 | + async def process(self, mapping_kind: str, json_data: list[dict], context: Any | None = None) -> tuple[Any]: |
| 98 | + # There is no advantage in using async here as all the work is done in a thread. |
| 99 | + # Keeping it as async for now to avoid breaking existing code that calls this as `await mapper.process(...)`. |
| 100 | + return await run_in_thread(self.process_sync, mapping_kind, json_data, context) |
88 | 101 |
|
89 |
| - entities = await asyncio.gather( |
90 |
| - *[ |
91 |
| - loop.run_in_executor(None, self._map_entity, compiled_mappings, {**each, "context": context}) |
92 |
| - for each in json_data |
93 |
| - ] |
94 |
| - ) |
95 |
| - return entities |
96 |
| - except Exception as e: |
97 |
| - raise e |
| 102 | + |
| 103 | +class MapperError(Exception): |
| 104 | + """Base class for Mapper errors.""" |
| 105 | + |
| 106 | + |
| 107 | +class MapperNotFoundError(MapperError): |
| 108 | + """Mapper not found error.""" |
| 109 | + |
| 110 | + def __init__(self, mapping_kind: str): |
| 111 | + super().__init__(f"Unknown Mapper {mapping_kind}") |
| 112 | + |
| 113 | + |
| 114 | +class MapperCompilationError(MapperError): |
| 115 | + """Mapper compilation error.""" |
| 116 | + |
| 117 | + def __init__(self, mapping_kind: str): |
| 118 | + super().__init__(f"Error compiling mappings for kind {mapping_kind}") |
0 commit comments