-
Notifications
You must be signed in to change notification settings - Fork 1
chore: Request status tracking #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
7e77204
7a52732
724215a
52a2968
d65985c
5fdc1e0
9869c68
76ef424
c9fc83c
18ed414
d2e1a4b
3326007
23133ce
ef424bf
f3452b1
4e4a009
8ef15fb
b7a837f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,7 @@ | |
MAX_FAMILY_LENGTH, | ||
MAX_QUEUE_SIZE, | ||
MAX_RUN_ID_LENGTH, | ||
MINIMAL_WAIT_FOR_ACK_SLEEP_TIME, | ||
MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, | ||
STOP_MESSAGE_FREQUENCY, | ||
) | ||
|
@@ -172,15 +173,23 @@ def __init__( | |
max_queue_size_exceeded_callback=max_queue_size_exceeded_callback, | ||
on_network_error_callback=on_network_error_callback, | ||
) | ||
|
||
self._last_put_seq: Synchronized[int] = multiprocessing.Value("i", -1) | ||
self._last_put_seq_wait: ConditionT = multiprocessing.Condition() | ||
|
||
self._last_ack_seq: Synchronized[int] = multiprocessing.Value("i", -1) | ||
self._last_ack_seq_wait: ConditionT = multiprocessing.Condition() | ||
|
||
self._sync_process = SyncProcess( | ||
project=self._project, | ||
family=self._family, | ||
operations_queue=self._operations_queue.queue, | ||
errors_queue=self._errors_queue, | ||
api_token=input_api_token, | ||
last_put_seq=self._last_put_seq, | ||
last_put_seq_wait=self._last_put_seq_wait, | ||
last_ack_seq=self._last_ack_seq, | ||
last_ack_seq_wait=self._last_ack_seq_wait, | ||
max_queue_size=max_queue_size, | ||
mode=mode, | ||
) | ||
|
@@ -198,6 +207,7 @@ def __init__( | |
from_run_id=from_run_id, | ||
from_step=from_step, | ||
) | ||
self.wait_for_processing(verbose=False) | ||
|
||
@property | ||
def resources(self) -> tuple[Resource, ...]: | ||
|
@@ -208,10 +218,9 @@ def resources(self) -> tuple[Resource, ...]: | |
) | ||
|
||
def _close(self) -> None: | ||
# TODO: Change to wait for all operations to be processed | ||
with self._lock: | ||
if self._sync_process.is_alive(): | ||
self.wait_for_submission() | ||
self.wait_for_processing() | ||
self._sync_process.terminate() | ||
self._sync_process.join() | ||
|
||
|
@@ -320,49 +329,99 @@ def log( | |
for operation in splitter: | ||
self._operations_queue.enqueue(operation=operation) | ||
|
||
def wait_for_submission(self, timeout: Optional[float] = None) -> None: | ||
""" | ||
Waits until all metadata is submitted to Neptune. | ||
""" | ||
begin_time = time.time() | ||
logger.info("Waiting for all operations to be processed") | ||
if timeout is None: | ||
def _wait( | ||
self, | ||
phrase: str, | ||
sleep_time: float, | ||
wait_condition: ConditionT, | ||
external_value: Synchronized[int], | ||
timeout: Optional[float] = None, | ||
verbose: bool = True, | ||
) -> None: | ||
if verbose: | ||
logger.info(f"Waiting for all operations to be {phrase}") | ||
|
||
if timeout is None and verbose: | ||
logger.warning("No timeout specified. Waiting indefinitely") | ||
|
||
with self._lock: | ||
if not self._sync_process.is_alive(): | ||
logger.warning("Sync process is not running") | ||
if verbose: | ||
logger.warning("Sync process is not running") | ||
return # No need to wait if the sync process is not running | ||
|
||
sleep_time_wait = ( | ||
min(MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, timeout) if timeout is not None else MINIMAL_WAIT_FOR_PUT_SLEEP_TIME | ||
) | ||
begin_time = time.time() | ||
wait_time = min(sleep_time, timeout) if timeout is not None else sleep_time | ||
last_queued_sequence_id = self._operations_queue.last_sequence_id | ||
last_message_printed: Optional[float] = None | ||
|
||
while True: | ||
with self._last_put_seq_wait: | ||
self._last_put_seq_wait.wait(timeout=sleep_time_wait) | ||
value = self._last_put_seq.value | ||
with wait_condition: | ||
wait_condition.wait(timeout=wait_time) | ||
value = external_value.value | ||
|
||
if value == -1: | ||
if self._operations_queue.last_sequence_id != -1: | ||
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY: | ||
if verbose and should_print_message(last_message_printed): | ||
last_message_printed = time.time() | ||
logger.info( | ||
"Waiting. No operations processed yet. Operations to sync: %s", | ||
f"Waiting. No operations were {phrase} yet. Operations to sync: %s", | ||
self._operations_queue.last_sequence_id + 1, | ||
) | ||
else: | ||
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY: | ||
if verbose and should_print_message(last_message_printed): | ||
last_message_printed = time.time() | ||
logger.info("Waiting. No operations processed yet") | ||
logger.info(f"Waiting. No operations were {phrase} yet") | ||
else: | ||
if last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY: | ||
if verbose and should_print_message(last_message_printed): | ||
last_message_printed = time.time() | ||
logger.info( | ||
"Waiting for remaining %d operation(s) to be synced", | ||
f"Waiting for remaining %d operation(s) to be {phrase}", | ||
last_queued_sequence_id - value + 1, | ||
) | ||
|
||
# Reaching the last queued sequence ID means that all operations were submitted | ||
if value >= last_queued_sequence_id or (timeout is not None and time.time() - begin_time > timeout): | ||
break | ||
|
||
logger.info("All operations processed") | ||
if verbose: | ||
logger.info(f"All operations were {phrase}") | ||
|
||
def wait_for_submission(self, timeout: Optional[float] = None, verbose: bool = True) -> None: | ||
""" | ||
Waits until all metadata is submitted to Neptune. | ||
|
||
Args: | ||
timeout (float, optional): In seconds, the maximum time to wait for submission. | ||
verbose (bool): If True (default), prints messages about the waiting process. | ||
""" | ||
self._wait( | ||
phrase="submitted", | ||
sleep_time=MINIMAL_WAIT_FOR_PUT_SLEEP_TIME, | ||
wait_condition=self._last_put_seq_wait, | ||
external_value=self._last_put_seq, | ||
timeout=timeout, | ||
verbose=verbose, | ||
) | ||
|
||
def wait_for_processing(self, timeout: Optional[float] = None, verbose: bool = True) -> None: | ||
""" | ||
Waits until all metadata is processed by Neptune. | ||
|
||
Args: | ||
timeout (float, optional): In seconds, the maximum time to wait for processing. | ||
verbose (bool): If True (default), prints messages about the waiting process. | ||
""" | ||
self._wait( | ||
phrase="processed", | ||
sleep_time=MINIMAL_WAIT_FOR_ACK_SLEEP_TIME, | ||
wait_condition=self._last_ack_seq_wait, | ||
external_value=self._last_ack_seq, | ||
timeout=timeout, | ||
verbose=verbose, | ||
) | ||
|
||
|
||
def should_print_message(last_message_printed: Optional[float]) -> bool: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @kgodlewski I've done the refactor you requested in the previous PR. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this user-facing? If so, what's the purpose? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is not 😉 |
||
"""Check if enough time has passed to print a message.""" | ||
return last_message_printed is None or time.time() - last_message_printed > STOP_MESSAGE_FREQUENCY |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tiny detail, but in the future we could do:
`def print_message(msg: str, last_print_timestamp: Optional[float], verbose: bool) -> float`
The function would return the current time if it printed the message, so we could just do: `last_message_printed = print_message(f"some message", last_message_printed, verbose)`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done