Skip to content

Add es_os_refresh env var to refresh index, ensure refresh passed via kwargs #370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 10, 2025
Merged
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Added logging to bulk insertion methods to provide detailed feedback on errors encountered during operations. [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
- Introduced the `RAISE_ON_BULK_ERROR` environment variable to control whether bulk insertion methods raise exceptions on errors (`true`) or log warnings and continue processing (`false`). [#364](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/364)
- Added code coverage reporting to the test suite using pytest-cov. [#87](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/87)
- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Changed

- Updated dynamic mapping for items to map long values to double versus float. [#326](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/326)
- Extended Datetime Search to search on start_datetime and end_datetime as well as datetime fields. [#182](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/182)
- Changed item update operation to use Elasticsearch index API instead of delete and create for better efficiency and atomicity. [#75](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/75)
- Bulk insertion via `BulkTransactionsClient` now strictly validates all STAC Items using the Pydantic model before insertion. Any invalid item will immediately raise a `ValidationError`, ensuring consistent validation with single-item inserts and preventing invalid STAC Items from being stored. This validation is enforced regardless of the `RAISE_ON_BULK_ERROR` setting. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)

- Refactored CRUD methods in `TransactionsClient` to use the `_resolve_refresh` helper method for consistent and reusable handling of the `refresh` parameter. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

### Fixed

- Refactored `create_item` and `update_item` methods to share unified logic, ensuring consistent conflict detection, validation, and database operations. [#368](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/368)
- Fixed an issue where some routes were not passing the `refresh` parameter from `kwargs` to the database logic, ensuring consistent behavior across all CRUD operations. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)

## [v4.0.0] - 2025-04-23

Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ You can customize additional settings in your `.env` file:
| `RELOAD` | Enable auto-reload for development. | `true` | Optional |
| `STAC_FASTAPI_RATE_LIMIT` | API rate limit per client. | `200/minute` | Optional |
| `BACKEND` | Tests-related variable | `elasticsearch` or `opensearch` based on the backend | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional |
| `ELASTICSEARCH_VERSION` | Version of Elasticsearch to use. | `8.11.0` | Optional | |
| `OPENSEARCH_VERSION` | OpenSearch version | `2.11.1` | Optional
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` | Optional |
| `ENABLE_DIRECT_RESPONSE` | Enable direct response for maximum performance (disables all FastAPI dependencies, including authentication, custom status codes, and validation) | `false` | Optional
| `RAISE_ON_BULK_ERROR` | Controls whether bulk insert operations raise exceptions on errors. If set to `true`, the operation will stop and raise an exception when an error occurs. If set to `false`, errors will be logged, and the operation will continue. **Note:** STAC Item and ItemCollection validation errors will always raise, regardless of this flag. | `false` Optional |
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional |

> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, and `ES_VERIFY_CERTS` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.
Expand Down
28 changes: 15 additions & 13 deletions stac_fastapi/core/stac_fastapi/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,11 @@ async def create_item(
for feature in features
]
attempted = len(processed_items)

success, errors = await self.database.bulk_async(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
collection_id=collection_id,
processed_items=processed_items,
**kwargs,
)
if errors:
logger.error(
Expand All @@ -729,10 +730,7 @@ async def create_item(

# Handle single item
await self.database.create_item(
item_dict,
refresh=kwargs.get("refresh", False),
base_url=base_url,
exist_ok=False,
item_dict, base_url=base_url, exist_ok=False, **kwargs
)
return ItemSerializer.db_to_stac(item_dict, base_url)

Expand All @@ -757,11 +755,12 @@ async def update_item(
"""
item = item.model_dump(mode="json")
base_url = str(kwargs["request"].base_url)

now = datetime_type.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
item["properties"]["updated"] = now

await self.database.create_item(
item, refresh=kwargs.get("refresh", False), base_url=base_url, exist_ok=True
item, base_url=base_url, exist_ok=True, **kwargs
)

return ItemSerializer.db_to_stac(item, base_url)
Expand All @@ -777,7 +776,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
Returns:
None: Returns 204 No Content on successful deletion
"""
await self.database.delete_item(item_id=item_id, collection_id=collection_id)
await self.database.delete_item(
item_id=item_id, collection_id=collection_id, **kwargs
)
return None

@overrides
Expand All @@ -798,8 +799,9 @@ async def create_collection(
"""
collection = collection.model_dump(mode="json")
request = kwargs["request"]

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.create_collection(collection=collection)
await self.database.create_collection(collection=collection, **kwargs)
return CollectionSerializer.db_to_stac(
collection,
request,
Expand Down Expand Up @@ -835,7 +837,7 @@ async def update_collection(

collection = self.database.collection_serializer.stac_to_db(collection, request)
await self.database.update_collection(
collection_id=collection_id, collection=collection
collection_id=collection_id, collection=collection, **kwargs
)

return CollectionSerializer.db_to_stac(
Expand All @@ -860,7 +862,7 @@ async def delete_collection(self, collection_id: str, **kwargs) -> None:
Raises:
NotFoundError: If the collection doesn't exist
"""
await self.database.delete_collection(collection_id=collection_id)
await self.database.delete_collection(collection_id=collection_id, **kwargs)
return None


Expand Down Expand Up @@ -937,7 +939,7 @@ def bulk_item_insert(
success, errors = self.database.bulk_sync(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
**kwargs,
)
if errors:
logger.error(f"Bulk sync operation encountered errors: {errors}")
Expand Down
27 changes: 27 additions & 0 deletions stac_fastapi/core/stac_fastapi/core/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,33 @@ def get_bool_env(name: str, default: bool = False) -> bool:
return default


def resolve_refresh(refresh: str) -> str:
"""
Resolve the `refresh` parameter from kwargs or the environment variable.

Args:
refresh (str): The `refresh` parameter value.

Returns:
str: The resolved value of the `refresh` parameter, which can be "true", "false", or "wait_for".

Raises:
ValueError: If the `refresh` value is not one of "true", "false", or "wait_for".
"""
logger = logging.getLogger(__name__)

# Normalize and validate the `refresh` value
refresh = refresh.lower()
if refresh not in {"true", "false", "wait_for"}:
raise ValueError(
"Invalid value for `refresh`. Must be 'true', 'false', or 'wait_for'."
)

# Log the resolved value
logger.info(f"`refresh` parameter resolved to: {refresh}")
return refresh


def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
"""Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon.

Expand Down
38 changes: 37 additions & 1 deletion stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
import ssl
from typing import Any, Dict, Set
from typing import Any, Dict, Set, Union

import certifi
from elasticsearch._async.client import AsyncElasticsearch
Expand Down Expand Up @@ -88,6 +88,24 @@ class ElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the value of the DATABASE_REFRESH environment variable.

Returns:
Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
"""
value = os.getenv("DATABASE_REFRESH", "false").lower()
if value in {"true", "false"}:
return value == "true"
elif value == "wait_for":
return "wait_for"
else:
raise ValueError(
"Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'."
)

@property
def create_client(self):
"""Create es client."""
Expand All @@ -109,6 +127,24 @@ class AsyncElasticsearchSettings(ApiSettings, ApiBaseSettings):
enable_direct_response: bool = get_bool_env("ENABLE_DIRECT_RESPONSE", default=False)
raise_on_bulk_error: bool = get_bool_env("RAISE_ON_BULK_ERROR", default=False)

@property
def database_refresh(self) -> Union[bool, str]:
"""
Get the value of the DATABASE_REFRESH environment variable.

Returns:
Union[bool, str]: The value of DATABASE_REFRESH, which can be True, False, or "wait_for".
"""
value = os.getenv("DATABASE_REFRESH", "false").lower()
if value in {"true", "false"}:
return value == "true"
elif value == "wait_for":
return "wait_for"
else:
raise ValueError(
"Invalid value for DATABASE_REFRESH. Must be 'true', 'false', or 'wait_for'."
)

@property
def create_client(self):
"""Create async elasticsearch client."""
Expand Down
Loading