Skip to content

Commit 11e40c4

Browse files
author
Grzegorz Pustulka
committed
Add index management system with datetime partitioning
1 parent b3dabfe commit 11e40c4

File tree

31 files changed

+2694
-130
lines changed

31 files changed

+2694
-130
lines changed

CHANGELOG.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,30 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1616

1717
- Added the ability to authenticate with OpenSearch/ElasticSearch with SSL disabled [#388](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/388)
1818

19+
### Added
20+
- Added comprehensive index management system with dynamic selection and insertion strategies for improved performance and scalability [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405)
21+
- Added `ENABLE_DATETIME_INDEX_FILTERING` environment variable to enable datetime-based index selection using collection IDs. Requires indexes in format: `STAC_ITEMS_INDEX_PREFIX_collection-id_start_year-start_month-start_day-end_year-end_month-end_day`, e.g. `items_sentinel-2-l2a_2025-06-06-2025-09-22`. Default is `false`. [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405)
22+
- Added `DATETIME_INDEX_MAX_SIZE_GB` environment variable to set maximum size limit in GB for datetime-based indexes. When an index exceeds this size, a new time-partitioned index will be created. Default is `25` GB. Only applies when `ENABLE_DATETIME_INDEX_FILTERING` is enabled. [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405)
23+
- Added search engine adapter system with support for both Elasticsearch and OpenSearch [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
24+
- `SearchEngineAdapter` base class with engine-specific implementations
25+
- `ElasticsearchAdapter` and `OpenSearchAdapter` with tailored index creation methods
26+
- Automatic engine type detection based on client class
27+
- `SearchEngineAdapterFactory` for creating appropriate adapters
28+
- Added datetime-based index selection strategies with caching support [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
29+
- `AsyncDatetimeBasedIndexSelector` and `SyncDatetimeBasedIndexSelector` for temporal filtering
30+
- `IndexCacheManager` with configurable TTL-based cache expiration (default 1 hour)
31+
- `AsyncIndexAliasLoader` and `SyncIndexAliasLoader` for alias management
32+
- `UnfilteredIndexSelector` as fallback for returning all available indexes
33+
- Added index insertion strategies with automatic partitioning [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
34+
- Simple insertion strategy (`AsyncSimpleIndexInserter`, `SyncSimpleIndexInserter`) for traditional single-index-per-collection approach
35+
- Datetime-based insertion strategy (`AsyncDatetimeIndexInserter`, `SyncDatetimeIndexInserter`) with time-based partitioning
36+
- Automatic index size monitoring and splitting when limits exceeded
37+
- Handling of chronologically early data and bulk operations
38+
- Added index management utilities [#405](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/405):
39+
- `IndexSizeManager` for size monitoring and overflow handling
40+
- `DatetimeIndexManager` for datetime-based index operations
41+
- Factory patterns (`IndexInsertionFactory`, `IndexSelectorFactory`) for strategy creation based on configuration
42+
1943
## [v5.0.0a0] - 2025-05-29
2044

2145
### Added

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ test-opensearch:
7575

7676
.PHONY: test
7777
test:
78-
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
78+
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
7979
docker compose down
8080

81-
-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
81+
-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
8282
docker compose down
8383

8484
.PHONY: run-database-es

README.md

Lines changed: 26 additions & 23 deletions
Large diffs are not rendered by default.

compose.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ services:
2121
- ES_USE_SSL=false
2222
- ES_VERIFY_CERTS=false
2323
- BACKEND=elasticsearch
24+
- ENABLE_DATETIME_INDEX_FILTERING=true
2425
ports:
2526
- "8080:8080"
2627
volumes:
@@ -55,6 +56,7 @@ services:
5556
- ES_VERIFY_CERTS=false
5657
- BACKEND=opensearch
5758
- STAC_FASTAPI_RATE_LIMIT=200/minute
59+
- ENABLE_DATETIME_INDEX_FILTERING=true
5860
ports:
5961
- "8082:8082"
6062
volumes:
@@ -69,9 +71,11 @@ services:
6971
elasticsearch:
7072
container_name: es-container
7173
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0}
74+
platform: linux/amd64
7275
hostname: elasticsearch
7376
environment:
7477
ES_JAVA_OPTS: -Xms512m -Xmx1g
78+
action.destructive_requires_name: false
7579
volumes:
7680
- ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
7781
- ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots
@@ -81,11 +85,13 @@ services:
8185
opensearch:
8286
container_name: os-container
8387
image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1}
88+
platform: linux/amd64
8489
hostname: opensearch
8590
environment:
8691
- discovery.type=single-node
8792
- plugins.security.disabled=true
8893
- OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m
94+
- action.destructive_requires_name=false
8995
volumes:
9096
- ./opensearch/config/opensearch.yml:/usr/share/opensearch/config/opensearch.yml
9197
- ./opensearch/snapshots:/usr/share/opensearch/snapshots

stac_fastapi/core/stac_fastapi/core/core.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
BulkTransactionMethod,
3232
Items,
3333
)
34+
from stac_fastapi.sfeos_helpers.database import return_date
3435
from stac_fastapi.types import stac as stac_types
3536
from stac_fastapi.types.conformance import BASE_CONFORMANCE_CLASSES
3637
from stac_fastapi.types.core import AsyncBaseCoreClient, AsyncBaseTransactionsClient
@@ -315,9 +316,10 @@ async def item_collection(
315316
search=search, collection_ids=[collection_id]
316317
)
317318

319+
datetime_search = return_date(datetime)
318320
if datetime:
319321
search = self.database.apply_datetime_filter(
320-
search=search, interval=datetime
322+
search=search, datetime_search=datetime_search
321323
)
322324

323325
if bbox:
@@ -333,6 +335,7 @@ async def item_collection(
333335
sort=None,
334336
token=token,
335337
collection_ids=[collection_id],
338+
datetime_search=datetime_search,
336339
)
337340

338341
items = [
@@ -491,9 +494,10 @@ async def post_search(
491494
search=search, collection_ids=search_request.collections
492495
)
493496

497+
datetime_search = return_date(search_request.datetime)
494498
if search_request.datetime:
495499
search = self.database.apply_datetime_filter(
496-
search=search, interval=search_request.datetime
500+
search=search, datetime_search=datetime_search
497501
)
498502

499503
if search_request.bbox:
@@ -551,6 +555,7 @@ async def post_search(
551555
token=search_request.token,
552556
sort=sort,
553557
collection_ids=search_request.collections,
558+
datetime_search=datetime_search,
554559
)
555560

556561
fields = (

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import logging
66
from base64 import urlsafe_b64decode, urlsafe_b64encode
77
from copy import deepcopy
8-
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
8+
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type
99

1010
import attr
1111
import elasticsearch.helpers as helpers
@@ -29,11 +29,8 @@
2929
get_queryables_mapping_shared,
3030
index_alias_by_collection_id,
3131
index_by_collection_id,
32-
indices,
33-
mk_actions,
3432
mk_item_id,
3533
populate_sort_shared,
36-
return_date,
3734
validate_refresh,
3835
)
3936
from stac_fastapi.sfeos_helpers.mappings import (
@@ -44,8 +41,13 @@
4441
ITEMS_INDEX_PREFIX,
4542
Geometry,
4643
)
44+
from stac_fastapi.sfeos_helpers.search_engine import (
45+
BaseIndexInserter,
46+
IndexInsertionFactory,
47+
IndexSelectionStrategy,
48+
IndexSelectorFactory,
49+
)
4750
from stac_fastapi.types.errors import ConflictError, NotFoundError
48-
from stac_fastapi.types.rfc3339 import DateTimeType
4951
from stac_fastapi.types.stac import Collection, Item
5052

5153
logger = logging.getLogger(__name__)
@@ -123,6 +125,10 @@ class DatabaseLogic(BaseDatabaseLogic):
123125
sync_settings: SyncElasticsearchSettings = attr.ib(
124126
factory=SyncElasticsearchSettings
125127
)
128+
async_index_selector: IndexSelectionStrategy = attr.ib(init=False)
129+
sync_index_selector: IndexSelectionStrategy = attr.ib(init=False)
130+
async_index_inserter: BaseIndexInserter = attr.ib(init=False)
131+
sync_index_inserter: BaseIndexInserter = attr.ib(init=False)
126132

127133
client = attr.ib(init=False)
128134
sync_client = attr.ib(init=False)
@@ -131,6 +137,18 @@ def __attrs_post_init__(self):
131137
"""Initialize clients after the class is instantiated."""
132138
self.client = self.async_settings.create_client
133139
self.sync_client = self.sync_settings.create_client
140+
self.async_index_inserter = (
141+
IndexInsertionFactory.create_async_insertion_strategy(self.client)
142+
)
143+
self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy(
144+
self.sync_client
145+
)
146+
self.async_index_selector = IndexSelectorFactory.create_async_selector(
147+
self.client
148+
)
149+
self.sync_index_selector = IndexSelectorFactory.create_sync_selector(
150+
self.sync_client
151+
)
134152

135153
item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
136154
collection_serializer: Type[CollectionSerializer] = attr.ib(
@@ -244,19 +262,18 @@ def apply_collections_filter(search: Search, collection_ids: List[str]):
244262

245263
@staticmethod
246264
def apply_datetime_filter(
247-
search: Search, interval: Optional[Union[DateTimeType, str]]
265+
search: Search, datetime_search: Dict[str, Optional[str]]
248266
):
249267
"""Apply a filter to search on datetime, start_datetime, and end_datetime fields.
250268
251269
Args:
252270
search (Search): The search object to filter.
253-
interval: Optional[Union[DateTimeType, str]]
271+
datetime_search: Dict[str, Optional[str]]
254272
255273
Returns:
256274
Search: The filtered search object.
257275
"""
258276
should = []
259-
datetime_search = return_date(interval)
260277

261278
# If the request is a single datetime return
262279
# items with datetimes equal to the requested datetime OR
@@ -501,6 +518,7 @@ async def execute_search(
501518
token: Optional[str],
502519
sort: Optional[Dict[str, Dict[str, str]]],
503520
collection_ids: Optional[List[str]],
521+
datetime_search: Dict[str, Optional[str]],
504522
ignore_unavailable: bool = True,
505523
) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]:
506524
"""Execute a search query with limit and other optional parameters.
@@ -511,6 +529,7 @@ async def execute_search(
511529
token (Optional[str]): The token used to return the next set of results.
512530
sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted.
513531
collection_ids (Optional[List[str]]): The collection ids to search.
532+
datetime_search (Dict[str, Optional[str]]): Datetime range used for index selection.
514533
ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True.
515534
516535
Returns:
@@ -531,7 +550,9 @@ async def execute_search(
531550

532551
query = search.query.to_dict() if search.query else None
533552

534-
index_param = indices(collection_ids)
553+
index_param = await self.async_index_selector.select_indexes(
554+
collection_ids, datetime_search
555+
)
535556

536557
max_result_window = MAX_LIMIT
537558

@@ -595,6 +616,7 @@ async def aggregate(
595616
geometry_geohash_grid_precision: int,
596617
geometry_geotile_grid_precision: int,
597618
datetime_frequency_interval: str,
619+
datetime_search,
598620
ignore_unavailable: Optional[bool] = True,
599621
):
600622
"""Return aggregations of STAC Items."""
@@ -630,7 +652,10 @@ def _fill_aggregation_parameters(name: str, agg: dict) -> dict:
630652
if k in aggregations
631653
}
632654

633-
index_param = indices(collection_ids)
655+
index_param = await self.async_index_selector.select_indexes(
656+
collection_ids, datetime_search
657+
)
658+
634659
search_task = asyncio.create_task(
635660
self.client.search(
636661
index=index_param,
@@ -828,9 +853,12 @@ async def create_item(
828853
item=item, base_url=base_url, exist_ok=exist_ok
829854
)
830855

856+
target_index = await self.async_index_inserter.get_target_index(
857+
collection_id, item
858+
)
831859
# Index the item in the database
832860
await self.client.index(
833-
index=index_alias_by_collection_id(collection_id),
861+
index=target_index,
834862
id=mk_item_id(item_id, collection_id),
835863
document=item,
836864
refresh=refresh,
@@ -866,9 +894,9 @@ async def delete_item(self, item_id: str, collection_id: str, **kwargs: Any):
866894

867895
try:
868896
# Perform the delete operation
869-
await self.client.delete(
897+
await self.client.delete_by_query(
870898
index=index_alias_by_collection_id(collection_id),
871-
id=mk_item_id(item_id, collection_id),
899+
body={"query": {"term": {"_id": mk_item_id(item_id, collection_id)}}},
872900
refresh=refresh,
873901
)
874902
except ESNotFoundError:
@@ -937,8 +965,10 @@ async def create_collection(self, collection: Collection, **kwargs: Any):
937965
refresh=refresh,
938966
)
939967

940-
# Create the item index for the collection
941-
await create_item_index(collection_id)
968+
if self.async_index_inserter.should_create_collection_index():
969+
await self.async_index_inserter.create_simple_index(
970+
self.client, collection_id
971+
)
942972

943973
async def find_collection(self, collection_id: str) -> Collection:
944974
"""Find and return a collection from the database.
@@ -1136,9 +1166,12 @@ async def bulk_async(
11361166

11371167
# Perform the bulk insert
11381168
raise_on_error = self.async_settings.raise_on_bulk_error
1169+
actions = await self.async_index_inserter.prepare_bulk_actions(
1170+
collection_id, processed_items
1171+
)
11391172
success, errors = await helpers.async_bulk(
11401173
self.client,
1141-
mk_actions(collection_id, processed_items),
1174+
actions,
11421175
refresh=refresh,
11431176
raise_on_error=raise_on_error,
11441177
)
@@ -1202,9 +1235,12 @@ def bulk_sync(
12021235

12031236
# Perform the bulk insert
12041237
raise_on_error = self.sync_settings.raise_on_bulk_error
1238+
actions = self.sync_index_inserter.prepare_bulk_actions(
1239+
collection_id, processed_items
1240+
)
12051241
success, errors = helpers.bulk(
12061242
self.sync_client,
1207-
mk_actions(collection_id, processed_items),
1243+
actions,
12081244
refresh=refresh,
12091245
raise_on_error=raise_on_error,
12101246
)

0 commit comments

Comments
 (0)