Skip to content

Commit 5b04365

Browse files
committed
add begin_at, update tests
1 parent ee7b02c commit 5b04365

File tree

3 files changed

+54
-22
lines changed

3 files changed

+54
-22
lines changed

servicex/query_core.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828
from __future__ import annotations
2929

30+
import datetime
3031
import abc
3132
import asyncio
3233
from abc import ABC
@@ -318,6 +319,7 @@ def transform_complete(task: Task):
318319
else None
319320
)
320321

322+
begin_at = datetime.datetime.now(datetime.UTC)
321323
if not cached_record:
322324

323325
if self.cache.is_transform_request_submitted(sx_request_hash):
@@ -342,13 +344,14 @@ def transform_complete(task: Task):
342344

343345
download_files_task = loop.create_task(
344346
self.download_files(
345-
signed_urls_only, expandable_progress, download_progress, cached_record
347+
signed_urls_only, expandable_progress, download_progress, cached_record, begin_at
346348
)
347349
)
348350

349351
try:
350352
signed_urls = []
351353
downloaded_files = []
354+
352355
download_result = await download_files_task
353356
if signed_urls_only:
354357
signed_urls = download_result
@@ -517,6 +520,7 @@ async def download_files(
517520
progress: ExpandableProgress,
518521
download_progress: TaskID,
519522
cached_record: Optional[TransformedResults],
523+
begin_at: datetime.datetime,
520524
) -> List[str]:
521525
"""
522526
Task to monitor the list of files in the transform output's bucket. Any new files
@@ -558,20 +562,25 @@ async def get_signed_url(
558562
if self.minio:
559563
# if self.minio exists, self.current_status will too
560564
if self.current_status.files_completed > len(files_seen):
565+
new_begin_at = datetime.datetime.now(datetime.UTC)
561566
files = await self.servicex.get_transformation_results(
562-
self.current_status.request_id
567+
self.current_status.request_id,
568+
begin_at
563569
)
570+
begin_at = new_begin_at
564571

565572
for file in files:
566573
if "file-path" not in file:
567574
continue
568575

569576
file_path = file["file-path"].replace("/", ":")
577+
570578
if file_path not in files_seen:
571579
if signed_urls_only:
572580
download_tasks.append(
573581
loop.create_task(
574582
get_signed_url(
583+
self.minio,
575584
file_path,
576585
progress,
577586
download_progress,

servicex/servicex_adapter.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828
import os
2929
import time
30+
import datetime
3031
from typing import Optional, Dict, List
3132

3233
from aiohttp import ClientSession
@@ -228,12 +229,16 @@ async def delete_transform(self, transform_id=None):
228229
f"Failed to delete transform {transform_id} - {msg}"
229230
)
230231

231-
async def get_transformation_results(self, request_id: str):
232+
async def get_transformation_results(self, request_id: str, begin_at: datetime.datetime):
232233
headers = await self._get_authorization()
233-
url = self.url + f"/servicex/internal/transformation/{request_id}/results"
234+
url = self.url + f"/servicex/transformation/{request_id}/results"
235+
236+
params = {}
237+
if begin_at:
238+
params["begin_at"] = begin_at.isoformat()
234239

235240
async with ClientSession() as session:
236-
async with session.get(headers=headers, url=url) as r:
241+
async with session.get(headers=headers, url=url, params=params) as r:
237242
if r.status == 403:
238243
raise AuthorizationError(
239244
f"Not authorized to access serviceX at {self.url}"
@@ -246,7 +251,8 @@ async def get_transformation_results(self, request_id: str):
246251
msg = await _extract_message(r)
247252
raise RuntimeError(f"Failed with message: {msg}")
248253

249-
return (await r.json())["results"]
254+
data = await r.json()
255+
return data.get("results")
250256

251257
async def cancel_transform(self, transform_id=None):
252258
headers = await self._get_authorization()

tests/test_servicex_dataset.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,18 @@ def cache_transform(record: TransformedResults):
204204
@pytest.mark.asyncio
205205
async def test_submit(mocker):
206206
servicex = AsyncMock()
207+
207208
servicex.submit_transform = AsyncMock()
208209
servicex.submit_transform.return_value = {"request_id": '123-456-789"'}
210+
211+
servicex.get_transformation_results = AsyncMock(side_effect=[
212+
[{"file-path": file1.filename}],
213+
[
214+
{"file-path": file1.filename},
215+
{"file-path": file2.filename},
216+
],
217+
])
218+
209219
servicex.get_transform_status = AsyncMock()
210220
servicex.get_transform_status.side_effect = [
211221
transform_status1,
@@ -214,7 +224,6 @@ async def test_submit(mocker):
214224
]
215225

216226
mock_minio = AsyncMock()
217-
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]])
218227
mock_minio.download_file = AsyncMock(
219228
side_effect=lambda a, _, shorten_filename: PurePath(a)
220229
)
@@ -235,6 +244,7 @@ async def test_submit(mocker):
235244
config=Configuration(api_endpoints=[]),
236245
)
237246
datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal")
247+
238248
with ExpandableProgress(display_progress=False) as progress:
239249
datasource.result_format = ResultFormat.parquet
240250
result = await datasource.submit_and_download(
@@ -245,11 +255,18 @@ async def test_submit(mocker):
245255
mock_cache.cache_transform.assert_called_once()
246256

247257

258+
248259
@pytest.mark.asyncio
249260
async def test_submit_partial_success(mocker):
250261
servicex = AsyncMock()
251262
servicex.submit_transform = AsyncMock()
252263
servicex.submit_transform.return_value = {"request_id": '123-456-789"'}
264+
265+
servicex.get_transformation_results = AsyncMock(side_effect=[
266+
[{"file-path": file1.filename}],
267+
[{"file-path": file1.filename}],
268+
])
269+
253270
servicex.get_transform_status = AsyncMock()
254271
servicex.get_transform_status.side_effect = [
255272
transform_status1,
@@ -258,7 +275,6 @@ async def test_submit_partial_success(mocker):
258275
]
259276

260277
mock_minio = AsyncMock()
261-
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
262278
mock_minio.download_file = AsyncMock(
263279
side_effect=lambda a, _, shorten_filename: PurePath(a)
264280
)
@@ -295,13 +311,16 @@ async def test_use_of_cache(mocker):
295311
servicex = AsyncMock()
296312
servicex.submit_transform = AsyncMock()
297313
servicex.submit_transform.return_value = {"request_id": '123-456-789"'}
314+
servicex.get_transformation_results = AsyncMock(return_value=[
315+
{"file-path": file1.filename},
316+
{"file-path": file2.filename}
317+
])
298318
servicex.get_transform_status = AsyncMock()
299319
servicex.get_transform_status.side_effect = [
300320
transform_status1,
301321
transform_status3,
302322
]
303323
mock_minio = AsyncMock()
304-
mock_minio.list_bucket = AsyncMock(return_value=[file1, file2])
305324
mock_minio.download_file = AsyncMock(
306325
side_effect=lambda a, _, shorten_filename: PurePath(a)
307326
)
@@ -336,7 +355,7 @@ async def test_use_of_cache(mocker):
336355
# second round, should hit the cache (and not call the sx_adapter, minio, or update_record)
337356
with ExpandableProgress(display_progress=False) as progress:
338357
servicex2 = AsyncMock()
339-
mock_minio.list_bucket.reset_mock()
358+
servicex.get_transformation_results.reset_mock()
340359
mock_minio.get_signed_url.reset_mock()
341360
datasource2 = Query(
342361
dataset_identifier=did,
@@ -354,14 +373,14 @@ async def test_use_of_cache(mocker):
354373
signed_urls_only=True, expandable_progress=progress
355374
)
356375
servicex2.assert_not_awaited()
357-
mock_minio.list_bucket.assert_not_awaited()
376+
servicex.get_transformation_results.assert_not_awaited()
358377
mock_minio.get_signed_url.assert_not_awaited()
359378
upd.assert_not_called()
360379
assert result1 == result2
361380
upd.reset_mock()
362381
servicex.get_transform_status.reset_mock(side_effect=True)
363382
servicex.get_transform_status.return_value = transform_status3
364-
mock_minio.list_bucket.reset_mock(side_effect=True)
383+
servicex.get_transformation_results.reset_mock(side_effect=True)
365384
# third round, should hit the cache and download files (and call update_record)
366385
with ExpandableProgress(display_progress=False) as progress:
367386
await datasource.submit_and_download(
@@ -371,14 +390,14 @@ async def test_use_of_cache(mocker):
371390
assert mock_minio.download_file.await_count == 2
372391
upd.assert_called_once()
373392
# fourth round, should hit the cache (and nothing else)
374-
mock_minio.list_bucket.reset_mock()
393+
servicex.get_transformation_results.reset_mock()
375394
mock_minio.download_file.reset_mock()
376395
with ExpandableProgress(display_progress=False) as progress:
377396
await datasource.submit_and_download(
378397
signed_urls_only=False, expandable_progress=progress
379398
)
380399
servicex.assert_not_awaited()
381-
mock_minio.list_bucket.assert_not_awaited()
400+
servicex.get_transformation_results.assert_not_awaited()
382401
mock_minio.download_file.assert_not_awaited()
383402
upd.assert_called_once()
384403
cache.close()
@@ -396,7 +415,6 @@ async def test_submit_cancel(mocker):
396415
]
397416

398417
mock_minio = AsyncMock()
399-
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
400418
mock_minio.download_file = AsyncMock(
401419
side_effect=lambda a, _, shorten_filename: PurePath(a)
402420
)
@@ -437,7 +455,6 @@ async def test_submit_fatal(mocker):
437455
]
438456

439457
mock_minio = AsyncMock()
440-
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
441458
mock_minio.download_file = AsyncMock(
442459
side_effect=lambda a, _, shorten_filename: PurePath(a)
443460
)
@@ -482,7 +499,6 @@ async def test_submit_generic(mocker, codegen_list):
482499
]
483500

484501
mock_minio = AsyncMock()
485-
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]])
486502
mock_minio.download_file = AsyncMock()
487503

488504
mock_cache = mocker.MagicMock(QueryCache)
@@ -531,7 +547,6 @@ async def test_submit_cancelled(mocker, codegen_list):
531547
sx.get_transform_status.side_effect = [transform_status4]
532548

533549
mock_minio = AsyncMock()
534-
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]])
535550
mock_minio.download_file = AsyncMock()
536551

537552
mock_cache = mocker.MagicMock(QueryCache)
@@ -601,10 +616,12 @@ async def test_use_of_ignore_cache(mocker, servicex):
601616
transform_status3,
602617
]
603618
)
604-
619+
servicex.get_transformation_results = AsyncMock(return_value=[
620+
{"file-path": file1.filename},
621+
{"file-path": file2.filename}
622+
])
605623
# Prepare Minio
606624
mock_minio = AsyncMock()
607-
mock_minio.list_bucket = AsyncMock(return_value=[file1, file2])
608625
mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"])
609626
mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio)
610627
did = FileListDataset("/foo/bar/baz.root")
@@ -674,13 +691,13 @@ async def test_use_of_ignore_cache(mocker, servicex):
674691
transform_status1,
675692
transform_status3,
676693
]
677-
mock_minio.list_bucket.reset_mock()
694+
servicex.get_transformation_results.reset_mock()
678695
mock_minio.download_file.reset_mock()
679696
with ExpandableProgress(display_progress=False) as progress:
680697
res = await datasource_without_ignore_cache.submit_and_download(
681698
signed_urls_only=True, expandable_progress=progress
682699
) # noqa
683-
mock_minio.list_bucket.assert_not_awaited()
700+
servicex.get_transformation_results.assert_not_awaited()
684701
mock_minio.download_file.assert_not_awaited()
685702
assert len(res.signed_url_list) == 2
686703
cache.close()

0 commit comments

Comments
 (0)