9
9
from stac_fastapi .core .utilities import get_bool_env
10
10
from stac_fastapi .sfeos_helpers .database import (
11
11
extract_date ,
12
- extract_date_from_index ,
13
12
index_alias_by_collection_id ,
14
13
index_by_collection_id ,
15
14
mk_item_id ,
15
+ extract_first_date_from_index ,
16
16
)
17
17
from stac_fastapi .sfeos_helpers .mappings import (
18
18
_ES_INDEX_NAME_UNSUPPORTED_CHARS_TABLE ,
@@ -54,19 +54,27 @@ def create_datetime_index_sync(
54
54
) -> str :
55
55
pass
56
56
57
- async def update_index_alias (self , client : Any , collection_id : str , end_date : str ):
58
- index = index_alias_by_collection_id (collection_id )
59
- await client .indices .put_alias (
60
- index = index , name = self .alias_by_index_and_end_date (index , end_date )
61
- )
62
-
63
- def update_index_alias_sync (
64
- self , sync_client : Any , collection_id : str , end_date : str
65
- ):
66
- index = index_alias_by_collection_id (collection_id )
67
- sync_client .indices .put_alias (
68
- index = index , name = self .alias_by_index_and_end_date (index , end_date )
69
- )
57
+ async def update_index_alias (self , client : Any , end_date : str , old_alias : str ):
58
+ index = ITEMS_INDEX_PREFIX + old_alias
59
+ new_alias = self .alias_by_index_and_end_date (old_alias , end_date )
60
+ await client .indices .update_aliases (body = {
61
+ "actions" : [
62
+ {"remove" : {"index" : index , "alias" : old_alias }},
63
+ {"add" : {"index" : index , "alias" : new_alias }}
64
+ ]
65
+ })
66
+ return new_alias
67
+
68
+ def update_index_alias_sync (self , client : Any , end_date : str , old_alias : str ):
69
+ index = ITEMS_INDEX_PREFIX + old_alias
70
+ new_alias = self .alias_by_index_and_end_date (old_alias , end_date )
71
+ client .indices .update_aliases (body = {
72
+ "actions" : [
73
+ {"remove" : {"index" : index , "alias" : old_alias }},
74
+ {"add" : {"index" : index , "alias" : new_alias }}
75
+ ]
76
+ })
77
+ return new_alias
70
78
71
79
@staticmethod
72
80
def index_by_collection_id_and_date (collection_id : str , start_date : str ) -> str :
@@ -247,19 +255,29 @@ async def _get_target_index_base(
247
255
return target_index
248
256
249
257
all_indexes .sort ()
258
+
259
+ if (start_date := extract_date (product_datetime )) < (end_date := extract_first_date_from_index (all_indexes [0 ])):
260
+ target_index = await self .search_adapter .create_datetime_index (
261
+ self .client , collection_id , str (start_date )
262
+ )
263
+ alias = await self .search_adapter .update_index_alias (
264
+ self .client , str (end_date - timedelta (days = 1 )), target_index
265
+ )
266
+ await index_selector .refresh_cache ()
267
+ return alias
268
+
250
269
if target_index != all_indexes [- 1 ]:
251
270
return target_index
252
271
253
272
if check_size :
254
- breakpoint ()
255
273
index_size_gb = await self .get_index_size_in_gb (target_index )
256
274
max_size_gb = float (os .getenv ("DATETIME_INDEX_MAX_SIZE_GB" , 20 ))
257
275
258
276
if index_size_gb > max_size_gb :
259
277
end_date = extract_date (product_datetime )
260
- if end_date != extract_date_from_index (all_indexes [- 1 ]):
278
+ if end_date != extract_first_date_from_index (all_indexes [- 1 ]):
261
279
await self .search_adapter .update_index_alias (
262
- self .client , collection_id , str (end_date )
280
+ self .client , str (end_date ), target_index
263
281
)
264
282
target_index = await self .search_adapter .create_datetime_index (
265
283
self .client , collection_id , (end_date + timedelta (days = 1 ))
@@ -308,12 +326,12 @@ async def prepare_bulk_actions(
308
326
max_size_gb = float (os .getenv ("DATETIME_INDEX_MAX_SIZE_GB" , 20 ))
309
327
310
328
if index_size_gb > max_size_gb :
311
- current_index_end_date = extract_date_from_index (first_item_index )
329
+ current_index_end_date = extract_first_date_from_index (first_item_index )
312
330
first_item_date = extract_date (first_item ["properties" ]["datetime" ])
313
331
314
332
if first_item_date != current_index_end_date :
315
333
await self .search_adapter .update_index_alias (
316
- self .client , collection_id , str (current_index_end_date )
334
+ self .client , str (current_index_end_date ), latest_index
317
335
)
318
336
next_day_start = current_index_end_date + timedelta (days = 1 )
319
337
new_index = await self .search_adapter .create_datetime_index (
@@ -374,6 +392,16 @@ def _get_target_index_base(
374
392
index_selector .refresh_cache ()
375
393
return target_index
376
394
395
+ if (start_date := extract_date (product_datetime )) < (end_date := extract_first_date_from_index (all_indexes [0 ])):
396
+ target_index = self .search_adapter .create_datetime_index_sync (
397
+ self .sync_client , collection_id , str (start_date )
398
+ )
399
+ alias = self .search_adapter .update_index_alias_sync (
400
+ self .sync_client , str (end_date - timedelta (days = 1 )), target_index
401
+ )
402
+ index_selector .refresh_cache ()
403
+ return alias
404
+
377
405
all_indexes .sort ()
378
406
if target_index != all_indexes [- 1 ]:
379
407
return target_index
@@ -384,9 +412,9 @@ def _get_target_index_base(
384
412
385
413
if index_size_gb > max_size_gb :
386
414
end_date = extract_date (product_datetime )
387
- if end_date != extract_date_from_index (all_indexes [- 1 ]):
388
- self .search_adapter .update_index_alias_sync (
389
- self .sync_client , collection_id , str (end_date )
415
+ if end_date != extract_first_date_from_index (all_indexes [- 1 ]):
416
+ self .search_adapter .update_index_alias (
417
+ self .sync_client , str (end_date ), target_index
390
418
)
391
419
target_index = self .search_adapter .create_datetime_index_sync (
392
420
self .sync_client , collection_id , (end_date + timedelta (days = 1 ))
@@ -432,12 +460,12 @@ def prepare_bulk_actions(
432
460
index_size_gb = self .get_index_size_in_gb (first_item_index )
433
461
max_size_gb = float (os .getenv ("DATETIME_INDEX_MAX_SIZE_GB" , 20 ))
434
462
if index_size_gb > max_size_gb :
435
- current_index_end_date = extract_date_from_index (first_item_index )
463
+ current_index_end_date = extract_first_date_from_index (first_item_index )
436
464
first_item_date = extract_date (first_item ["properties" ]["datetime" ])
437
465
438
466
if first_item_date != current_index_end_date :
439
467
self .search_adapter .update_index_alias_sync (
440
- self .sync_client , collection_id , str (current_index_end_date )
468
+ self .sync_client , str (current_index_end_date ), latest_index
441
469
)
442
470
next_day_start = current_index_end_date + timedelta (days = 1 )
443
471
new_index = self .search_adapter .create_datetime_index_sync (
0 commit comments