Skip to content

Commit 506f170

Browse files
author
Grzegorz Pustulka
committed
index insertion strategies completed - before refactoring
1 parent a89bf54 commit 506f170

File tree

5 files changed

+170
-30
lines changed

5 files changed

+170
-30
lines changed

compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ services:
7575
hostname: elasticsearch
7676
environment:
7777
ES_JAVA_OPTS: -Xms512m -Xmx1g
78+
action.destructive_requires_name: false
7879
volumes:
7980
- ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
8081
- ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ class DatabaseLogic(BaseDatabaseLogic):
126126
sync_settings: SyncElasticsearchSettings = attr.ib(
127127
factory=SyncElasticsearchSettings
128128
)
129-
index_selector: IndexSelectionStrategy = attr.ib(init=False)
129+
async_index_selector: IndexSelectionStrategy = attr.ib(init=False)
130+
sync_index_selector: IndexSelectionStrategy = attr.ib(init=False)
130131
async_index_inserter: AsyncIndexInserter = attr.ib(init=False)
131132
sync_index_inserter: SyncIndexInserter = attr.ib(init=False)
132133

@@ -143,7 +144,8 @@ def __attrs_post_init__(self):
143144
self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy(
144145
self.sync_client
145146
)
146-
self.index_selector = IndexSelectorFactory.create_async_selector(self.client)
147+
self.async_index_selector = IndexSelectorFactory.create_async_selector(self.client)
148+
self.sync_index_selector = IndexSelectorFactory.create_sync_selector(self.sync_client)
147149

148150
item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
149151
collection_serializer: Type[CollectionSerializer] = attr.ib(
@@ -544,7 +546,7 @@ async def execute_search(
544546
search_after = json.loads(urlsafe_b64decode(token).decode())
545547

546548
query = search.query.to_dict() if search.query else None
547-
index_param = await self.index_selector.select_indexes(
549+
index_param = await self.async_index_selector.select_indexes(
548550
collection_ids, datetime_search
549551
)
550552

@@ -646,7 +648,7 @@ def _fill_aggregation_parameters(name: str, agg: dict) -> dict:
646648
if k in aggregations
647649
}
648650

649-
index_param = await self.index_selector.select_indexes(
651+
index_param = await self.async_index_selector.select_indexes(
650652
collection_ids, datetime_search
651653
)
652654

stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ class DatabaseLogic(BaseDatabaseLogic):
112112

113113
async_settings: AsyncSearchSettings = attr.ib(factory=AsyncSearchSettings)
114114
sync_settings: SyncSearchSettings = attr.ib(factory=SyncSearchSettings)
115-
index_selector: IndexSelectionStrategy = attr.ib(init=False)
116115
async_index_inserter: AsyncIndexInserter = attr.ib(init=False)
117-
sync_index_inserter: SyncIndexInserter = attr.ib(init=False)
116+
async_index_selector: IndexSelectionStrategy = attr.ib(init=False)
117+
sync_index_selector: IndexSelectionStrategy = attr.ib(init=False)
118118
client = attr.ib(init=False)
119119
sync_client = attr.ib(init=False)
120120

@@ -128,7 +128,8 @@ def __attrs_post_init__(self):
128128
self.sync_index_inserter = IndexInsertionFactory.create_sync_insertion_strategy(
129129
self.sync_client
130130
)
131-
self.index_selector = IndexSelectorFactory.create_async_selector(self.client)
131+
self.async_index_selector = IndexSelectorFactory.create_async_selector(self.client)
132+
self.sync_index_selector = IndexSelectorFactory.create_sync_selector(self.sync_client)
132133

133134
item_serializer: Type[ItemSerializer] = attr.ib(default=ItemSerializer)
134135
collection_serializer: Type[CollectionSerializer] = attr.ib(
@@ -543,7 +544,7 @@ async def execute_search(
543544

544545
search_body["sort"] = sort if sort else DEFAULT_SORT
545546

546-
index_param = await self.index_selector.select_indexes(
547+
index_param = await self.async_index_selector.select_indexes(
547548
collection_ids, datetime_search
548549
)
549550

@@ -641,7 +642,7 @@ def _fill_aggregation_parameters(name: str, agg: dict) -> dict:
641642
if k in aggregations
642643
}
643644

644-
index_param = await self.index_selector.select_indexes(
645+
index_param = await self.async_index_selector.select_indexes(
645646
collection_ids, datetime_search
646647
)
647648

stac_fastapi/tests/api/test_api.py

Lines changed: 145 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -704,32 +704,55 @@ async def test_big_int_eo_search(
704704

705705

706706
@pytest.mark.asyncio
707-
async def test_create_item_uses_existing_datetime_index(app_client, load_test_data, txn_client, ctx):
707+
async def test_create_item_in_past_date_creates_separate_index(app_client, ctx, load_test_data, txn_client):
708708
if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
709709
pytest.skip()
710710

711711
item = load_test_data("test_item.json")
712712
item["id"] = str(uuid.uuid4())
713-
created_item = await app_client.post(f"/collections/{item['collection']}/items", json=item)
714-
assert created_item.status_code == 201
713+
item["properties"]["datetime"] = "2012-02-12T12:30:22Z"
714+
715+
response = await app_client.post(f"/collections/{item['collection']}/items", json=item)
716+
717+
assert response.status_code == 201
718+
715719
indices = await txn_client.database.client.indices.get_alias(index="*")
716-
assert 'items_test-collection_2020-02-12' in indices.keys()
717-
await app_client.delete(f"/collections/{item['collection']}/items/{item['id']}")
720+
expected_indices = ['items_test-collection_2012-02-12', 'items_test-collection_2020-02-12']
721+
722+
for expected_index in expected_indices:
723+
assert expected_index in indices.keys()
718724

719725

720726
@pytest.mark.asyncio
727+
async def test_create_item_uses_existing_datetime_index(app_client, ctx, load_test_data, txn_client):
728+
if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
729+
pytest.skip()
730+
731+
item = load_test_data("test_item.json")
732+
item["id"] = str(uuid.uuid4())
733+
734+
response = await app_client.post(f"/collections/{item['collection']}/items", json=item)
735+
736+
assert response.status_code == 201
737+
738+
indices = await txn_client.database.client.indices.get_alias(index="*")
739+
assert 'items_test-collection_2020-02-12' in indices.keys()
740+
741+
721742
async def test_create_item_with_different_date_same_index(app_client, load_test_data, txn_client, ctx):
722743
if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
723744
pytest.skip()
724745

725746
item = load_test_data("test_item.json")
726747
item["id"] = str(uuid.uuid4())
727748
item["properties"]["datetime"] = "2022-02-12T12:30:22Z"
728-
created_item = await app_client.post(f"/collections/{item['collection']}/items", json=item)
729-
assert created_item.status_code == 201
749+
750+
response = await app_client.post(f"/collections/{item['collection']}/items", json=item)
751+
752+
assert response.status_code == 201
753+
730754
indices = await txn_client.database.client.indices.get_alias(index="*")
731755
assert 'items_test-collection_2020-02-12' in indices.keys()
732-
await app_client.delete(f"/collections/{item['collection']}/items/{item['id']}")
733756

734757

735758
@pytest.mark.asyncio
@@ -743,25 +766,126 @@ async def test_create_new_index_when_size_limit_exceeded(app_client, load_test_d
743766

744767
with patch('stac_fastapi.sfeos_helpers.database.AsyncIndexInserter.get_index_size_in_gb') as mock_get_size:
745768
mock_get_size.return_value = 21.0
746-
created_item = await app_client.post(f"/collections/{item['collection']}/items", json=item)
769+
response = await app_client.post(f"/collections/{item['collection']}/items", json=item)
770+
771+
assert response.status_code == 201
747772

748-
assert created_item.status_code == 201
749773
indices = await txn_client.database.client.indices.get_alias(index="*")
750-
assert 'items_test-collection_2020-02-12' and "items_test-collection_2024-02-13" in indices.keys()
751-
await app_client.delete(f"/collections/{item['collection']}/items/{item['id']}")
774+
expected_indices = ['items_test-collection_2020-02-12', 'items_test-collection_2024-02-13']
775+
776+
for expected_index in expected_indices:
777+
assert expected_index in indices.keys()
752778

753779

754780
@pytest.mark.asyncio
755-
async def test_create_item_in_past_date_creates_separate_index(app_client, load_test_data, txn_client, ctx):
781+
async def test_bulk_create_items_with_same_date_range(app_client, load_test_data, txn_client, ctx):
756782
if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
757783
pytest.skip()
758784

759-
item = load_test_data("test_item.json")
760-
item["id"] = str(uuid.uuid4())
761-
item["properties"]["datetime"] = "2012-02-12T12:30:22Z"
762-
created_item = await app_client.post(f"/collections/{item['collection']}/items", json=item)
763-
breakpoint()
764-
assert created_item.status_code == 201
785+
base_item = load_test_data("test_item.json")
786+
items_dict = {}
787+
788+
for i in range(10):
789+
item = deepcopy(base_item)
790+
item["id"] = str(uuid.uuid4())
791+
item["properties"]["datetime"] = f"2020-02-{12 + i}T12:30:22Z"
792+
items_dict[item["id"]] = item
793+
794+
payload = {
795+
"items": items_dict,
796+
"method": "insert"
797+
}
798+
799+
response = await app_client.post(
800+
f"/collections/{base_item['collection']}/bulk_items",
801+
json=payload
802+
)
803+
804+
assert response.status_code == 200
805+
765806
indices = await txn_client.database.client.indices.get_alias(index="*")
766-
assert 'items_test-collection_2012-02-12' and 'items_test-collection_2020-02-12' in indices.keys()
767-
await app_client.delete(f"/collections/{item['collection']}/items/{item['id']}")
807+
assert 'items_test-collection_2020-02-12' in indices.keys()
808+
809+
810+
@pytest.mark.asyncio
811+
async def test_bulk_create_items_with_different_date_ranges(app_client, load_test_data, txn_client, ctx):
812+
if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
813+
pytest.skip()
814+
815+
base_item = load_test_data("test_item.json")
816+
items_dict = {}
817+
818+
for i in range(3):
819+
item = deepcopy(base_item)
820+
item["id"] = str(uuid.uuid4())
821+
item["properties"]["datetime"] = f"2020-02-{12 + i}T12:30:22Z"
822+
items_dict[item["id"]] = item
823+
824+
for i in range(2):
825+
item = deepcopy(base_item)
826+
item["id"] = str(uuid.uuid4())
827+
item["properties"]["datetime"] = f"2010-02-{10 + i}T12:30:22Z"
828+
items_dict[item["id"]] = item
829+
830+
payload = {
831+
"items": items_dict,
832+
"method": "insert"
833+
}
834+
835+
response = await app_client.post(
836+
f"/collections/{base_item['collection']}/bulk_items",
837+
json=payload
838+
)
839+
840+
assert response.status_code == 200
841+
842+
indices = await txn_client.database.client.indices.get_alias(index="*")
843+
expected_indices = ['items_test-collection_2020-02-12', 'items_test-collection_2010-02-10']
844+
845+
for expected_index in expected_indices:
846+
assert expected_index in indices.keys()
847+
848+
849+
@pytest.mark.asyncio
850+
async def test_bulk_create_items_with_size_limit_exceeded(app_client, load_test_data, txn_client, ctx):
851+
if not os.getenv("ENABLE_DATETIME_INDEX_FILTERING"):
852+
pytest.skip()
853+
854+
base_item = load_test_data("test_item.json")
855+
items_dict = {}
856+
857+
for i in range(3):
858+
item = deepcopy(base_item)
859+
item["id"] = str(uuid.uuid4())
860+
item["properties"]["datetime"] = f"2019-02-{15 + i}T12:30:22Z"
861+
items_dict[item["id"]] = item
862+
863+
for i in range(2):
864+
item = deepcopy(base_item)
865+
item["id"] = str(uuid.uuid4())
866+
item["properties"]["datetime"] = f"2010-02-{10 + i}T12:30:22Z"
867+
items_dict[item["id"]] = item
868+
869+
payload = {
870+
"items": items_dict,
871+
"method": "insert"
872+
}
873+
874+
with patch('stac_fastapi.sfeos_helpers.database.SyncIndexInserter.get_index_size_in_gb') as mock_get_size:
875+
mock_get_size.return_value = 21.0
876+
response = await app_client.post(
877+
f"/collections/{base_item['collection']}/bulk_items",
878+
json=payload
879+
)
880+
881+
assert response.status_code == 200
882+
883+
indices = await txn_client.database.client.indices.get_alias(index="*")
884+
expected_indices = [
885+
'items_test-collection_2010-02-10',
886+
'items_test-collection_2019-02-15',
887+
'items_test-collection_2020-02-12'
888+
]
889+
890+
for expected_index in expected_indices:
891+
assert expected_index in indices.keys()

stac_fastapi/tests/conftest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from stac_fastapi.core.route_dependencies import get_route_dependencies
2929
from stac_fastapi.core.utilities import get_bool_env
3030
from stac_fastapi.sfeos_helpers.aggregation import EsAsyncBaseAggregationClient
31+
from stac_fastapi.sfeos_helpers.mappings import ITEMS_INDEX_PREFIX
3132

3233
if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
3334
from stac_fastapi.opensearch.config import AsyncOpensearchSettings as AsyncSettings
@@ -57,6 +58,7 @@
5758
TokenPaginationExtension,
5859
TransactionExtension,
5960
)
61+
from stac_fastapi.extensions.third_party import BulkTransactionExtension
6062
from stac_fastapi.types.config import Settings
6163

6264
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
@@ -153,6 +155,9 @@ async def delete_collections_and_items(txn_client: TransactionsClient) -> None:
153155
await refresh_indices(txn_client)
154156
await txn_client.database.delete_items()
155157
await txn_client.database.delete_collections()
158+
await txn_client.database.client.indices.delete(index=f"{ITEMS_INDEX_PREFIX}*")
159+
await txn_client.database.async_index_selector.refresh_cache()
160+
txn_client.database.sync_index_selector.refresh_cache()
156161

157162

158163
async def refresh_indices(txn_client: TransactionsClient) -> None:
@@ -213,6 +218,13 @@ async def app():
213218
),
214219
settings=settings,
215220
),
221+
BulkTransactionExtension(
222+
client=BulkTransactionsClient(
223+
database=database,
224+
session=None,
225+
settings=settings,
226+
)
227+
),
216228
SortExtension(),
217229
FieldsExtension(),
218230
QueryExtension(),

0 commit comments

Comments
 (0)