Commit 14ec996

Author: Grzegorz Pustulka (committed)
Add index management system with datetime partitioning
1 parent: a0b77cb

File tree: 31 files changed, +2694 / -154 lines


CHANGELOG.md

Lines changed: 23 additions & 0 deletions
@@ -11,6 +11,28 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ### Added
 
 - Added the ability to set timeout for Opensearch and Elasticsearch clients by setting the environmental variable `ES_TIMEOUT` [#408](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/408)
+- Added comprehensive index management system with dynamic selection and insertion strategies for improved performance and scalability [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405)
+- Added `ENABLE_DATETIME_INDEX_FILTERING` environment variable to enable datetime-based index selection using collection IDs. Requires indexes in format: `STAC_ITEMS_INDEX_PREFIX_collection-id_start_year-start_month-start_day-end_year-end_month-end_day`, e.g. `items_sentinel-2-l2a_2025-06-06-2025-09-22`. Default is `false`. [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405)
+- Added `DATETIME_INDEX_MAX_SIZE_GB` environment variable to set maximum size limit in GB for datetime-based indexes. When an index exceeds this size, a new time-partitioned index will be created. Default is `25` GB. Only applies when `ENABLE_DATETIME_INDEX_FILTERING` is enabled. [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405)
+- Added search engine adapter system with support for both Elasticsearch and OpenSearch [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
+  - `SearchEngineAdapter` base class with engine-specific implementations
+  - `ElasticsearchAdapter` and `OpenSearchAdapter` with tailored index creation methods
+  - Automatic engine type detection based on client class
+  - `SearchEngineAdapterFactory` for creating appropriate adapters
+- Added datetime-based index selection strategies with caching support [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
+  - `AsyncDatetimeBasedIndexSelector` and `SyncDatetimeBasedIndexSelector` for temporal filtering
+  - `IndexCacheManager` with configurable TTL-based cache expiration (default 1 hour)
+  - `AsyncIndexAliasLoader` and `SyncIndexAliasLoader` for alias management
+  - `UnfilteredIndexSelector` as fallback for returning all available indexes
+- Added index insertion strategies with automatic partitioning [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
+  - Simple insertion strategy (`AsyncSimpleIndexInserter`, `SyncSimpleIndexInserter`) for traditional single-index-per-collection approach
+  - Datetime-based insertion strategy (`AsyncDatetimeIndexInserter`, `SyncDatetimeIndexInserter`) with time-based partitioning
+  - Automatic index size monitoring and splitting when limits exceeded
+  - Handling of chronologically early data and bulk operations
+- Added index management utilities [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
+  - `IndexSizeManager` for size monitoring and overflow handling
+  - `DatetimeIndexManager` for datetime-based index operations
+  - Factory patterns (`IndexInsertionFactory`, `IndexSelectorFactory`) for strategy creation based on configuration
 
 ## [v6.0.0] - 2025-06-22
 
@@ -22,6 +44,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 - Updated stac-fastapi parent libraries to v6.0.0 [#291](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/291)
 
+
 ## [v5.0.0] - 2025-06-11
 
 ### Added
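The index naming convention documented for `ENABLE_DATETIME_INDEX_FILTERING` in the changelog entries above can be sketched in a few lines of Python. The helper below is illustrative only and is not part of this commit; it assumes the `items_` prefix used in the changelog example.

from datetime import date

def build_datetime_index_name(
    collection_id: str, start: date, end: date, prefix: str = "items_"
) -> str:
    # Illustrative: follows the documented pattern
    # STAC_ITEMS_INDEX_PREFIX_collection-id_start-date-end-date.
    return f"{prefix}{collection_id}_{start.isoformat()}-{end.isoformat()}"

print(build_datetime_index_name("sentinel-2-l2a", date(2025, 6, 6), date(2025, 9, 22)))
# -> items_sentinel-2-l2a_2025-06-06-2025-09-22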
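Similarly, the `DATETIME_INDEX_MAX_SIZE_GB` behaviour (start a new time-partitioned index once the current one grows past the limit) can be illustrated with a rough size check. This is a sketch only: the `_stats`-based lookup and the function name are assumptions, not the commit's `IndexSizeManager` implementation.

import os
from elasticsearch import AsyncElasticsearch  # opensearch-py exposes an equivalent indices.stats API

MAX_SIZE_GB = float(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", "25"))

async def index_exceeds_limit(client: AsyncElasticsearch, index: str) -> bool:
    # Compare the index's primary store size (bytes) against the configured GB limit.
    stats = await client.indices.stats(index=index, metric="store")
    size_bytes = stats["indices"][index]["primaries"]["store"]["size_in_bytes"]
    return size_bytes / (1024 ** 3) > MAX_SIZE_GB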

Makefile

Lines changed: 2 additions & 2 deletions
@@ -73,10 +73,10 @@ test-opensearch:
 
 .PHONY: test
 test:
-	-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
+	-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
 	docker compose down
 
-	-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
+	-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
 	docker compose down
 
 .PHONY: run-database-es

README.md

Lines changed: 25 additions & 23 deletions
Large diffs are not rendered by default.

compose.yml

Lines changed: 6 additions & 0 deletions
@@ -21,6 +21,7 @@ services:
       - ES_USE_SSL=false
       - ES_VERIFY_CERTS=false
       - BACKEND=elasticsearch
+      - ENABLE_DATETIME_INDEX_FILTERING=true
     ports:
       - "8080:8080"
     volumes:
@@ -55,6 +56,7 @@ services:
       - ES_VERIFY_CERTS=false
       - BACKEND=opensearch
       - STAC_FASTAPI_RATE_LIMIT=200/minute
+      - ENABLE_DATETIME_INDEX_FILTERING=true
     ports:
       - "8082:8082"
     volumes:
@@ -69,9 +71,11 @@ services:
   elasticsearch:
     container_name: es-container
     image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0}
+    platform: linux/amd64
     hostname: elasticsearch
     environment:
       ES_JAVA_OPTS: -Xms512m -Xmx1g
+      action.destructive_requires_name: false
     volumes:
       - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
       - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots
@@ -81,11 +85,13 @@ services:
   opensearch:
     container_name: os-container
     image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1}
+    platform: linux/amd64
     hostname: opensearch
    environment:
       - discovery.type=single-node
       - plugins.security.disabled=true
       - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m
+      - action.destructive_requires_name=false
     volumes:
       - ./opensearch/config/opensearch.yml:/usr/share/opensearch/config/opensearch.yml
       - ./opensearch/snapshots:/usr/share/opensearch/snapshots
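The `ENABLE_DATETIME_INDEX_FILTERING=true` entries added to both app services above are what switch the new behaviour on. A minimal sketch of how such flags are typically read on the application side (the parsing below is an assumption for illustration, not the commit's settings code):

import os

# Illustrative: variable names and defaults come from the changelog entries above.
ENABLE_DATETIME_INDEX_FILTERING = (
    os.getenv("ENABLE_DATETIME_INDEX_FILTERING", "false").lower() == "true"
)
DATETIME_INDEX_MAX_SIZE_GB = float(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", "25"))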

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 17 additions & 4 deletions
@@ -37,6 +37,7 @@
     BulkTransactionMethod,
     Items,
 )
+from stac_fastapi.sfeos_helpers.database import return_date
 from stac_fastapi.types import stac as stac_types
 from stac_fastapi.types.conformance import BASE_CONFORMANCE_CLASSES
 from stac_fastapi.types.core import AsyncBaseCoreClient
@@ -324,10 +325,15 @@ async def item_collection(
             search=search, collection_ids=[collection_id]
         )
 
-        if datetime:
+        try:
+            datetime_search = return_date(datetime)
             search = self.database.apply_datetime_filter(
-                search=search, interval=datetime
+                search=search, datetime_search=datetime_search
             )
+        except (ValueError, TypeError) as e:
+            # Handle invalid interval formats if return_date fails
+            logger.error(f"Invalid interval format: {datetime}, error: {e}")
+            datetime_search = None
 
         if bbox:
             bbox = [float(x) for x in bbox]
@@ -342,6 +348,7 @@
             sort=None,
             token=token,
             collection_ids=[collection_id],
+            datetime_search=datetime_search,
         )
 
         items = [
@@ -500,10 +507,15 @@ async def post_search(
             search=search, collection_ids=search_request.collections
         )
 
-        if search_request.datetime:
+        try:
+            datetime_search = return_date(search_request.datetime)
             search = self.database.apply_datetime_filter(
-                search=search, interval=search_request.datetime
+                search=search, datetime_search=datetime_search
             )
+        except (ValueError, TypeError) as e:
+            # Handle invalid interval formats if return_date fails
+            logger.error(f"Invalid interval format: {search_request.datetime}, error: {e}")
+            datetime_search = None
 
         if search_request.bbox:
             bbox = search_request.bbox
@@ -560,6 +572,7 @@
             token=search_request.token,
             sort=sort,
             collection_ids=search_request.collections,
+            datetime_search=datetime_search,
         )
 
         fields = (
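The change above moves interval parsing out of the database layer: `return_date` now runs in the core client, and the resulting dictionary is handed both to `apply_datetime_filter` and to `execute_search` for index selection. A rough sketch of the expected shape, inferred from how `apply_datetime_filter` consumes it in the next file (the literal values and the open-ended case are illustrative assumptions):

# Single instant -> equality key; ranges -> gte/lte bounds (type hint allows None values).
single_instant = {"eq": "2023-01-01T12:00:00Z"}
closed_range = {"gte": "2023-01-01T00:00:00Z", "lte": "2023-12-31T23:59:59Z"}
open_ended = {"gte": "2023-01-01T00:00:00Z", "lte": None}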

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 53 additions & 28 deletions
@@ -36,11 +36,8 @@
     get_queryables_mapping_shared,
     index_alias_by_collection_id,
     index_by_collection_id,
-    indices,
-    mk_actions,
     mk_item_id,
     populate_sort_shared,
-    return_date,
     validate_refresh,
 )
 from stac_fastapi.sfeos_helpers.database.utils import (
@@ -55,9 +52,14 @@
     ITEMS_INDEX_PREFIX,
     Geometry,
 )
+from stac_fastapi.sfeos_helpers.search_engine import (
+    BaseIndexInserter,
+    IndexInsertionFactory,
+    IndexSelectionStrategy,
+    IndexSelectorFactory,
+)
 from stac_fastapi.types.errors import ConflictError, NotFoundError
 from stac_fastapi.types.links import resolve_links
-from stac_fastapi.types.rfc3339 import DateTimeType
 from stac_fastapi.types.stac import Collection, Item
 
 logger = logging.getLogger(__name__)
@@ -135,6 +137,10 @@ class DatabaseLogic(BaseDatabaseLogic):
     sync_settings: SyncElasticsearchSettings = attr.ib(
         factory=SyncElasticsearchSettings
     )
+    async_index_selector: IndexSelectionStrategy = attr.ib(init=False)
+    sync_index_selector: IndexSelectionStrategy = attr.ib(init=False)
+    async_index_inserter: BaseIndexInserter = attr.ib(init=False)
+    sync_index_inserter: BaseIndexInserter = attr.ib(init=False)
 
     client = attr.ib(init=False)
     sync_client = attr.ib(init=False)
@@ -143,6 +149,18 @@ def __attrs_post_init__(self):
         """Initialize clients after the class is instantiated."""
         self.client = self.async_settings.create_client
         self.sync_client = self.sync_settings.create_client
+        self.async_index_inserter = (
+            IndexInsertionFactory.create_async_insertion_strategy(self.client)
+        )
+        self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy(
+            self.sync_client
+        )
+        self.async_index_selector = IndexSelectorFactory.create_async_selector(
+            self.client
+        )
+        self.sync_index_selector = IndexSelectorFactory.create_sync_selector(
+            self.sync_client
+        )
 
     item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
     collection_serializer: Type[CollectionSerializer] = attr.ib(
@@ -256,30 +274,18 @@ def apply_collections_filter(search: Search, collection_ids: List[str]):
 
     @staticmethod
     def apply_datetime_filter(
-        search: Search, interval: Optional[Union[DateTimeType, str]]
+        search: Search, datetime_search: Dict[str, Optional[str]]
     ) -> Search:
         """Apply a filter to search on datetime, start_datetime, and end_datetime fields.
 
         Args:
             search: The search object to filter.
-            interval: Optional datetime interval to filter by. Can be:
-                - A single datetime string (e.g., "2023-01-01T12:00:00")
-                - A datetime range string (e.g., "2023-01-01/2023-12-31")
-                - A datetime object
-                - A tuple of (start_datetime, end_datetime)
+            datetime_search: Dict[str, Optional[str]]
 
         Returns:
             The filtered search object.
         """
-        if not interval:
-            return search
-
-        should = []
-        try:
-            datetime_search = return_date(interval)
-        except (ValueError, TypeError) as e:
-            # Handle invalid interval formats if return_date fails
-            logger.error(f"Invalid interval format: {interval}, error: {e}")
+        if not datetime_search:
             return search
 
         if "eq" in datetime_search:
@@ -489,6 +495,7 @@ async def execute_search(
         token: Optional[str],
         sort: Optional[Dict[str, Dict[str, str]]],
         collection_ids: Optional[List[str]],
+        datetime_search: Dict[str, Optional[str]],
         ignore_unavailable: bool = True,
     ) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]:
         """Execute a search query with limit and other optional parameters.
@@ -499,6 +506,7 @@
             token (Optional[str]): The token used to return the next set of results.
             sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted.
             collection_ids (Optional[List[str]]): The collection ids to search.
+            datetime_search (Dict[str, Optional[str]]): Datetime range used for index selection.
             ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True.
 
         Returns:
@@ -519,7 +527,9 @@
 
         query = search.query.to_dict() if search.query else None
 
-        index_param = indices(collection_ids)
+        index_param = await self.async_index_selector.select_indexes(
+            collection_ids, datetime_search
+        )
 
         max_result_window = MAX_LIMIT
 
@@ -583,6 +593,7 @@
         geometry_geohash_grid_precision: int,
         geometry_geotile_grid_precision: int,
         datetime_frequency_interval: str,
+        datetime_search,
         ignore_unavailable: Optional[bool] = True,
     ):
         """Return aggregations of STAC Items."""
@@ -618,7 +629,10 @@ def _fill_aggregation_parameters(name: str, agg: dict) -> dict:
             if k in aggregations
         }
 
-        index_param = indices(collection_ids)
+        index_param = await self.async_index_selector.select_indexes(
+            collection_ids, datetime_search
+        )
+
         search_task = asyncio.create_task(
             self.client.search(
                 index=index_param,
@@ -816,9 +830,12 @@ async def create_item(
             item=item, base_url=base_url, exist_ok=exist_ok
         )
 
+        target_index = await self.async_index_inserter.get_target_index(
+            collection_id, item
+        )
         # Index the item in the database
         await self.client.index(
-            index=index_alias_by_collection_id(collection_id),
+            index=target_index,
             id=mk_item_id(item_id, collection_id),
             document=item,
             refresh=refresh,
@@ -983,9 +1000,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any):
 
         try:
             # Perform the delete operation
-            await self.client.delete(
+            await self.client.delete_by_query(
                 index=index_alias_by_collection_id(collection_id),
-                id=mk_item_id(item_id, collection_id),
+                body={"query": {"term": {"_id": mk_item_id(item_id, collection_id)}}},
                 refresh=refresh,
             )
         except ESNotFoundError:
@@ -1085,8 +1102,10 @@ async def create_collection(self, collection: Collection, **kwargs: Any):
             refresh=refresh,
         )
 
-        # Create the item index for the collection
-        await create_item_index(collection_id)
+        if self.async_index_inserter.should_create_collection_index():
+            await self.async_index_inserter.create_simple_index(
+                self.client, collection_id
+            )
 
     async def find_collection(self, collection_id: str) -> Collection:
         """Find and return a collection from the database.
@@ -1360,9 +1379,12 @@ async def bulk_async(
 
         # Perform the bulk insert
        raise_on_error = self.async_settings.raise_on_bulk_error
+        actions = await self.async_index_inserter.prepare_bulk_actions(
+            collection_id, processed_items
+        )
         success, errors = await helpers.async_bulk(
             self.client,
-            mk_actions(collection_id, processed_items),
+            actions,
             refresh=refresh,
             raise_on_error=raise_on_error,
         )
@@ -1426,9 +1448,12 @@ def bulk_sync(
 
         # Perform the bulk insert
         raise_on_error = self.sync_settings.raise_on_bulk_error
+        actions = self.sync_index_inserter.prepare_bulk_actions(
+            collection_id, processed_items
+        )
         success, errors = helpers.bulk(
             self.sync_client,
-            mk_actions(collection_id, processed_items),
+            actions,
             refresh=refresh,
             raise_on_error=raise_on_error,
        )
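Taken together, the changes in this file swap the static `indices(collection_ids)` helper for pluggable strategy objects: a selector narrows the index list for reads and an inserter picks (or creates) the target index for writes. The stub below only mimics the selector's call shape to show how `execute_search` consumes it; it is an illustrative stand-in, not the commit's implementation, which lives in `stac_fastapi.sfeos_helpers.search_engine`.

import asyncio
from typing import Dict, List, Optional


class StubIndexSelector:
    """Stand-in with the same call shape as the commit's selector strategies."""

    async def select_indexes(
        self,
        collection_ids: Optional[List[str]],
        datetime_search: Dict[str, Optional[str]],
    ) -> str:
        # A real datetime-based selector would intersect datetime_search with the
        # date ranges encoded in each index name; this stub just joins per-collection
        # aliases into the comma-separated string the search client expects.
        return ",".join(f"items_{cid}" for cid in collection_ids or ["*"])


async def main() -> None:
    index_param = await StubIndexSelector().select_indexes(
        ["sentinel-2-l2a"], {"gte": "2025-06-06", "lte": "2025-09-22"}
    )
    print(index_param)  # items_sentinel-2-l2a -> passed on to client.search(index=...)


asyncio.run(main())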
