From 870e291aae23a6e8fdabdf8151648a5b71875477 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Fri, 2 May 2025 15:51:33 +0800 Subject: [PATCH 1/7] log, add response to bulk insert --- stac_fastapi/core/stac_fastapi/core/core.py | 39 +++--- .../elasticsearch/database_logic.py | 120 ++++++++++++------ .../stac_fastapi/opensearch/database_logic.py | 46 ++++--- 3 files changed, 132 insertions(+), 73 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index d9e8d214..09a7c44b 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -676,21 +676,22 @@ class TransactionsClient(AsyncBaseTransactionsClient): @overrides async def create_item( self, collection_id: str, item: Union[Item, ItemCollection], **kwargs - ) -> Optional[stac_types.Item]: - """Create an item in the collection. + ) -> Union[stac_types.Item, str]: + """ + Create an item or a feature collection of items in the specified collection. Args: - collection_id (str): The id of the collection to add the item to. - item (stac_types.Item): The item to be added to the collection. - kwargs: Additional keyword arguments. + collection_id (str): The ID of the collection to add the item(s) to. + item (Union[Item, ItemCollection]): A single item or a collection of items to be added. + **kwargs: Additional keyword arguments, such as `request` and `refresh`. Returns: - stac_types.Item: The created item. + Union[stac_types.Item, str]: The created item if a single item is added, or a summary string + indicating the number of items successfully added and errors if a collection of items is added. Raises: - NotFound: If the specified collection is not found in the database. - ConflictError: If the item in the specified collection already exists. - + NotFoundError: If the specified collection is not found in the database. + ConflictError: If an item with the same ID already exists in the collection. """ item = item.model_dump(mode="json") base_url = str(kwargs["request"].base_url) @@ -706,12 +707,16 @@ async def create_item( ) for item in item["features"] ] - - await self.database.bulk_async( + attempted = len(processed_items) + success, errors = await self.database.bulk_async( collection_id, processed_items, refresh=kwargs.get("refresh", False) ) + if errors: + logger.error(f"Bulk async operation encountered errors: {errors}") + else: + logger.info(f"Bulk async operation succeeded with {success} actions.") - return None + return f"Successfully added {success} Items. {attempted - success} errors occurred." else: item = await self.database.prep_create_item(item=item, base_url=base_url) await self.database.create_item(item, refresh=kwargs.get("refresh", False)) @@ -907,12 +912,16 @@ def bulk_item_insert( # not a great way to get the collection_id-- should be part of the method signature collection_id = processed_items[0]["collection"] - - self.database.bulk_sync( + attempted = len(processed_items) + success, errors = self.database.bulk_sync( collection_id, processed_items, refresh=kwargs.get("refresh", False) ) + if errors: + logger.error(f"Bulk sync operation encountered errors: {errors}") + else: + logger.info(f"Bulk sync operation succeeded with {success} actions.") - return f"Successfully added {len(processed_items)} Items." + return f"Successfully added {success} Items. {attempted - success} errors occurred." 
_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index d32db777..3737bcc9 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -703,31 +703,47 @@ async def prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ - Preps an item for insertion into the database. + Prepare an item for insertion into the database. + + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. Args: - item (Item): The item to be prepped for insertion. - base_url (str): The base URL used to create the item's self URL. - exist_ok (bool): Indicates whether the item can exist already. + item (Item): The item to be prepared for insertion. + base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. Returns: - Item: The prepped item. + Item: The prepared item, serialized into a database-compatible format. Raises: - ConflictError: If the item already exists in the database. - + NotFoundError: If the collection that the item belongs to does not exist in the database. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False. """ + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") + + # Check if the collection exists await self.check_collection_exists(collection_id=item["collection"]) + # Check if the item already exists in the database if not exist_ok and await self.client.exists( index=index_alias_by_collection_id(item["collection"]), id=mk_item_id(item["id"], item["collection"]), ): + logger.warning( + f"Item {item['id']} in collection {item['collection']} already exists." + ) raise ConflictError( f"Item {item['id']} in collection {item['collection']} already exists" ) - return self.item_serializer.stac_to_db(item, base_url) + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item def sync_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False @@ -735,36 +751,46 @@ def sync_prep_create_item( """ Prepare an item for insertion into the database. - This method performs pre-insertion preparation on the given `item`, - such as checking if the collection the item belongs to exists, - and optionally verifying that an item with the same ID does not already exist in the database. + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. Args: - item (Item): The item to be inserted into the database. - base_url (str): The base URL used for constructing URLs for the item. - exist_ok (bool): Indicates whether the item can exist already. + item (Item): The item to be prepared for insertion. 
+ base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. Returns: - Item: The item after preparation is done. + Item: The prepared item, serialized into a database-compatible format. Raises: NotFoundError: If the collection that the item belongs to does not exist in the database. - ConflictError: If an item with the same ID already exists in the collection. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False. """ - item_id = item["id"] - collection_id = item["collection"] - if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): - raise NotFoundError(f"Collection {collection_id} does not exist") + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") + # Check if the collection exists + if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): + raise NotFoundError(f"Collection {item['collection']} does not exist") + + # Check if the item already exists in the database if not exist_ok and self.sync_client.exists( - index=index_alias_by_collection_id(collection_id), - id=mk_item_id(item_id, collection_id), + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), ): + logger.warning( + f"Item {item['id']} in collection {item['collection']} already exists." + ) raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" + f"Item {item['id']} in collection {item['collection']} already exists" ) - return self.item_serializer.stac_to_db(item, base_url) + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item async def create_item(self, item: Item, refresh: bool = False): """Database logic for creating one item. @@ -960,51 +986,63 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): async def bulk_async( self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database asynchronously. + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database asynchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). + - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The - `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the - index is refreshed after the bulk insert. The function does not return any value. + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. 
+ The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. + The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, + the index is refreshed after the bulk insert. """ - await helpers.async_bulk( + success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=False, # Do not raise errors ) + return success, errors def bulk_sync( self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database synchronously. + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database synchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). + - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed synchronously and blocking, meaning that the function does not return until the insert has + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. + The insert is performed synchronously and blocking, meaning that the function does not return until the insert has completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. The function does not return any value. + True, the index is refreshed after the bulk insert. """ - helpers.bulk( + success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=False, # Do not raise errors ) + return success, errors # DANGER async def delete_items(self) -> None: diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 29bd6030..f4523553 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -984,51 +984,63 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): async def bulk_async( self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database asynchronously. + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database asynchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). 
+ - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The - `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the - index is refreshed after the bulk insert. The function does not return any value. + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. + The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. + The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, + the index is refreshed after the bulk insert. """ - await helpers.async_bulk( + success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=False, # Do not raise errors ) + return success, errors def bulk_sync( self, collection_id: str, processed_items: List[Item], refresh: bool = False - ) -> None: - """Perform a bulk insert of items into the database synchronously. + ) -> Tuple[int, List[Dict[str, Any]]]: + """ + Perform a bulk insert of items into the database synchronously. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + Returns: + Tuple[int, List[Dict[str, Any]]]: A tuple containing: + - The number of successfully processed actions (`success`). + - A list of errors encountered during the bulk operation (`errors`). + Notes: - This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The - insert is performed synchronously and blocking, meaning that the function does not return until the insert has + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. + The insert is performed synchronously and blocking, meaning that the function does not return until the insert has completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. The function does not return any value. + True, the index is refreshed after the bulk insert. 
""" - helpers.bulk( + success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, + raise_on_error=False, # Do not raise errors ) + return success, errors # DANGER async def delete_items(self) -> None: From ae1071ea662137eafe385dd0d886dd968b6dbdb1 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Fri, 2 May 2025 16:15:38 +0800 Subject: [PATCH 2/7] bulk raise on error env var --- README.md | 3 ++- stac_fastapi/core/stac_fastapi/core/core.py | 11 +++++++++-- .../elasticsearch/database_logic.py | 18 ++++++++++++++---- .../stac_fastapi/opensearch/database_logic.py | 18 ++++++++++++++---- 4 files changed, 39 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 896db23f..5f93608e 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,8 @@ You can customize additional settings in your `.env` file: | `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional | | `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | | `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional | -| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional | +| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional +| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. | `false` | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. 
diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 09a7c44b..54f1f430 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -1,6 +1,7 @@ """Core client.""" import logging +import os from collections import deque from datetime import datetime as datetime_type from datetime import timezone @@ -709,7 +710,10 @@ async def create_item( ] attempted = len(processed_items) success, errors = await self.database.bulk_async( - collection_id, processed_items, refresh=kwargs.get("refresh", False) + collection_id, + processed_items, + refresh=kwargs.get("refresh", False), + raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False), ) if errors: logger.error(f"Bulk async operation encountered errors: {errors}") @@ -914,7 +918,10 @@ def bulk_item_insert( collection_id = processed_items[0]["collection"] attempted = len(processed_items) success, errors = self.database.bulk_sync( - collection_id, processed_items, refresh=kwargs.get("refresh", False) + collection_id, + processed_items, + refresh=kwargs.get("refresh", False), + raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False), ) if errors: logger.error(f"Bulk sync operation encountered errors: {errors}") diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 3737bcc9..0488f1d8 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -985,7 +985,11 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): await delete_item_index(collection_id) async def bulk_async( - self, collection_id: str, processed_items: List[Item], refresh: bool = False + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -994,6 +998,7 @@ async def bulk_async( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1010,12 +1015,16 @@ async def bulk_async( self.client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, # Do not raise errors + raise_on_error=raise_on_error, ) return success, errors def bulk_sync( - self, collection_id: str, processed_items: List[Item], refresh: bool = False + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database synchronously. @@ -1024,6 +1033,7 @@ def bulk_sync( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). 
Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1040,7 +1050,7 @@ def bulk_sync( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, # Do not raise errors + raise_on_error=raise_on_error, ) return success, errors diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index f4523553..6af1380e 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -983,7 +983,11 @@ async def delete_collection(self, collection_id: str, refresh: bool = False): await delete_item_index(collection_id) async def bulk_async( - self, collection_id: str, processed_items: List[Item], refresh: bool = False + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -992,6 +996,7 @@ async def bulk_async( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1008,12 +1013,16 @@ async def bulk_async( self.client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, # Do not raise errors + raise_on_error=raise_on_error, ) return success, errors def bulk_sync( - self, collection_id: str, processed_items: List[Item], refresh: bool = False + self, + collection_id: str, + processed_items: List[Item], + refresh: bool = False, + raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database synchronously. @@ -1022,6 +1031,7 @@ def bulk_sync( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). + raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). 
Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1038,7 +1048,7 @@ def bulk_sync( self.sync_client, mk_actions(collection_id, processed_items), refresh=refresh, - raise_on_error=False, # Do not raise errors + raise_on_error=raise_on_error, ) return success, errors From 345a0afa7ff71d701d386278d3f0b948d43663a0 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Fri, 2 May 2025 19:31:58 +0800 Subject: [PATCH 3/7] extend raise on/ off --- stac_fastapi/core/stac_fastapi/core/core.py | 9 +- .../elasticsearch/database_logic.py | 70 ++++++++--- .../stac_fastapi/opensearch/database_logic.py | 111 ++++++++++++++---- 3 files changed, 143 insertions(+), 47 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 54f1f430..28722b61 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -1,7 +1,6 @@ """Core client.""" import logging -import os from collections import deque from datetime import datetime as datetime_type from datetime import timezone @@ -713,7 +712,6 @@ async def create_item( collection_id, processed_items, refresh=kwargs.get("refresh", False), - raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False), ) if errors: logger.error(f"Bulk async operation encountered errors: {errors}") @@ -722,7 +720,9 @@ async def create_item( return f"Successfully added {success} Items. {attempted - success} errors occurred." else: - item = await self.database.prep_create_item(item=item, base_url=base_url) + item = await self.database.async_prep_create_item( + item=item, base_url=base_url + ) await self.database.create_item(item, refresh=kwargs.get("refresh", False)) return ItemSerializer.db_to_stac(item, base_url) @@ -885,7 +885,7 @@ def preprocess_item( The preprocessed item. """ exist_ok = method == BulkTransactionMethod.UPSERT - return self.database.sync_prep_create_item( + return self.database.bulk_sync_prep_create_item( item=item, base_url=base_url, exist_ok=exist_ok ) @@ -921,7 +921,6 @@ def bulk_item_insert( collection_id, processed_items, refresh=kwargs.get("refresh", False), - raise_on_error=os.getenv("RAISE_ON_BULK_ERROR", False), ) if errors: logger.error(f"Bulk sync operation encountered errors: {errors}") diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 0488f1d8..0b5408d1 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.config import ( ElasticsearchSettings as SyncElasticsearchSettings, @@ -699,7 +699,37 @@ async def check_collection_exists(self, collection_id: str): if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - async def prep_create_item( + async def async_prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: + """ + Preps an item for insertion into the database. 
+ + Args: + item (Item): The item to be prepped for insertion. + base_url (str): The base URL used to create the item's self URL. + exist_ok (bool): Indicates whether the item can exist already. + + Returns: + Item: The prepped item. + + Raises: + ConflictError: If the item already exists in the database. + + """ + await self.check_collection_exists(collection_id=item["collection"]) + + if not exist_ok and await self.client.exists( + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), + ): + raise ConflictError( + f"Item {item['id']} in collection {item['collection']} already exists" + ) + + return self.item_serializer.stac_to_db(item, base_url) + + async def bulk_async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ @@ -721,7 +751,8 @@ async def prep_create_item( Raises: NotFoundError: If the collection that the item belongs to does not exist in the database. - ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. """ logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") @@ -733,19 +764,22 @@ async def prep_create_item( index=index_alias_by_collection_id(item["collection"]), id=mk_item_id(item["id"], item["collection"]), ): - logger.warning( + error_message = ( f"Item {item['id']} in collection {item['collection']} already exists." ) - raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" - ) + if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." + ) # Serialize the item into a database-compatible format prepped_item = self.item_serializer.stac_to_db(item, base_url) logger.debug(f"Item {item['id']} prepared successfully.") return prepped_item - def sync_prep_create_item( + def bulk_sync_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ @@ -767,7 +801,8 @@ def sync_prep_create_item( Raises: NotFoundError: If the collection that the item belongs to does not exist in the database. - ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. """ logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") @@ -780,12 +815,15 @@ def sync_prep_create_item( index=index_alias_by_collection_id(item["collection"]), id=mk_item_id(item["id"], item["collection"]), ): - logger.warning( + error_message = ( f"Item {item['id']} in collection {item['collection']} already exists." ) - raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" - ) + if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." 
+ ) # Serialize the item into a database-compatible format prepped_item = self.item_serializer.stac_to_db(item, base_url) @@ -989,7 +1027,6 @@ async def bulk_async( collection_id: str, processed_items: List[Item], refresh: bool = False, - raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -998,7 +1035,6 @@ async def bulk_async( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). - raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1011,6 +1047,7 @@ async def bulk_async( The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. """ + raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), @@ -1024,7 +1061,6 @@ def bulk_sync( collection_id: str, processed_items: List[Item], refresh: bool = False, - raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database synchronously. @@ -1033,7 +1069,6 @@ def bulk_sync( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). - raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1046,6 +1081,7 @@ def bulk_sync( completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. 
""" + raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 6af1380e..ddf9b3cf 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env from stac_fastapi.opensearch.config import ( AsyncOpensearchSettings as AsyncSearchSettings, ) @@ -723,7 +723,7 @@ async def check_collection_exists(self, collection_id: str): if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - async def prep_create_item( + async def async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ @@ -753,42 +753,105 @@ async def prep_create_item( return self.item_serializer.stac_to_db(item, base_url) - def sync_prep_create_item( + async def bulk_async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False ) -> Item: """ Prepare an item for insertion into the database. - This method performs pre-insertion preparation on the given `item`, - such as checking if the collection the item belongs to exists, - and optionally verifying that an item with the same ID does not already exist in the database. + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. Args: - item (Item): The item to be inserted into the database. - base_url (str): The base URL used for constructing URLs for the item. - exist_ok (bool): Indicates whether the item can exist already. + item (Item): The item to be prepared for insertion. + base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. Returns: - Item: The item after preparation is done. + Item: The prepared item, serialized into a database-compatible format. Raises: NotFoundError: If the collection that the item belongs to does not exist in the database. - ConflictError: If an item with the same ID already exists in the collection. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. 
""" - item_id = item["id"] - collection_id = item["collection"] - if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): - raise NotFoundError(f"Collection {collection_id} does not exist") + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") - if not exist_ok and self.sync_client.exists( - index=index_alias_by_collection_id(collection_id), - id=mk_item_id(item_id, collection_id), + # Check if the collection exists + await self.check_collection_exists(collection_id=item["collection"]) + + # Check if the item already exists in the database + if not exist_ok and await self.client.exists( + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), ): - raise ConflictError( - f"Item {item_id} in collection {collection_id} already exists" + error_message = ( + f"Item {item['id']} in collection {item['collection']} already exists." ) + if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." + ) + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item + + def bulk_sync_prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: + """ + Prepare an item for insertion into the database. - return self.item_serializer.stac_to_db(item, base_url) + This method performs pre-insertion preparation on the given `item`, such as: + - Verifying that the collection the item belongs to exists. + - Optionally checking if an item with the same ID already exists in the database. + - Serializing the item into a database-compatible format. + + Args: + item (Item): The item to be prepared for insertion. + base_url (str): The base URL used to construct the item's self URL. + exist_ok (bool): Indicates whether the item can already exist in the database. + If False, a `ConflictError` is raised if the item exists. + + Returns: + Item: The prepared item, serialized into a database-compatible format. + + Raises: + NotFoundError: If the collection that the item belongs to does not exist in the database. + ConflictError: If an item with the same ID already exists in the collection and `exist_ok` is False, + and `RAISE_ON_BULK_ERROR` is set to `true`. + """ + logger.debug(f"Preparing item {item['id']} in collection {item['collection']}.") + + # Check if the collection exists + if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): + raise NotFoundError(f"Collection {item['collection']} does not exist") + + # Check if the item already exists in the database + if not exist_ok and self.sync_client.exists( + index=index_alias_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), + ): + error_message = ( + f"Item {item['id']} in collection {item['collection']} already exists." + ) + if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + raise ConflictError(error_message) + else: + logger.warning( + f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." 
+ ) + + # Serialize the item into a database-compatible format + prepped_item = self.item_serializer.stac_to_db(item, base_url) + logger.debug(f"Item {item['id']} prepared successfully.") + return prepped_item async def create_item(self, item: Item, refresh: bool = False): """Database logic for creating one item. @@ -987,7 +1050,6 @@ async def bulk_async( collection_id: str, processed_items: List[Item], refresh: bool = False, - raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -996,7 +1058,6 @@ async def bulk_async( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). - raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1009,6 +1070,7 @@ async def bulk_async( The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. """ + raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), @@ -1022,7 +1084,6 @@ def bulk_sync( collection_id: str, processed_items: List[Item], refresh: bool = False, - raise_on_error: bool = False, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database synchronously. @@ -1031,7 +1092,6 @@ def bulk_sync( collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. refresh (bool): Whether to refresh the index after the bulk insert (default: False). - raise_on_error (bool): Whether to raise an error if the bulk operation fails (default: False). Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1044,6 +1104,7 @@ def bulk_sync( completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. 
""" + raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), From e08f1cdc56af460400a90d25ba6144fecb27ccfa Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Fri, 2 May 2025 23:41:29 +0800 Subject: [PATCH 4/7] update test --- stac_fastapi/tests/api/test_api.py | 57 +++++++++++++++--------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/stac_fastapi/tests/api/test_api.py b/stac_fastapi/tests/api/test_api.py index 91c0e811..807da5e4 100644 --- a/stac_fastapi/tests/api/test_api.py +++ b/stac_fastapi/tests/api/test_api.py @@ -5,6 +5,8 @@ import pytest +from stac_fastapi.types.errors import ConflictError + from ..conftest import create_collection, create_item ROUTES = { @@ -635,53 +637,47 @@ async def test_search_line_string_intersects(app_client, ctx): @pytest.mark.parametrize( "value, expected", [ - (32767, 1), # Short Limit, + (32767, 1), # Short Limit (2147483647, 1), # Int Limit - (2147483647 + 5000, 1), # Above int Limit - (21474836470, 1), # Above int Limit - # This value still fails to return 1 - # Commenting out - # (9223372036854775807, 1), + (2147483647 + 5000, 1), # Above Int Limit + (21474836470, 1), # Above Int Limit ], ) async def test_big_int_eo_search( app_client, txn_client, test_item, test_collection, value, expected ): - - random_str = "".join(random.choice("abcdef") for i in range(random.randint(1, 5))) + random_str = "".join(random.choice("abcdef") for _ in range(5)) collection_id = f"test-collection-eo-{random_str}" - test_big_int_item = test_item - del test_big_int_item["properties"]["eo:bands"] - test_big_int_item["collection"] = collection_id - test_big_int_collection = test_collection - test_big_int_collection["id"] = collection_id - - # type number - attr = "eo:full_width_half_max" - - stac_extensions = [ - "https://stac-extensions.github.io/eo/v2.0.0/schema.json", + test_collection["id"] = collection_id + test_collection["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json" ] - test_collection["stac_extensions"] = stac_extensions + test_item["collection"] = collection_id + test_item["stac_extensions"] = test_collection["stac_extensions"] - test_item["stac_extensions"] = stac_extensions + # Remove "eo:bands" to simplify the test + del test_item["properties"]["eo:bands"] - await create_collection(txn_client, test_collection) + # Attribute to test + attr = "eo:full_width_half_max" - for val in [ - value, - value + random.randint(10, 1010), - value - random.randint(10, 1010), - ]: + try: + await create_collection(txn_client, test_collection) + except ConflictError: + pass + + # Create items with deterministic offsets + for val in [value, value + 100, value - 100]: item = deepcopy(test_item) item["id"] = str(uuid.uuid4()) item["properties"][attr] = val await create_item(txn_client, item) + # Search for the exact value params = { - "collections": [item["collection"]], + "collections": [collection_id], "filter": { "args": [ { @@ -697,5 +693,8 @@ async def test_big_int_eo_search( } resp = await app_client.post("/search", json=params) resp_json = resp.json() - results = set([x["properties"][attr] for x in resp_json["features"]]) + + # Validate results + results = {x["properties"][attr] for x in resp_json["features"]} assert len(results) == expected + assert results == {value} From ebac915232415af237c66a1404cc21e5746aa120 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Fri, 2 May 2025 23:48:39 +0800 Subject: [PATCH 5/7] 
update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 18191082..4abdea39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) +- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364) + ### Changed - Updated dynamic mapping for items to map long values to double versus float [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326) From f6c1aa0c819cb4a59ac53f1785963cb3c587eb94 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sat, 3 May 2025 11:26:49 +0800 Subject: [PATCH 6/7] use settings --- .../stac_fastapi/elasticsearch/config.py | 2 ++ .../elasticsearch/database_logic.py | 26 ++++++++++++++----- .../stac_fastapi/opensearch/config.py | 2 ++ .../stac_fastapi/opensearch/database_logic.py | 22 +++++++++++----- 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index 2044a4b2..37e1ba5b 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -86,6 +86,7 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): @@ -106,6 +107,7 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 0b5408d1..2834a4de 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.config import ( ElasticsearchSettings as SyncElasticsearchSettings, @@ -128,8 +128,20 @@ async def delete_item_index(collection_id: str): class DatabaseLogic(BaseDatabaseLogic): """Database logic.""" - client = AsyncElasticsearchSettings().create_client - sync_client = SyncElasticsearchSettings().create_client + async_settings: AsyncElasticsearchSettings = attr.ib( + 
factory=AsyncElasticsearchSettings + ) + sync_settings: SyncElasticsearchSettings = attr.ib( + factory=SyncElasticsearchSettings + ) + + client = attr.ib(init=False) + sync_client = attr.ib(init=False) + + def __attrs_post_init__(self): + """Initialize clients after the class is instantiated.""" + self.client = self.async_settings.create_client + self.sync_client = self.sync_settings.create_client item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) collection_serializer: Type[CollectionSerializer] = attr.ib( @@ -767,7 +779,7 @@ async def bulk_async_prep_create_item( error_message = ( f"Item {item['id']} in collection {item['collection']} already exists." ) - if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + if self.async_settings.raise_on_bulk_error: raise ConflictError(error_message) else: logger.warning( @@ -818,7 +830,7 @@ def bulk_sync_prep_create_item( error_message = ( f"Item {item['id']} in collection {item['collection']} already exists." ) - if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + if self.sync_settings.raise_on_bulk_error: raise ConflictError(error_message) else: logger.warning( @@ -1047,7 +1059,7 @@ async def bulk_async( The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. """ - raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + raise_on_error = self.async_settings.raise_on_bulk_error success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), @@ -1081,7 +1093,7 @@ def bulk_sync( completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. 
""" - raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + raise_on_error = self.sync_settings.raise_on_bulk_error success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 00498468..4c305fda 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -83,6 +83,7 @@ class OpensearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): @@ -103,6 +104,7 @@ class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings): indexed_fields: Set[str] = {"datetime"} enable_response_models: bool = False enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) + raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) @property def create_client(self): diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index ddf9b3cf..a555e3b0 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon from stac_fastapi.opensearch.config import ( AsyncOpensearchSettings as AsyncSearchSettings, ) @@ -143,8 +143,16 @@ async def delete_item_index(collection_id: str) -> None: class DatabaseLogic(BaseDatabaseLogic): """Database logic.""" - client = AsyncSearchSettings().create_client - sync_client = SyncSearchSettings().create_client + async_settings: AsyncSearchSettings = attr.ib(factory=AsyncSearchSettings) + sync_settings: SyncSearchSettings = attr.ib(factory=SyncSearchSettings) + + client = attr.ib(init=False) + sync_client = attr.ib(init=False) + + def __attrs_post_init__(self): + """Initialize clients after the class is instantiated.""" + self.client = self.async_settings.create_client + self.sync_client = self.sync_settings.create_client item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer) collection_serializer: Type[CollectionSerializer] = attr.ib( @@ -791,7 +799,7 @@ async def bulk_async_prep_create_item( error_message = ( f"Item {item['id']} in collection {item['collection']} already exists." ) - if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + if self.async_settings.raise_on_bulk_error: raise ConflictError(error_message) else: logger.warning( @@ -841,7 +849,7 @@ def bulk_sync_prep_create_item( error_message = ( f"Item {item['id']} in collection {item['collection']} already exists." ) - if get_bool_env("RAISE_ON_BULK_ERROR", default=False): + if self.sync_settings.raise_on_bulk_error: raise ConflictError(error_message) else: logger.warning( @@ -1070,7 +1078,7 @@ async def bulk_async( The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. 
""" - raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + raise_on_error = self.async_settings.raise_on_bulk_error success, errors = await helpers.async_bulk( self.client, mk_actions(collection_id, processed_items), @@ -1104,7 +1112,7 @@ def bulk_sync( completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. """ - raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + raise_on_error = self.sync_settings.raise_on_bulk_error success, errors = helpers.bulk( self.sync_client, mk_actions(collection_id, processed_items), From 643cc0bb1737a270c07342ad319d865f95c0c9cc Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Sun, 4 May 2025 12:11:54 +0800 Subject: [PATCH 7/7] add test --- stac_fastapi/core/stac_fastapi/core/core.py | 2 +- .../{test_elasticsearch.py => test_es_os.py} | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) rename stac_fastapi/tests/clients/{test_elasticsearch.py => test_es_os.py} (83%) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 28722b61..59cc1682 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -927,7 +927,7 @@ def bulk_item_insert( else: logger.info(f"Bulk sync operation succeeded with {success} actions.") - return f"Successfully added {success} Items. {attempted - success} errors occurred." + return f"Successfully added/updated {success} Items. {attempted - success} errors occurred." _DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = { diff --git a/stac_fastapi/tests/clients/test_elasticsearch.py b/stac_fastapi/tests/clients/test_es_os.py similarity index 83% rename from stac_fastapi/tests/clients/test_elasticsearch.py rename to stac_fastapi/tests/clients/test_es_os.py index a0867ad3..e913f11f 100644 --- a/stac_fastapi/tests/clients/test_elasticsearch.py +++ b/stac_fastapi/tests/clients/test_es_os.py @@ -1,3 +1,4 @@ +import os import uuid from copy import deepcopy from typing import Callable @@ -10,6 +11,13 @@ from ..conftest import MockRequest, create_item +if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": + from stac_fastapi.opensearch.config import OpensearchSettings as SearchSettings +else: + from stac_fastapi.elasticsearch.config import ( + ElasticsearchSettings as SearchSettings, + ) + @pytest.mark.asyncio async def test_create_collection(app_client, ctx, core_client, txn_client): @@ -297,6 +305,51 @@ async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client): # ) +@pytest.mark.asyncio +async def test_bulk_item_insert_with_raise_on_error( + ctx, core_client, txn_client, bulk_txn_client +): + """ + Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false. + + This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError + is raised for conflicting items. When set to false, the operation logs errors + and continues gracefully. 
+ """ + + # Insert an initial item to set up a conflict + initial_item = deepcopy(ctx.item) + initial_item["id"] = str(uuid.uuid4()) + await create_item(txn_client, initial_item) + + # Verify the initial item is inserted + fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) + assert len(fc["features"]) >= 1 + + # Create conflicting items (same ID as the initial item) + conflicting_items = {initial_item["id"]: deepcopy(initial_item)} + + # Test with RAISE_ON_BULK_ERROR set to true + os.environ["RAISE_ON_BULK_ERROR"] = "true" + bulk_txn_client.database.sync_settings = SearchSettings() + + with pytest.raises(ConflictError): + bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True) + + # Test with RAISE_ON_BULK_ERROR set to false + os.environ["RAISE_ON_BULK_ERROR"] = "false" + bulk_txn_client.database.sync_settings = SearchSettings() # Reinitialize settings + result = bulk_txn_client.bulk_item_insert( + Items(items=conflicting_items), refresh=True + ) + + # Validate the results + assert "Successfully added/updated 1 Items" in result + + # Clean up the inserted item + await txn_client.delete_item(initial_item["id"], ctx.item["collection"]) + + @pytest.mark.asyncio async def test_feature_collection_insert( core_client,
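Taken together, the series changes the bulk helpers' contract from returning `None` to returning a `(success, errors)` tuple, which the transaction clients fold into a logged summary string. The sketch below condenses that reporting logic into one self-contained function; `summarize_bulk` is a hypothetical name and the error payloads are illustrative, as in the patch this logic lives inline in `TransactionsClient.create_item` and `BulkTransactionsClient.bulk_item_insert`.

    import logging
    from typing import Any, Dict, List, Tuple

    logger = logging.getLogger(__name__)

    def summarize_bulk(attempted: int, result: Tuple[int, List[Dict[str, Any]]]) -> str:
        # Fold the (success, errors) tuple returned by helpers.bulk / helpers.async_bulk
        # (with raise_on_error=False) into the summary string returned to callers.
        success, errors = result
        if errors:
            logger.error(f"Bulk operation encountered errors: {errors}")
        else:
            logger.info(f"Bulk operation succeeded with {success} actions.")
        return f"Successfully added/updated {success} Items. {attempted - success} errors occurred."

    # Example: 10 items attempted, 8 succeeded, 2 conflicted (error shape is illustrative).
    errors = [{"index": {"status": 409}}, {"index": {"status": 409}}]
    print(summarize_bulk(10, (8, errors)))
    # -> Successfully added/updated 8 Items. 2 errors occurred.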