Bulk insert upgrade #356

Closed · wants to merge 25 commits into from

2 changes: 1 addition & 1 deletion .github/workflows/cicd.yml
@@ -65,7 +65,7 @@ jobs:

strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13"]
backend: [ "elasticsearch7", "elasticsearch8", "opensearch"]

name: Python ${{ matrix.python-version }} testing with ${{ matrix.backend }}
18 changes: 17 additions & 1 deletion CHANGELOG.md
@@ -7,13 +7,28 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Fixed
- Fixed inheritance relating to BaseDatabaseSettings and ApiBaseSettings [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
- Fixed `delete_item` and `delete_collection` method return types [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
- Bulk operations now properly raise errors instead of failing silently [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
- Added BulkInsertError for detailed error reporting [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)
- Fixed unsafe error suppression in OpenSearch bulk operations [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)

### Changed
- Bulk methods now return (success_count, error_list) tuples [#355](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/355)


## [v4.0.0]

### Added
- Added support for dynamically-generated queryables based on Elasticsearch/OpenSearch mappings, with extensible metadata augmentation [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351)
- Included default queryables configuration for seamless integration. [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351)

### Changed
- Refactored database logic to reduce duplication [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351)
- Replaced `fastapi-slim` with `fastapi` dependency [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351)
- Changed minimum Python version to 3.9 [#354](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/354)
- Updated stac-fastapi api, types, and extensions libraries to 5.1.1 and made various associated changes [#354](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/354)

### Fixed
- Improved performance of `mk_actions` and `filter-links` methods [#351](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/351)
@@ -314,7 +329,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Use genexp in execute_search and get_all_collections to return results.
- Added db_to_stac serializer to item_collection method in core.py.

[Unreleased]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.5...main
[Unreleased]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v4.0.0...main
[v4.0.0]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.5...v4.0.0
[v3.2.5]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.4...v3.2.5
[v3.2.4]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.3...v3.2.4
[v3.2.3]: https://github.com/stac-utils/stac-fastapi-elasticsearch/tree/v3.2.2...v3.2.3
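The changelog entries above describe the new bulk contract only briefly; a minimal consumer-side sketch follows. The call shape of `bulk_sync` (positional collection id and item list, plus `refresh` and `raise_errors` keywords) mirrors this PR's `core.py` diff, while the wrapper function, the `database` object, and the `raise_errors=False` path are illustrative assumptions rather than part of the change itself.

```python
from typing import Any, Dict, List, Tuple


def insert_items(
    database: Any, collection_id: str, items: List[Dict[str, Any]]
) -> Tuple[int, List[Dict[str, Any]]]:
    """Illustrative caller for the new (success_count, error_list) bulk contract."""
    success_count, errors = database.bulk_sync(
        collection_id,
        items,
        refresh=False,
        raise_errors=False,  # assumed: collect errors instead of raising BulkInsertError
    )
    for err in errors:
        # each entry is expected to describe one failed bulk operation
        print(f"bulk insert error: {err}")
    return success_count, errors
```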
22 changes: 11 additions & 11 deletions Makefile
@@ -10,15 +10,15 @@ OS_APP_PORT ?= 8082
OS_HOST ?= docker.for.mac.localhost
OS_PORT ?= 9202

run_es = docker-compose \
run_es = docker compose \
run \
-p ${EXTERNAL_APP_PORT}:${ES_APP_PORT} \
-e PY_IGNORE_IMPORTMISMATCH=1 \
-e APP_HOST=${APP_HOST} \
-e APP_PORT=${ES_APP_PORT} \
app-elasticsearch

run_os = docker-compose \
run_os = docker compose \
run \
-p ${EXTERNAL_APP_PORT}:${OS_APP_PORT} \
-e PY_IGNORE_IMPORTMISMATCH=1 \
@@ -45,7 +45,7 @@ run-deploy-locally:

.PHONY: image-dev
image-dev:
docker-compose build
docker compose build

.PHONY: docker-run-es
docker-run-es: image-dev
@@ -66,28 +66,28 @@ docker-shell-os:
.PHONY: test-elasticsearch
test-elasticsearch:
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest'
docker-compose down
docker compose down

.PHONY: test-opensearch
test-opensearch:
-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest'
docker-compose down
docker compose down

.PHONY: test
test:
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest'
docker-compose down
docker compose down

-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest'
docker-compose down
docker compose down

.PHONY: run-database-es
run-database-es:
docker-compose run --rm elasticsearch
docker compose run --rm elasticsearch

.PHONY: run-database-os
run-database-os:
docker-compose run --rm opensearch
docker compose run --rm opensearch

.PHONY: pybase-install
pybase-install:
@@ -107,10 +107,10 @@ install-os: pybase-install

.PHONY: docs-image
docs-image:
docker-compose -f docker-compose.docs.yml \
docker compose -f docker-compose.docs.yml \
build

.PHONY: docs
docs: docs-image
docker-compose -f docker-compose.docs.yml \
docker compose -f docker-compose.docs.yml \
run docs
2 changes: 0 additions & 2 deletions docker-compose.yml
@@ -1,5 +1,3 @@
version: '3.9'

services:
app-elasticsearch:
container_name: stac-fastapi-es
8 changes: 4 additions & 4 deletions stac_fastapi/core/setup.py
@@ -9,10 +9,10 @@
"fastapi",
"attrs>=23.2.0",
"pydantic",
"stac_pydantic>=3",
"stac-fastapi.types==3.0.0",
"stac-fastapi.api==3.0.0",
"stac-fastapi.extensions==3.0.0",
"stac_pydantic==3.1.*",
"stac-fastapi.api==5.1.1",
"stac-fastapi.extensions==5.1.1",
"stac-fastapi.types==5.1.1",
"orjson",
"overrides",
"geojson-pydantic",
89 changes: 41 additions & 48 deletions stac_fastapi/core/stac_fastapi/core/core.py
@@ -277,7 +277,7 @@ async def item_collection(
self,
collection_id: str,
bbox: Optional[BBox] = None,
datetime: Optional[DateTimeType] = None,
datetime: Optional[str] = None,
limit: Optional[int] = 10,
token: Optional[str] = None,
**kwargs,
@@ -287,7 +287,7 @@ async def item_collection(
Args:
collection_id (str): The identifier of the collection to read items from.
bbox (Optional[BBox]): The bounding box to filter items by.
datetime (Optional[DateTimeType]): The datetime range to filter items by.
datetime (Optional[str]): The datetime range to filter items by.
limit (int): The maximum number of items to return. The default value is 10.
token (str): A token used for pagination.
request (Request): The incoming request.
@@ -426,39 +426,36 @@ def _return_date(

return result

def _format_datetime_range(self, date_tuple: DateTimeType) -> str:
def _format_datetime_range(self, date_str: str) -> str:
"""
Convert a tuple of datetime objects or None into a formatted string for API requests.
Convert a datetime range into a formatted string.

Args:
date_tuple (tuple): A tuple containing two elements, each can be a datetime object or None.
date_str (str): A string containing two datetime values separated by a '/'.

Returns:
str: A string formatted as 'YYYY-MM-DDTHH:MM:SS.sssZ/YYYY-MM-DDTHH:MM:SS.sssZ', with '..' used if either bound is missing.
"""

def format_datetime(dt):
"""Format a single datetime object to the ISO8601 extended format with 'Z'."""
return dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" if dt else ".."

start, end = date_tuple
return f"{format_datetime(start)}/{format_datetime(end)}"
start, end = date_str.split("/")
start = start.replace("+01:00", "Z") if start else ".."
end = end.replace("+01:00", "Z") if end else ".."
return f"{start}/{end}"

async def get_search(
self,
request: Request,
collections: Optional[List[str]] = None,
ids: Optional[List[str]] = None,
bbox: Optional[BBox] = None,
datetime: Optional[DateTimeType] = None,
datetime: Optional[str] = None,
limit: Optional[int] = 10,
query: Optional[str] = None,
token: Optional[str] = None,
fields: Optional[List[str]] = None,
sortby: Optional[str] = None,
q: Optional[List[str]] = None,
intersects: Optional[str] = None,
filter: Optional[str] = None,
filter_expr: Optional[str] = None,
filter_lang: Optional[str] = None,
**kwargs,
) -> stac_types.ItemCollection:
@@ -468,7 +465,7 @@ async def get_search(
collections (Optional[List[str]]): List of collection IDs to search in.
ids (Optional[List[str]]): List of item IDs to search for.
bbox (Optional[BBox]): Bounding box to search in.
datetime (Optional[DateTimeType]): Filter items based on the datetime field.
datetime (Optional[str]): Filter items based on the datetime field.
limit (Optional[int]): Maximum number of results to return.
query (Optional[str]): Query string to filter the results.
token (Optional[str]): Access token to use when searching the catalog.
@@ -495,7 +492,7 @@
}

if datetime:
base_args["datetime"] = self._format_datetime_range(datetime)
base_args["datetime"] = self._format_datetime_range(date_str=datetime)

if intersects:
base_args["intersects"] = orjson.loads(unquote_plus(intersects))
@@ -506,12 +503,12 @@
for sort in sortby
]

if filter:
base_args["filter-lang"] = "cql2-json"
if filter_expr:
base_args["filter_lang"] = "cql2-json"
base_args["filter"] = orjson.loads(
unquote_plus(filter)
unquote_plus(filter_expr)
if filter_lang == "cql2-json"
else to_cql2(parse_cql2_text(filter))
else to_cql2(parse_cql2_text(filter_expr))
)

if fields:
@@ -593,8 +590,8 @@ async def post_search(
)

# only cql2_json is supported here
if hasattr(search_request, "filter"):
cql2_filter = getattr(search_request, "filter", None)
if hasattr(search_request, "filter_expr"):
cql2_filter = getattr(search_request, "filter_expr", None)
try:
search = self.database.apply_cql2_filter(search, cql2_filter)
except Exception as e:
@@ -734,17 +731,15 @@ async def update_item(
return ItemSerializer.db_to_stac(item, base_url)

@overrides
async def delete_item(
self, item_id: str, collection_id: str, **kwargs
) -> Optional[stac_types.Item]:
async def delete_item(self, item_id: str, collection_id: str, **kwargs) -> None:
"""Delete an item from a collection.

Args:
item_id (str): The identifier of the item to delete.
collection_id (str): The identifier of the collection that contains the item.

Returns:
Optional[stac_types.Item]: The deleted item, or `None` if the item was successfully deleted.
None: Returns 204 No Content on successful deletion
"""
await self.database.delete_item(item_id=item_id, collection_id=collection_id)
return None
@@ -814,23 +809,20 @@ async def update_collection(
)

@overrides
async def delete_collection(
self, collection_id: str, **kwargs
) -> Optional[stac_types.Collection]:
async def delete_collection(self, collection_id: str, **kwargs) -> None:
"""
Delete a collection.

This method deletes an existing collection in the database.

Args:
collection_id (str): The identifier of the collection that contains the item.
kwargs: Additional keyword arguments.
collection_id (str): The identifier of the collection to delete

Returns:
None.
None: Returns 204 No Content on successful deletion

Raises:
NotFoundError: If the collection doesn't exist.
NotFoundError: If the collection doesn't exist
"""
await self.database.delete_collection(collection_id=collection_id)
return None
@@ -875,35 +867,36 @@ def preprocess_item(
def bulk_item_insert(
self, items: Items, chunk_size: Optional[int] = None, **kwargs
) -> str:
"""Perform a bulk insertion of items into the database using Elasticsearch.
"""Perform bulk insertion of items.

Args:
items: The items to insert.
chunk_size: The size of each chunk for bulk processing.
**kwargs: Additional keyword arguments, such as `request` and `refresh`.
items: The items to insert
chunk_size: Chunk size for bulk processing
**kwargs: Additional keyword arguments

Returns:
A string indicating the number of items successfully added.
str: Message indicating number of items successfully added

Raises:
BulkInsertError: If any items fail insertion
"""
request = kwargs.get("request")
if request:
base_url = str(request.base_url)
else:
base_url = ""
base_url = str(request.base_url) if request else ""

processed_items = [
self.preprocess_item(item, base_url, items.method)
for item in items.items.values()
]

# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]
collection_id = processed_items[0]["collection"] if processed_items else ""

self.database.bulk_sync(
collection_id, processed_items, refresh=kwargs.get("refresh", False)
success_count, errors = self.database.bulk_sync(
collection_id,
processed_items,
refresh=kwargs.get("refresh", False),
raise_errors=True,
)

return f"Successfully added {len(processed_items)} Items."
return "sucessfully added {} items".format(success_count)


_DEFAULT_QUERYABLES: Dict[str, Dict[str, Any]] = {
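A small standalone sketch of the string-based datetime handling introduced in `_format_datetime_range` above; the splitting logic and the `+01:00` normalisation are copied from the diff, while the free-standing function name and the example values are illustrative only.

```python
def format_datetime_range(date_str: str) -> str:
    """Normalise a 'start/end' interval string, using '..' for an open bound."""
    start, end = date_str.split("/")
    start = start.replace("+01:00", "Z") if start else ".."
    end = end.replace("+01:00", "Z") if end else ".."
    return f"{start}/{end}"


if __name__ == "__main__":
    # closed interval: the +01:00 offset is rewritten to 'Z'
    print(format_datetime_range("2020-02-01T00:00:00+01:00/2020-02-12T00:00:00Z"))
    # open-ended interval: the empty bound becomes '..'
    print(format_datetime_range("2020-02-01T00:00:00Z/"))
```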
39 changes: 39 additions & 0 deletions stac_fastapi/core/stac_fastapi/core/exceptions.py
@@ -0,0 +1,39 @@
"""Core exceptions module for STAC FastAPI application.

This module contains custom exception classes to handle specific error conditions in a structured way.
"""


class BulkInsertError(Exception):
"""Exception raised for bulk insert operation failures.

Attributes:
success_count (int): Number of successfully inserted items
errors (List[Dict]): Detailed error information for failed operations
failure_count (int): Derived count of failed operations

Notes:
Raised by bulk_async/bulk_sync methods when raise_errors=True
and any operations fail during bulk insertion.
"""

def __init__(self, message, success_count, errors):
"""Initialize BulkInsertError instance with operation details.

Args:
message (str): Human-readable error description
success_count (int): Number of successfully processed items
errors (List[Dict]): List of error dictionaries from bulk operation
"""
super().__init__(message)
self.success_count = success_count
self.errors = errors
self.failure_count = len(errors)

def __str__(self) -> str:
"""Return enhanced string representation with operation metrics.

Returns:
str: Formatted string containing base message with success/failure counts
"""
return f"{super().__str__()} (Success: {self.success_count}, Failures: {self.failure_count})"