Skip to content

Commit 0c2d8e6

Browse files
author
Grzegorz Pustulka
committed
added fixes in insertion and selection strategies
1 parent 3cf4762 commit 0c2d8e6

File tree

8 files changed

+172
-50
lines changed

8 files changed

+172
-50
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ test-opensearch:
7575

7676
.PHONY: test
7777
test:
78-
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
78+
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
7979
docker compose down
8080

81-
-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest --cov=stac_fastapi --cov-report=term-missing'
81+
-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest -s --cov=stac_fastapi --cov-report=term-missing'
8282
docker compose down
8383

8484
.PHONY: run-database-es

compose.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ services:
2121
- ES_USE_SSL=false
2222
- ES_VERIFY_CERTS=false
2323
- BACKEND=elasticsearch
24+
- ENABLE_DATETIME_INDEX_FILTERING=true
25+
- DATETIME_INDEX_MAX_SIZE_GB=0
26+
- DATETIME_INDEX_MAX_SIZE_GB=0.00002
2427
ports:
2528
- "8080:8080"
2629
volumes:
@@ -55,6 +58,8 @@ services:
5558
- ES_VERIFY_CERTS=false
5659
- BACKEND=opensearch
5760
- STAC_FASTAPI_RATE_LIMIT=200/minute
61+
- ENABLE_DATETIME_INDEX_FILTERING=true
62+
- DATETIME_INDEX_MAX_SIZE_GB=0.00002
5863
ports:
5964
- "8082:8082"
6065
volumes:

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,16 @@
4242
)
4343
from .index_insertion_strategies import (
4444
AsyncIndexInserter,
45+
AsyncSimpleIndexInsertion,
46+
BaseIndexInserter,
47+
ElasticsearchAdapter,
4548
IndexInsertionFactory,
49+
OpenSearchAdapter,
50+
SearchEngineAdapter,
51+
SearchEngineAdapterFactory,
52+
SearchEngineType,
4653
SyncIndexInserter,
54+
SyncSimpleIndexInsertion,
4755
)
4856
from .index_selection_strategies import (
4957
AsyncDatetimeBasedIndexSelector,
@@ -76,8 +84,16 @@
7684
"AsyncDatetimeBasedIndexSelector",
7785
# Index insertion strategies
7886
"AsyncIndexInserter",
79-
"SyncIndexInserter",
87+
"AsyncSimpleIndexInsertion",
88+
"BaseIndexInserter",
89+
"ElasticsearchAdapter",
8090
"IndexInsertionFactory",
91+
"OpenSearchAdapter",
92+
"SearchEngineAdapter",
93+
"SearchEngineAdapterFactory",
94+
"SearchEngineType",
95+
"SyncIndexInserter",
96+
"SyncSimpleIndexInsertion",
8197
# Query operations
8298
"apply_free_text_filter_shared",
8399
"apply_intersects_filter_shared",
@@ -93,4 +109,4 @@
93109
"return_date",
94110
"extract_date",
95111
"extract_date_from_index",
96-
]
112+
]

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/datetime.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def extract_date(date_str: str) -> date:
7272
Returns:
7373
A date object extracted from the input string.
7474
"""
75+
date_str = date_str.replace('Z', '+00:00')
7576
return datetime_type.fromisoformat(date_str).date()
7677

7778

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index_insertion_strategies.py

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ def create_datetime_index_sync(
5454
) -> str:
5555
pass
5656

57-
async def create_index_alias(self, client: Any, collection_id: str, end_date: str):
57+
async def update_index_alias(self, client: Any, collection_id: str, end_date: str):
5858
index = index_alias_by_collection_id(collection_id)
5959
await client.indices.put_alias(
6060
index=index, name=self.alias_by_index_and_end_date(index, end_date)
6161
)
6262

63-
def create_index_alias_sync(
63+
def update_index_alias_sync(
6464
self, sync_client: Any, collection_id: str, end_date: str
6565
):
6666
index = index_alias_by_collection_id(collection_id)
@@ -91,11 +91,12 @@ async def create_datetime_index(
9191
self, client: Any, collection_id: str, start_date: str
9292
) -> str:
9393
index_name = self.index_by_collection_id_and_date(collection_id, start_date)
94+
alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX)
9495
await client.options(ignore_status=400).indices.create(
9596
index=index_name,
96-
body={"aliases": {index_alias_by_collection_id(collection_id): {}}},
97+
body={"aliases": {index_alias_by_collection_id(collection_id): {}, alias_name: {}}},
9798
)
98-
return index_name
99+
return alias_name
99100

100101
def create_simple_index_sync(self, sync_client: Any, collection_id: str) -> str:
101102
index_name = f"{index_by_collection_id(collection_id)}-000001"
@@ -109,11 +110,12 @@ def create_datetime_index_sync(
109110
self, sync_client: Any, collection_id: str, start_date: str
110111
) -> str:
111112
index_name = self.index_by_collection_id_and_date(collection_id, start_date)
113+
alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX)
112114
sync_client.options(ignore_status=400).indices.create(
113115
index=index_name,
114-
body={"aliases": {index_alias_by_collection_id(collection_id): {}}},
116+
body={"aliases": {index_alias_by_collection_id(collection_id): {}, alias_name: {}}},
115117
)
116-
return index_name
118+
return alias_name
117119

118120

119121
class OpenSearchAdapter(SearchEngineAdapter):
@@ -135,15 +137,16 @@ async def create_datetime_index(
135137
self, client: Any, collection_id: str, start_date: str
136138
) -> str:
137139
index_name = self.index_by_collection_id_and_date(collection_id, start_date)
140+
alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX)
138141
await client.indices.create(
139142
index=index_name,
140143
body={
141-
"aliases": {index_alias_by_collection_id(collection_id): {}},
144+
"aliases": {index_alias_by_collection_id(collection_id): {}, alias_name: {}},
142145
"mappings": ES_ITEMS_MAPPINGS,
143146
"settings": ES_ITEMS_SETTINGS,
144147
},
145148
)
146-
return index_name
149+
return alias_name
147150

148151
def create_simple_index_sync(self, sync_client: Any, collection_id: str) -> str:
149152
index_name = f"{index_by_collection_id(collection_id)}-000001"
@@ -163,15 +166,16 @@ def create_datetime_index_sync(
163166
self, sync_client: Any, collection_id: str, start_date: str
164167
) -> str:
165168
index_name = self.index_by_collection_id_and_date(collection_id, start_date)
169+
alias_name = index_name.removeprefix(ITEMS_INDEX_PREFIX)
166170
sync_client.indices.create(
167171
index=index_name,
168172
body={
169-
"aliases": {index_alias_by_collection_id(collection_id): {}},
173+
"aliases": {index_alias_by_collection_id(collection_id): {}, alias_name: {}},
170174
"mappings": ES_ITEMS_MAPPINGS,
171175
"settings": ES_ITEMS_SETTINGS,
172176
},
173177
)
174-
return index_name
178+
return alias_name
175179

176180

177181
class SearchEngineAdapterFactory:
@@ -217,6 +221,10 @@ def should_create_collection_index(self) -> bool:
217221
async def create_simple_index(self, client: Any, collection_id: str):
218222
return await self.search_adapter.create_simple_index(client, collection_id)
219223

224+
async def get_index_size_in_gb(self, index_name: str) -> float:
225+
data = await self.client.indices.stats(index=index_name)
226+
return data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9
227+
220228
async def _get_target_index_base(
221229
self,
222230
index_selector,
@@ -243,16 +251,14 @@ async def _get_target_index_base(
243251
return target_index
244252

245253
if check_size:
246-
data = await self.client.indices.stats(index=target_index)
247-
current_size_in_gb = (
248-
data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9
249-
)
254+
breakpoint()
255+
index_size_gb = await self.get_index_size_in_gb(target_index)
250256
max_size_gb = float(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", 20))
251257

252-
if current_size_in_gb > max_size_gb:
258+
if index_size_gb > max_size_gb:
253259
end_date = extract_date(product_datetime)
254260
if end_date != extract_date_from_index(all_indexes[-1]):
255-
await self.search_adapter.create_index_alias(
261+
await self.search_adapter.update_index_alias(
256262
self.client, collection_id, str(end_date)
257263
)
258264
target_index = await self.search_adapter.create_datetime_index(
@@ -298,18 +304,15 @@ async def prepare_bulk_actions(
298304
new_index = None
299305

300306
if first_item_index == latest_index:
301-
data = await self.client.indices.stats(index=first_item_index)
302-
current_size_in_gb = (
303-
data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9
304-
)
307+
index_size_gb = await self.get_index_size_in_gb(first_item_index)
305308
max_size_gb = float(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", 20))
306309

307-
if current_size_in_gb > max_size_gb:
310+
if index_size_gb > max_size_gb:
308311
current_index_end_date = extract_date_from_index(first_item_index)
309312
first_item_date = extract_date(first_item["properties"]["datetime"])
310313

311314
if first_item_date != current_index_end_date:
312-
await self.search_adapter.create_index_alias(
315+
await self.search_adapter.update_index_alias(
313316
self.client, collection_id, str(current_index_end_date)
314317
)
315318
next_day_start = current_index_end_date + timedelta(days=1)
@@ -348,6 +351,10 @@ def __init__(self, sync_client, search_adapter: SearchEngineAdapter):
348351
def should_create_collection_index(self) -> bool:
349352
return False
350353

354+
def get_index_size_in_gb(self, index_name: str) -> float:
355+
data = self.sync_client.indices.stats(index=index_name)
356+
return data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9
357+
351358
def _get_target_index_base(
352359
self,
353360
index_selector,
@@ -372,16 +379,13 @@ def _get_target_index_base(
372379
return target_index
373380

374381
if check_size:
375-
data = self.sync_client.indices.stats(index=target_index)
376-
current_size_in_gb = (
377-
data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9
378-
)
382+
index_size_gb = self.get_index_size_in_gb(target_index)
379383
max_size_gb = float(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", 20))
380384

381-
if current_size_in_gb > max_size_gb:
385+
if index_size_gb > max_size_gb:
382386
end_date = extract_date(product_datetime)
383387
if end_date != extract_date_from_index(all_indexes[-1]):
384-
self.search_adapter.create_index_alias_sync(
388+
self.search_adapter.update_index_alias_sync(
385389
self.sync_client, collection_id, str(end_date)
386390
)
387391
target_index = self.search_adapter.create_datetime_index_sync(
@@ -425,18 +429,14 @@ def prepare_bulk_actions(
425429
new_index = None
426430

427431
if first_item_index == latest_index:
428-
data = self.sync_client.indices.stats(index=first_item_index)
429-
current_size_in_gb = (
430-
data["_all"]["primaries"]["store"]["size_in_bytes"] / 1e9
431-
)
432+
index_size_gb = self.get_index_size_in_gb(first_item_index)
432433
max_size_gb = float(os.getenv("DATETIME_INDEX_MAX_SIZE_GB", 20))
433-
434-
if current_size_in_gb > max_size_gb:
434+
if index_size_gb > max_size_gb:
435435
current_index_end_date = extract_date_from_index(first_item_index)
436436
first_item_date = extract_date(first_item["properties"]["datetime"])
437437

438438
if first_item_date != current_index_end_date:
439-
self.search_adapter.create_index_alias_sync(
439+
self.search_adapter.update_index_alias_sync(
440440
self.sync_client, collection_id, str(current_index_end_date)
441441
)
442442
next_day_start = current_index_end_date + timedelta(days=1)

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/index_selection_strategies.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,25 @@ async def select_indexes(
6969
class AsyncDatetimeBasedIndexSelector(IndexSelectionStrategy):
7070
"""Asynchronous index selector that filters indices based on datetime criteria with caching."""
7171

72+
_instance = None
73+
74+
def __new__(cls, client):
75+
if cls._instance is None:
76+
cls._instance = super().__new__(cls)
77+
return cls._instance
78+
7279
def __init__(self, client):
7380
"""Initialize the datetime-based index selector.
7481
7582
Args:
7683
client: Elasticsearch/OpenSearch client instance used for querying
7784
index aliases and metadata.
7885
"""
79-
self.client = client
80-
self._aliases_cache: Optional[Dict[str, List[str]]] = None
81-
self._cache_timestamp: float = 0
86+
if not hasattr(self, '_initialized'):
87+
self.client = client
88+
self._aliases_cache: Optional[Dict[str, List[str]]] = None
89+
self._cache_timestamp: float = 0
90+
self._initialized = True
8291

8392
@property
8493
def _cache_expired(self) -> bool:
@@ -101,14 +110,13 @@ async def _load_aliases_cache(self) -> Dict[str, List[str]]:
101110
"""
102111
response = await self.client.indices.get_alias(index=f"{ITEMS_INDEX_PREFIX}*")
103112
result = {}
104-
105113
for index_info in response.values():
106114
aliases = index_info.get("aliases", {})
107115
base_alias = None
108116
items_aliases = []
109117

110118
for alias_name in aliases.keys():
111-
if alias_name.startswith(ITEMS_INDEX_PREFIX):
119+
if not alias_name.startswith(ITEMS_INDEX_PREFIX):
112120
items_aliases.append(alias_name)
113121
else:
114122
base_alias = alias_name
@@ -151,7 +159,7 @@ async def get_collection_indexes(self, collection_id: str) -> List[str]:
151159
Returns empty list if collection is not found in cache.
152160
"""
153161
cache = await self.get_aliases_cache()
154-
return cache.get(collection_id, [])
162+
return cache.get(index_alias_by_collection_id(collection_id), [])
155163

156164
async def select_indexes(
157165
self,
@@ -194,16 +202,25 @@ async def select_indexes(
194202
class SyncDatetimeBasedIndexSelector(IndexSelectionStrategy):
195203
"""Synchronous index selector that filters indices based on datetime criteria with caching."""
196204

205+
_instance = None
206+
207+
def __new__(cls, client):
208+
if cls._instance is None:
209+
cls._instance = super().__new__(cls)
210+
return cls._instance
211+
197212
def __init__(self, sync_client):
198213
"""Initialize the datetime-based index selector.
199214
200215
Args:
201216
sync_client: Synchronous Elasticsearch/OpenSearch client instance used for querying
202217
index aliases and metadata.
203218
"""
204-
self.sync_client = sync_client
205-
self._aliases_cache: Optional[Dict[str, List[str]]] = None
206-
self._cache_timestamp: float = 0
219+
if not hasattr(self, '_initialized'):
220+
self.sync_client = sync_client
221+
self._aliases_cache: Optional[Dict[str, List[str]]] = None
222+
self._cache_timestamp: float = 0
223+
self._initialized = True
207224

208225
@property
209226
def _cache_expired(self) -> bool:
@@ -233,7 +250,7 @@ def _load_aliases_cache(self) -> Dict[str, List[str]]:
233250
items_aliases = []
234251

235252
for alias_name in aliases.keys():
236-
if alias_name.startswith(ITEMS_INDEX_PREFIX):
253+
if not alias_name.startswith(ITEMS_INDEX_PREFIX):
237254
items_aliases.append(alias_name)
238255
else:
239256
base_alias = alias_name
@@ -276,7 +293,7 @@ def get_collection_indexes(self, collection_id: str) -> List[str]:
276293
Returns empty list if collection is not found in cache.
277294
"""
278295
cache = self.get_aliases_cache()
279-
return cache.get(collection_id, [])
296+
return cache.get(index_alias_by_collection_id(collection_id), [])
280297

281298
def select_indexes(
282299
self,

0 commit comments

Comments
 (0)