diff --git a/CHANGELOG.md b/CHANGELOG.md
index 83f9b501..f8e149b4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,11 +10,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ### Added
 
 - Added configurable landing page ID `STAC_FASTAPI_LANDING_PAGE_ID` [#352](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/352)
+- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes are searchable immediately. If set to `false`, changes may not be visible right away, which can improve performance for bulk operations. If set to `wait_for`, the request blocks until the next refresh makes the changes visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)
 
 ### Changed
 
+- Refactored CRUD methods in `TransactionsClient` to use the `validate_refresh` helper method for consistent, reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)
+
 ### Fixed
 
+- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)
+
 ## [v4.1.0] - 2025-05-09
 
 ### Added
diff --git a/README.md b/README.md
index 3786f612..8644bde9 100644
--- a/README.md
+++ b/README.md
@@ -112,10 +112,11 @@ You can customize additional settings in your `.env` file:
 | `RELOAD` | Enable auto-reload for development. | `true` | Optional |
 | `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional |
 | `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
-| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
-| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
+| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
 | `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
-| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
+| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
+| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
+| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes are searchable immediately. If set to `false`, changes may not be visible right away, which can improve performance for bulk operations. If set to `wait_for`, the request blocks until the next refresh makes the changes visible. | `false` | Optional |
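The new variable composes with the existing ones; for instance, a bulk-ingest deployment might relax refresh in its `.env` (values here are illustrative only, not defaults recommended by the PR):

```env
# Defer index refresh for throughput; individual requests can still pass
# refresh=true or refresh=wait_for to override per call.
DATABASE_REFRESH=false
RAISE_ON_BULK_ERROR=false
```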
 
 > [!NOTE]
 > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py
index f994b619..987acdf6 100644
--- a/stac_fastapi/core/stac_fastapi/core/core.py
+++ b/stac_fastapi/core/stac_fastapi/core/core.py
@@ -712,10 +712,11 @@ async def create_item(
                 for feature in features
             ]
             attempted = len(processed_items)
+
             success, errors = await self.database.bulk_async(
-                collection_id,
-                processed_items,
-                refresh=kwargs.get("refresh", False),
+                collection_id=collection_id,
+                processed_items=processed_items,
+                **kwargs,
             )
             if errors:
                 logger.error(
@@ -729,10 +730,7 @@ async def create_item(
 
         # Handle single item
         await self.database.create_item(
-            item_dict,
-            refresh=kwargs.get("refresh", False),
-            base_url=base_url,
-            exist_ok=False,
+            item_dict, base_url=base_url, exist_ok=False, **kwargs
        )
         return ItemSerializer.db_to_stac(item_dict, base_url)
 
@@ -757,11 +755,12 @@ async def update_item(
         """
         item = item.model_dump(mode="json")
         base_url = str(kwargs["request"].base_url)
+
         now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
         item["properties"]["updated"] = now
 
         await self.database.create_item(
-            item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
+            item, base_url=base_url, exist_ok=True, **kwargs
         )
         return ItemSerializer.db_to_stac(item, base_url)
 
@@ -777,7 +776,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
         Returns:
             None: Returns 204 No Content on successful deletion
         """
-        await self.database.delete_item(item_id=item_id, collection_id=collection_id)
+        await self.database.delete_item(
+            item_id=item_id, collection_id=collection_id, **kwargs
+        )
         return None
 
     @overrides
@@ -798,8 +799,9 @@ async def create_collection(
         """
         collection = collection.model_dump(mode="json")
         request = kwargs["request"]
+
         collection = self.database.collection_serializer.stac_to_db(collection, request)
-        await self.database.create_collection(collection=collection)
+        await self.database.create_collection(collection=collection, **kwargs)
         return CollectionSerializer.db_to_stac(
             collection,
             request,
@@ -835,7 +837,7 @@ async def update_collection(
         collection = self.database.collection_serializer.stac_to_db(collection, request)
 
         await self.database.update_collection(
-            collection_id=collection_id, collection=collection
+            collection_id=collection_id, collection=collection, **kwargs
         )
 
         return CollectionSerializer.db_to_stac(
@@ -860,7 +862,7 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None:
         Raises:
             NotFoundError: If the collection doesn't exist
         """
-        await self.database.delete_collection(collection_id=collection_id)
+        await self.database.delete_collection(collection_id=collection_id, **kwargs)
 
         return None
 
@@ -937,7 +939,7 @@ def bulk_item_insert(
         success, errors = self.database.bulk_sync(
             collection_id,
             processed_items,
-            refresh=kwargs.get("refresh", False),
+            **kwargs,
         )
         if errors:
             logger.error(f"Bulk sync operation encountered errors: {errors}")
diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py
index e7aafe67..d4a35109 100644
--- a/stac_fastapi/core/stac_fastapi/core/utilities.py
+++ b/stac_fastapi/core/stac_fastapi/core/utilities.py
@@ -12,20 +12,75 @@
 MAX_LIMIT = 10000
 
 
-def get_bool_env(name: str, default: bool = False) -> bool:
+def validate_refresh(value: Union[str, bool]) -> str:
+    """
+    Validate the `refresh` parameter value.
+
+    Args:
+        value (Union[str, bool]): The `refresh` parameter value, which can be a string or a boolean.
+
+    Returns:
+        str: The validated value of the `refresh` parameter, which can be "true", "false", or "wait_for".
+    """
+    logger = logging.getLogger(__name__)
+
+    # Handle boolean-like values using get_bool_env
+    if isinstance(value, bool) or value in {
+        "true",
+        "false",
+        "1",
+        "0",
+        "yes",
+        "no",
+        "y",
+        "n",
+    }:
+        is_true = get_bool_env("DATABASE_REFRESH", default=value)
+        return "true" if is_true else "false"
+
+    # Normalize to lowercase for case-insensitivity
+    value = value.lower()
+
+    # Handle "wait_for" explicitly
+    if value == "wait_for":
+        return "wait_for"
+
+    # Log a warning for invalid values and default to "false"
+    logger.warning(
+        f"Invalid value for `refresh`: '{value}'. Expected 'true', 'false', or 'wait_for'. Defaulting to 'false'."
+    )
+    return "false"
+
+
+def get_bool_env(name: str, default: Union[bool, str] = False) -> bool:
     """
     Retrieve a boolean value from an environment variable.
 
     Args:
         name (str): The name of the environment variable.
-        default (bool, optional): The default value to use if the variable is not set or unrecognized. Defaults to False.
+        default (Union[bool, str], optional): The default value to use if the variable is not set or unrecognized. Defaults to False.
 
     Returns:
         bool: The boolean value parsed from the environment variable.
     """
-    value = os.getenv(name, str(default).lower())
     true_values = ("true", "1", "yes", "y")
     false_values = ("false", "0", "no", "n")
+
+    # Normalize the default value
+    if isinstance(default, bool):
+        default_str = "true" if default else "false"
+    elif isinstance(default, str):
+        default_str = default.lower()
+    else:
+        logger = logging.getLogger(__name__)
+        logger.warning(
+            f"The `default` parameter must be a boolean or string, got {type(default).__name__}. "
+            f"Falling back to `False`."
+        )
+        default_str = "false"
+
+    # Retrieve and normalize the environment variable value
+    value = os.getenv(name, default_str)
     if value.lower() in true_values:
         return True
     elif value.lower() in false_values:
@@ -34,9 +89,9 @@ def get_bool_env(name: str, default: bool = False) -> bool:
         logger = logging.getLogger(__name__)
         logger.warning(
             f"Environment variable '{name}' has unrecognized value '{value}'. "
-            f"Expected one of {true_values + false_values}. Using default: {default}"
+            f"Expected one of {true_values + false_values}. Using default: {default_str}"
         )
-        return default
+        return default_str in true_values
 
 
 def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
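To make the contract concrete, here is a self-contained approximation of what `validate_refresh` guarantees its callers (a sketch, not the module itself; note the real helper also consults the `DATABASE_REFRESH` environment variable via `get_bool_env` for boolean-like inputs):

```python
def resolve_refresh(value) -> str:
    """Approximate validate_refresh: bool-ish values collapse to "true"/"false",
    "wait_for" passes through, anything unrecognized falls back to "false"."""
    if isinstance(value, bool):
        return "true" if value else "false"
    value = str(value).lower()
    if value in ("true", "1", "yes", "y"):
        return "true"
    if value in ("false", "0", "no", "n"):
        return "false"
    if value == "wait_for":
        return "wait_for"
    return "false"  # unrecognized values default to "false"


assert resolve_refresh(True) == "true"
assert resolve_refresh("WAIT_FOR") == "wait_for"
assert resolve_refresh("banana") == "false"
```

The env-var lookup in the real boolean branch is worth knowing about: when `DATABASE_REFRESH` is set in the environment, it can win over a boolean passed by the caller in that branch.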
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
index 37e1ba5b..accbe8cc 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
@@ -3,14 +3,14 @@
 import logging
 import os
 import ssl
-from typing import Any, Dict, Set
+from typing import Any, Dict, Set, Union
 
 import certifi
 from elasticsearch._async.client import AsyncElasticsearch
 from elasticsearch import Elasticsearch  # type: ignore[attr-defined]
 
 from stac_fastapi.core.base_settings import ApiBaseSettings
-from stac_fastapi.core.utilities import get_bool_env
+from stac_fastapi.core.utilities import get_bool_env, validate_refresh
 from stac_fastapi.types.config import ApiSettings
 
@@ -88,6 +88,17 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
     enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
     raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
 
+    @property
+    def database_refresh(self) -> Union[bool, str]:
+        """
+        Get the value of the DATABASE_REFRESH environment variable.
+
+        Returns:
+            Union[bool, str]: The normalized value of DATABASE_REFRESH: "true", "false", or "wait_for".
+        """
+        value = os.getenv("DATABASE_REFRESH", "false")
+        return validate_refresh(value)
+
     @property
     def create_client(self):
         """Create es client."""
@@ -109,6 +120,17 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
     enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
     raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)
 
+    @property
+    def database_refresh(self) -> Union[bool, str]:
+        """
+        Get the value of the DATABASE_REFRESH environment variable.
+
+        Returns:
+            Union[bool, str]: The normalized value of DATABASE_REFRESH: "true", "false", or "wait_for".
+        """
+        value = os.getenv("DATABASE_REFRESH", "false")
+        return validate_refresh(value)
+
     @property
     def create_client(self):
         """Create async elasticsearch client."""
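Because `validate_refresh` always returns a string, the property effectively yields `"true"`, `"false"`, or `"wait_for"`, and it re-reads the environment on each access, so a changed variable is picked up without re-instantiating settings. A rough equivalent (class name simplified, reusing `resolve_refresh` from the sketch above):

```python
import os


class SettingsSketch:
    @property
    def database_refresh(self) -> str:
        # Re-read the environment on each access, like the properties above.
        return resolve_refresh(os.getenv("DATABASE_REFRESH", "false"))


os.environ["DATABASE_REFRESH"] = "wait_for"
assert SettingsSketch().database_refresh == "wait_for"
```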
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
index 9a773230..7afbb58d 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
@@ -31,7 +31,7 @@
 )
 from stac_fastapi.core.extensions import filter
 from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer
-from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon
+from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, validate_refresh
 from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
 from stac_fastapi.elasticsearch.config import (
     ElasticsearchSettings as SyncElasticsearchSettings,
@@ -845,15 +845,19 @@ def bulk_sync_prep_create_item(
     async def create_item(
         self,
         item: Item,
-        refresh: bool = False,
         base_url: str = "",
         exist_ok: bool = False,
+        **kwargs: Any,
     ):
         """Database logic for creating one item.
 
         Args:
             item (Item): The item to be created.
-            refresh (bool, optional): Refresh the index after performing the operation. Defaults to False.
+            base_url (str, optional): The base URL for the item. Defaults to an empty string.
+            exist_ok (bool, optional): Whether to allow the item to exist already. Defaults to False.
+            **kwargs: Additional keyword arguments.
+                - refresh (str | bool, optional): Whether to refresh the index after the operation.
+                  Accepts `True`/`False` or "true", "false", "wait_for". Defaults to
+                  `self.async_settings.database_refresh`.
 
         Raises:
             ConflictError: If the item already exists in the database.
 
@@ -861,12 +865,28 @@ async def create_item(
         Returns:
             None
         """
-        # todo: check if collection exists, but cache
+        # Extract item and collection IDs
         item_id = item["id"]
         collection_id = item["collection"]
+
+        # Ensure kwargs is a dictionary
+        kwargs = kwargs or {}
+
+        # Resolve the `refresh` parameter
+        refresh = kwargs.get("refresh", self.async_settings.database_refresh)
+        refresh = validate_refresh(refresh)
+
+        # Log the creation attempt
+        logger.info(
+            f"Creating item {item_id} in collection {collection_id} with refresh={refresh}"
+        )
+
+        # Prepare the item for insertion
         item = await self.async_prep_create_item(
             item=item, base_url=base_url, exist_ok=exist_ok
         )
+
+        # Index the item in the database
         await self.client.index(
             index=index_alias_by_collection_id(collection_id),
             id=mk_item_id(item_id, collection_id),
@@ -874,26 +894,43 @@
             refresh=refresh,
         )
 
-    async def delete_item(
-        self, item_id: str, collection_id: str, refresh: bool = False
-    ):
+    async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any):
         """Delete a single item from the database.
 
         Args:
             item_id (str): The id of the Item to be deleted.
             collection_id (str): The id of the Collection that the Item belongs to.
-            refresh (bool, optional): Whether to refresh the index after the deletion. Default is False.
+            **kwargs: Additional keyword arguments.
+                - refresh (str | bool, optional): Whether to refresh the index after the operation.
+                  Accepts `True`/`False` or "true", "false", "wait_for". Defaults to
+                  `self.async_settings.database_refresh`.
 
         Raises:
             NotFoundError: If the Item does not exist in the database.
+
+        Returns:
+            None
         """
+        # Ensure kwargs is a dictionary
+        kwargs = kwargs or {}
+
+        # Resolve the `refresh` parameter
+        refresh = kwargs.get("refresh", self.async_settings.database_refresh)
+        refresh = validate_refresh(refresh)
+
+        # Log the deletion attempt
+        logger.info(
+            f"Deleting item {item_id} from collection {collection_id} with refresh={refresh}"
+        )
+
         try:
+            # Perform the delete operation
             await self.client.delete(
                 index=index_alias_by_collection_id(collection_id),
                 id=mk_item_id(item_id, collection_id),
                 refresh=refresh,
             )
         except ESNotFoundError:
+            # Raise a custom NotFoundError if the item does not exist
             raise NotFoundError(
                 f"Item {item_id} in collection {collection_id} not found"
             )
@@ -916,24 +953,41 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]:
         except ESNotFoundError:
             raise NotFoundError(f"Mapping for index {index_name} not found")
 
-    async def create_collection(self, collection: Collection, refresh: bool = False):
+    async def create_collection(self, collection: Collection, **kwargs: Any):
         """Create a single collection in the database.
 
         Args:
             collection (Collection): The Collection object to be created.
-            refresh (bool, optional): Whether to refresh the index after the creation. Default is False.
+            **kwargs: Additional keyword arguments.
+                - refresh (str | bool, optional): Whether to refresh the index after the operation.
+                  Accepts `True`/`False` or "true", "false", "wait_for". Defaults to
+                  `self.async_settings.database_refresh`.
 
         Raises:
             ConflictError: If a Collection with the same id already exists in the database.
 
+        Returns:
+            None
+
         Notes:
             A new index is created for the items in the Collection using the `create_item_index` function.
         """
         collection_id = collection["id"]
 
+        # Ensure kwargs is a dictionary
+        kwargs = kwargs or {}
+
+        # Resolve the `refresh` parameter
+        refresh = kwargs.get("refresh", self.async_settings.database_refresh)
+        refresh = validate_refresh(refresh)
+
+        # Log the creation attempt
+        logger.info(f"Creating collection {collection_id} with refresh={refresh}")
+
+        # Check if the collection already exists
         if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
             raise ConflictError(f"Collection {collection_id} already exists")
 
+        # Index the collection in the database
         await self.client.index(
             index=COLLECTIONS_INDEX,
             id=collection_id,
@@ -941,6 +995,7 @@ async def create_collection(self, collection: Collection, refresh: bool = False)
             refresh=refresh,
         )
 
+        # Create the item index for the collection
         await create_item_index(collection_id)
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the update attempt + logger.info(f"Updating collection {collection_id} with refresh={refresh}") + + # Ensure the collection exists await self.find_collection(collection_id=collection_id) + # Handle collection ID change if collection_id != collection["id"]: + logger.info( + f"Collection ID change detected: {collection_id} -> {collection['id']}" + ) + + # Create the new collection await self.create_collection(collection, refresh=refresh) + # Reindex items from the old collection to the new collection await self.client.reindex( body={ "dest": {"index": f"{ITEMS_INDEX_PREFIX}{collection['id']}"}, @@ -1006,9 +1084,11 @@ async def update_collection( refresh=refresh, ) + # Delete the old collection await self.delete_collection(collection_id) else: + # Update the existing collection await self.client.index( index=COLLECTIONS_INDEX, id=collection_id, @@ -1016,33 +1096,57 @@ async def update_collection( refresh=refresh, ) - async def delete_collection(self, collection_id: str, refresh: bool = False): + async def delete_collection(self, collection_id: str, **kwargs: Any): """Delete a collection from the database. Parameters: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to be deleted. - refresh (bool): Whether to refresh the index after the deletion (default: False). + kwargs (Any, optional): Additional keyword arguments, including `refresh`. + - refresh (str): Whether to refresh the index after the operation. Can be "true", "false", or "wait_for". + - refresh (bool): Whether to refresh the index after the operation. Defaults to the value in `self.async_settings.database_refresh`. Raises: NotFoundError: If the collection with the given `collection_id` is not found in the database. + Returns: + None + Notes: This function first verifies that the collection with the specified `collection_id` exists in the database, and then - deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this - function also calls `delete_item_index` to delete the index for the items in the collection. + deletes the collection. If `refresh` is set to "true", "false", or "wait_for", the index is refreshed accordingly after + the deletion. Additionally, this function also calls `delete_item_index` to delete the index for the items in the collection. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Verify that the collection exists await self.find_collection(collection_id=collection_id) + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the deletion attempt + logger.info(f"Deleting collection {collection_id} with refresh={refresh}") + + # Delete the collection from the database await self.client.delete( index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh ) - await delete_item_index(collection_id) + + # Delete the item index for the collection + try: + await delete_item_index(collection_id) + except Exception as e: + logger.error( + f"Failed to delete item index for collection {collection_id}: {e}" + ) async def bulk_async( self, collection_id: str, processed_items: List[Item], - refresh: bool = False, + **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -1050,7 +1154,12 @@ async def bulk_async( Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + **kwargs (Any): Additional keyword arguments, including: + - refresh (str, optional): Whether to refresh the index after the bulk insert. + Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. + - refresh (bool, optional): Whether to refresh the index after the bulk insert. + - raise_on_error (bool, optional): Whether to raise an error if any of the bulk operations fail. + Defaults to the value of `self.async_settings.raise_on_bulk_error`. Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1059,10 +1168,31 @@ async def bulk_async( Notes: This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. - The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. - The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, - the index is refreshed after the bulk insert. + The insert is performed synchronously and blocking, meaning that the function does not return until the insert has + completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh` + parameter determines whether the index is refreshed after the bulk insert: + - "true": Forces an immediate refresh of the index. + - "false": Does not refresh the index immediately (default behavior). + - "wait_for": Waits for the next refresh cycle to make the changes visible. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the bulk insert attempt + logger.info( + f"Performing bulk insert for collection {collection_id} with refresh={refresh}" + ) + + # Handle empty processed_items + if not processed_items: + logger.warning(f"No items to insert for collection {collection_id}") + return 0, [] + + # Perform the bulk insert raise_on_error = self.async_settings.raise_on_bulk_error success, errors = await helpers.async_bulk( self.client, @@ -1070,13 +1200,19 @@ async def bulk_async( refresh=refresh, raise_on_error=raise_on_error, ) + + # Log the result + logger.info( + f"Bulk insert completed for collection {collection_id}: {success} successes, {len(errors)} errors" + ) + return success, errors def bulk_sync( self, collection_id: str, processed_items: List[Item], - refresh: bool = False, + **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database synchronously. @@ -1084,7 +1220,12 @@ def bulk_sync( Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + **kwargs (Any): Additional keyword arguments, including: + - refresh (str, optional): Whether to refresh the index after the bulk insert. + Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. + - refresh (bool, optional): Whether to refresh the index after the bulk insert. + - raise_on_error (bool, optional): Whether to raise an error if any of the bulk operations fail. + Defaults to the value of `self.async_settings.raise_on_bulk_error`. Returns: Tuple[int, List[Dict[str, Any]]]: A tuple containing: @@ -1094,9 +1235,30 @@ def bulk_sync( Notes: This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The insert is performed synchronously and blocking, meaning that the function does not return until the insert has - completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to - True, the index is refreshed after the bulk insert. + completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh` + parameter determines whether the index is refreshed after the bulk insert: + - "true": Forces an immediate refresh of the index. + - "false": Does not refresh the index immediately (default behavior). + - "wait_for": Waits for the next refresh cycle to make the changes visible. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the bulk insert attempt + logger.info( + f"Performing bulk insert for collection {collection_id} with refresh={refresh}" + ) + + # Handle empty processed_items + if not processed_items: + logger.warning(f"No items to insert for collection {collection_id}") + return 0, [] + + # Perform the bulk insert raise_on_error = self.sync_settings.raise_on_bulk_error success, errors = helpers.bulk( self.sync_client, @@ -1104,6 +1266,12 @@ def bulk_sync( refresh=refresh, raise_on_error=raise_on_error, ) + + # Log the result + logger.info( + f"Bulk insert completed for collection {collection_id}: {success} successes, {len(errors)} errors" + ) + return success, errors # DANGER diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py index 4c305fda..3a53ffdf 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/config.py @@ -2,13 +2,13 @@ import logging import os import ssl -from typing import Any, Dict, Set +from typing import Any, Dict, Set, Union import certifi from opensearchpy import AsyncOpenSearch, OpenSearch from stac_fastapi.core.base_settings import ApiBaseSettings -from stac_fastapi.core.utilities import get_bool_env +from stac_fastapi.core.utilities import get_bool_env, validate_refresh from stac_fastapi.types.config import ApiSettings @@ -85,6 +85,17 @@ class OpensearchSettings(ApiSettings, ApiBaseSettings): enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + @property + def database_refresh(self) -> Union[bool, str]: + """ + Get the value of the DATABASE_REFRESH environment variable. + + Returns: + Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". + """ + value = os.getenv("DATABASE_REFRESH", "false") + return validate_refresh(value) + @property def create_client(self): """Create es client.""" @@ -106,6 +117,17 @@ class AsyncOpensearchSettings(ApiSettings, ApiBaseSettings): enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False) raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + @property + def database_refresh(self) -> Union[bool, str]: + """ + Get the value of the DATABASE_REFRESH environment variable. + + Returns: + Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for". 
+ """ + value = os.getenv("DATABASE_REFRESH", "false") + return validate_refresh(value) + @property def create_client(self): """Create async elasticsearch client.""" diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 66c8d3e6..5b9510f3 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -31,7 +31,7 @@ ) from stac_fastapi.core.extensions import filter from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, validate_refresh from stac_fastapi.opensearch.config import ( AsyncOpensearchSettings as AsyncSearchSettings, ) @@ -864,15 +864,17 @@ def bulk_sync_prep_create_item( async def create_item( self, item: Item, - refresh: bool = False, base_url: str = "", exist_ok: bool = False, + **kwargs: Any, ): """Database logic for creating one item. Args: item (Item): The item to be created. - refresh (bool, optional): Refresh the index after performing the operation. Defaults to False. + base_url (str, optional): The base URL for the item. Defaults to an empty string. + exist_ok (bool, optional): Whether to allow the item to exist already. Defaults to False. + **kwargs: Additional keyword arguments like refresh. Raises: ConflictError: If the item already exists in the database. @@ -883,6 +885,19 @@ async def create_item( # todo: check if collection exists, but cache item_id = item["id"] collection_id = item["collection"] + + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the creation attempt + logger.info( + f"Creating item {item_id} in collection {collection_id} with refresh={refresh}" + ) + item = await self.async_prep_create_item( item=item, base_url=base_url, exist_ok=exist_ok ) @@ -893,19 +908,29 @@ async def create_item( refresh=refresh, ) - async def delete_item( - self, item_id: str, collection_id: str, refresh: bool = False - ): + async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any): """Delete a single item from the database. Args: item_id (str): The id of the Item to be deleted. collection_id (str): The id of the Collection that the Item belongs to. - refresh (bool, optional): Whether to refresh the index after the deletion. Default is False. + **kwargs: Additional keyword arguments like refresh. Raises: NotFoundError: If the Item does not exist in the database. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the deletion attempt + logger.info( + f"Deleting item {item_id} from collection {collection_id} with refresh={refresh}" + ) + try: await self.client.delete( index=index_alias_by_collection_id(collection_id), @@ -935,12 +960,12 @@ async def get_items_mapping(self, collection_id: str) -> Dict[str, Any]: except exceptions.NotFoundError: raise NotFoundError(f"Mapping for index {index_name} not found") - async def create_collection(self, collection: Collection, refresh: bool = False): + async def create_collection(self, collection: Collection, **kwargs: Any): """Create a single collection in the database. Args: collection (Collection): The Collection object to be created. - refresh (bool, optional): Whether to refresh the index after the creation. Default is False. + **kwargs: Additional keyword arguments like refresh. Raises: ConflictError: If a Collection with the same id already exists in the database. @@ -950,6 +975,16 @@ async def create_collection(self, collection: Collection, refresh: bool = False) """ collection_id = collection["id"] + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the creation attempt + logger.info(f"Creating collection {collection_id} with refresh={refresh}") + if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise ConflictError(f"Collection {collection_id} already exists") @@ -989,14 +1024,14 @@ async def find_collection(self, collection_id: str) -> Collection: return collection["_source"] async def update_collection( - self, collection_id: str, collection: Collection, refresh: bool = False + self, collection_id: str, collection: Collection, **kwargs: Any ): """Update a collection from the database. Args: - self: The instance of the object calling this function. collection_id (str): The ID of the collection to be updated. collection (Collection): The Collection object to be used for the update. + **kwargs: Additional keyword arguments like refresh. Raises: NotFoundError: If the collection with the given `collection_id` is not @@ -1007,9 +1042,23 @@ async def update_collection( `collection_id` and with the collection specified in the `Collection` object. If the collection is not found, a `NotFoundError` is raised. 
""" + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the update attempt + logger.info(f"Updating collection {collection_id} with refresh={refresh}") + await self.find_collection(collection_id=collection_id) if collection_id != collection["id"]: + logger.info( + f"Collection ID change detected: {collection_id} -> {collection['id']}" + ) + await self.create_collection(collection, refresh=refresh) await self.client.reindex( @@ -1025,7 +1074,7 @@ async def update_collection( refresh=refresh, ) - await self.delete_collection(collection_id) + await self.delete_collection(collection_id=collection_id, **kwargs) else: await self.client.index( @@ -1035,23 +1084,34 @@ async def update_collection( refresh=refresh, ) - async def delete_collection(self, collection_id: str, refresh: bool = False): + async def delete_collection(self, collection_id: str, **kwargs: Any): """Delete a collection from the database. Parameters: self: The instance of the object calling this function. collection_id (str): The ID of the collection to be deleted. - refresh (bool): Whether to refresh the index after the deletion (default: False). + **kwargs: Additional keyword arguments like refresh. Raises: NotFoundError: If the collection with the given `collection_id` is not found in the database. Notes: This function first verifies that the collection with the specified `collection_id` exists in the database, and then - deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this - function also calls `delete_item_index` to delete the index for the items in the collection. + deletes the collection. If `refresh` is set to "true", "false", or "wait_for", the index is refreshed accordingly after + the deletion. Additionally, this function also calls `delete_item_index` to delete the index for the items in the collection. """ + # Ensure kwargs is a dictionary + kwargs = kwargs or {} + await self.find_collection(collection_id=collection_id) + + # Resolve the `refresh` parameter + refresh = kwargs.get("refresh", self.async_settings.database_refresh) + refresh = validate_refresh(refresh) + + # Log the deletion attempt + logger.info(f"Deleting collection {collection_id} with refresh={refresh}") + await self.client.delete( index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh ) @@ -1061,7 +1121,7 @@ async def bulk_async( self, collection_id: str, processed_items: List[Item], - refresh: bool = False, + **kwargs: Any, ) -> Tuple[int, List[Dict[str, Any]]]: """ Perform a bulk insert of items into the database asynchronously. @@ -1069,7 +1129,12 @@ async def bulk_async( Args: collection_id (str): The ID of the collection to which the items belong. processed_items (List[Item]): A list of `Item` objects to be inserted into the database. - refresh (bool): Whether to refresh the index after the bulk insert (default: False). + **kwargs (Any): Additional keyword arguments, including: + - refresh (str, optional): Whether to refresh the index after the bulk insert. + Can be "true", "false", or "wait_for". Defaults to the value of `self.sync_settings.database_refresh`. + - refresh (bool, optional): Whether to refresh the index after the bulk insert. + - raise_on_error (bool, optional): Whether to raise an error if any of the bulk operations fail. + Defaults to the value of `self.async_settings.raise_on_bulk_error`. 
 
@@ -1061,7 +1121,7 @@
     async def bulk_async(
         self,
         collection_id: str,
         processed_items: List[Item],
-        refresh: bool = False,
+        **kwargs: Any,
     ) -> Tuple[int, List[Dict[str, Any]]]:
         """
         Perform a bulk insert of items into the database asynchronously.
 
         Args:
             collection_id (str): The ID of the collection to which the items belong.
             processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
-            refresh (bool): Whether to refresh the index after the bulk insert (default: False).
+            **kwargs (Any): Additional keyword arguments, including:
+                - refresh (str | bool, optional): Whether to refresh the index after the bulk insert.
+                  Accepts `True`/`False` or "true", "false", "wait_for". Defaults to
+                  `self.async_settings.database_refresh`.
 
         Returns:
             Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1078,10 +1143,30 @@
 
         Notes:
             This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
-            The insert is performed asynchronously, and the event loop is used to run the operation in a separate executor.
-            The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True,
-            the index is refreshed after the bulk insert.
+            The insert is performed asynchronously via `helpers.async_bulk`. The `mk_actions` function is called to generate
+            a list of actions for the bulk insert. The `refresh` parameter determines whether the index is refreshed after
+            the bulk insert:
+                - "true": Forces an immediate refresh of the index.
+                - "false": Does not refresh the index immediately (default behavior).
+                - "wait_for": Waits for the next refresh cycle to make the changes visible.
+            Whether errors raise or are only logged is controlled by `self.async_settings.raise_on_bulk_error`.
         """
+        # Ensure kwargs is a dictionary
+        kwargs = kwargs or {}
+
+        # Resolve the `refresh` parameter
+        refresh = kwargs.get("refresh", self.async_settings.database_refresh)
+        refresh = validate_refresh(refresh)
+
+        # Log the bulk insert attempt
+        logger.info(
+            f"Performing bulk insert for collection {collection_id} with refresh={refresh}"
+        )
+
+        # Handle empty processed_items
+        if not processed_items:
+            logger.warning(f"No items to insert for collection {collection_id}")
+            return 0, []
+
         raise_on_error = self.async_settings.raise_on_bulk_error
         success, errors = await helpers.async_bulk(
             self.client,
@@ -1089,21 +1174,30 @@
             refresh=refresh,
             raise_on_error=raise_on_error,
         )
+        # Log the result
+        logger.info(
+            f"Bulk insert completed for collection {collection_id}: {success} successes, {len(errors)} errors"
+        )
         return success, errors
 
     def bulk_sync(
         self,
         collection_id: str,
         processed_items: List[Item],
-        refresh: bool = False,
+        **kwargs: Any,
     ) -> Tuple[int, List[Dict[str, Any]]]:
         """
         Perform a bulk insert of items into the database synchronously.
 
         Args:
             collection_id (str): The ID of the collection to which the items belong.
             processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
-            refresh (bool): Whether to refresh the index after the bulk insert (default: False).
+            **kwargs (Any): Additional keyword arguments, including:
+                - refresh (str | bool, optional): Whether to refresh the index after the bulk insert.
+                  Accepts `True`/`False` or "true", "false", "wait_for". Defaults to
+                  `self.sync_settings.database_refresh`.
 
         Returns:
             Tuple[int, List[Dict[str, Any]]]: A tuple containing:
@@ -1113,9 +1207,29 @@
         Notes:
             This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`.
             The insert is performed synchronously and blocking, meaning that the function does not return until the insert has
-            completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
-            True, the index is refreshed after the bulk insert.
+            completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. The `refresh`
+            parameter determines whether the index is refreshed after the bulk insert:
+                - "true": Forces an immediate refresh of the index.
+                - "false": Does not refresh the index immediately (default behavior).
+                - "wait_for": Waits for the next refresh cycle to make the changes visible.
+            Whether errors raise or are only logged is controlled by `self.sync_settings.raise_on_bulk_error`.
         """
+        # Ensure kwargs is a dictionary
+        kwargs = kwargs or {}
+
+        # Resolve the `refresh` parameter
+        refresh = kwargs.get("refresh", self.sync_settings.database_refresh)
+        refresh = validate_refresh(refresh)
+
+        # Log the bulk insert attempt
+        logger.info(
+            f"Performing bulk insert for collection {collection_id} with refresh={refresh}"
+        )
+
+        # Handle empty processed_items
+        if not processed_items:
+            logger.warning(f"No items to insert for collection {collection_id}")
+            return 0, []
+
         raise_on_error = self.sync_settings.raise_on_bulk_error
         success, errors = helpers.bulk(
             self.sync_client,
diff --git a/stac_fastapi/tests/elasticsearch/test_direct_response.py b/stac_fastapi/tests/config/test_config_settings.py
similarity index 60%
rename from stac_fastapi/tests/elasticsearch/test_direct_response.py
rename to stac_fastapi/tests/config/test_config_settings.py
index bbbceb56..8509c259 100644
--- a/stac_fastapi/tests/elasticsearch/test_direct_response.py
+++ b/stac_fastapi/tests/config/test_config_settings.py
@@ -37,3 +37,27 @@ def test_enable_direct_response_false(monkeypatch):
     settings_class, _ = get_settings_class()
     settings = settings_class()
     assert settings.enable_direct_response is False
+
+
+def test_database_refresh_true(monkeypatch):
+    """Test that DATABASE_REFRESH env var enables database refresh."""
+    monkeypatch.setenv("DATABASE_REFRESH", "true")
+    settings_class, _ = get_settings_class()
+    settings = settings_class()
+    assert settings.database_refresh == "true"
+
+
+def test_database_refresh_false(monkeypatch):
+    """Test that DATABASE_REFRESH env var disables database refresh."""
+    monkeypatch.setenv("DATABASE_REFRESH", "false")
+    settings_class, _ = get_settings_class()
+    settings = settings_class()
+    assert settings.database_refresh == "false"
+
+
+def test_database_refresh_wait_for(monkeypatch):
+    """Test that DATABASE_REFRESH env var sets database refresh to 'wait_for'."""
+    monkeypatch.setenv("DATABASE_REFRESH", "wait_for")
+    settings_class, _ = get_settings_class()
+    settings = settings_class()
+    assert settings.database_refresh == "wait_for"
diff --git a/stac_fastapi/tests/extensions/__init__.py b/stac_fastapi/tests/extensions/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/stac_fastapi/tests/clients/test_bulk_transactions.py b/stac_fastapi/tests/extensions/test_bulk_transactions.py
similarity index 100%
rename from stac_fastapi/tests/clients/test_bulk_transactions.py
rename to stac_fastapi/tests/extensions/test_bulk_transactions.py
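The tests above pin the three valid settings values; the fallback path for an unrecognized value is also worth covering. A hedged sketch of such a test (same module and fixtures assumed, relying on `validate_refresh` defaulting invalid input to "false"):

```python
def test_database_refresh_invalid_defaults_to_false(monkeypatch):
    """Unrecognized DATABASE_REFRESH values should fall back to 'false'."""
    monkeypatch.setenv("DATABASE_REFRESH", "not-a-valid-value")
    settings_class, _ = get_settings_class()
    settings = settings_class()
    assert settings.database_refresh == "false"
```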