-
Notifications
You must be signed in to change notification settings - Fork 13
remove s3 bucket polling when waiting for transformation results #587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
90b6173
ee7b02c
5b04365
85b5346
b80d72d
ccfe38c
36cb4aa
eb9a937
bf805bb
9492997
90a0a6b
ab9d0cf
3644ee1
9b506c5
932a7e9
6d18ff3
04ae90c
c1da28f
ffd87ad
98392dd
8e952eb
232be85
0f094c0
b79cb26
c8ad74c
489e05d
4e01dee
ee0423f
f5bf546
5bbe40f
201531a
268592f
7db79e1
6e20fe6
270bd8b
fc29f59
bf49d24
b49edc7
aa3f2fb
9785fc6
72c7bca
d218879
b0eee6c
925c358
7ddfc60
23a0f13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
from __future__ import annotations | ||
|
||
import datetime | ||
import abc | ||
import asyncio | ||
from abc import ABC | ||
|
@@ -318,6 +319,7 @@ def transform_complete(task: Task): | |
else None | ||
) | ||
|
||
begin_at = datetime.datetime.now(tz=datetime.timezone.utc) | ||
if not cached_record: | ||
|
||
if self.cache.is_transform_request_submitted(sx_request_hash): | ||
|
@@ -342,13 +344,18 @@ def transform_complete(task: Task): | |
|
||
download_files_task = loop.create_task( | ||
self.download_files( | ||
signed_urls_only, expandable_progress, download_progress, cached_record | ||
signed_urls_only, | ||
expandable_progress, | ||
download_progress, | ||
cached_record, | ||
begin_at, | ||
) | ||
) | ||
|
||
try: | ||
signed_urls = [] | ||
downloaded_files = [] | ||
|
||
download_result = await download_files_task | ||
if signed_urls_only: | ||
signed_urls = download_result | ||
|
@@ -517,11 +524,13 @@ async def download_files( | |
progress: ExpandableProgress, | ||
download_progress: TaskID, | ||
cached_record: Optional[TransformedResults], | ||
begin_at: datetime.datetime, | ||
) -> List[str]: | ||
""" | ||
Task to monitor the list of files in the transform output's bucket. Any new files | ||
will be downloaded. | ||
""" | ||
|
||
files_seen = set() | ||
result_uris = [] | ||
download_tasks = [] | ||
|
@@ -557,15 +566,22 @@ async def get_signed_url( | |
if self.minio: | ||
# if self.minio exists, self.current_status will too | ||
if self.current_status.files_completed > len(files_seen): | ||
files = await self.minio.list_bucket() | ||
new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) | ||
files = await self.servicex.get_transformation_results( | ||
self.current_status.request_id, begin_at | ||
) | ||
begin_at = new_begin_at | ||
|
||
for file in files: | ||
if file.filename not in files_seen: | ||
file_path = file.get("file-path", "").replace("/", ":") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This relies on being kept in line with the transformer sidecar, and I think it may fail in certain cases (like the parquet output format is selected). Should we add a new field in the |
||
|
||
if file_path != "" and file_path not in files_seen: | ||
if signed_urls_only: | ||
download_tasks.append( | ||
loop.create_task( | ||
get_signed_url( | ||
self.minio, | ||
file.filename, | ||
file_path, | ||
progress, | ||
download_progress, | ||
) | ||
|
@@ -576,14 +592,14 @@ async def get_signed_url( | |
loop.create_task( | ||
download_file( | ||
self.minio, | ||
file.filename, | ||
file_path, | ||
progress, | ||
download_progress, | ||
shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 | ||
) | ||
) | ||
) # NOQA 501 | ||
files_seen.add(file.filename) | ||
files_seen.add(file_path) | ||
|
||
# Once the transform is complete and all files are seen we can stop polling. | ||
# Also, if we are just downloading or signing urls for a previous transform | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is relying on synchronization of clocks between the client and the server. Wouldn't it be better to set
new_begin_at
to be the latest result timestamp we see in the transform_results?