-
Couldn't load subscription status.
- Fork 43
feat(trainer): Refactor get_job_logs() API with Iterator #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c140935
8e10eed
5ce8e7b
3d9951a
1b9b347
72a6b2e
70fb142
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,12 +15,12 @@ | |
| import copy | ||
| import logging | ||
| import multiprocessing | ||
| import queue | ||
| import random | ||
| import string | ||
| import time | ||
| import uuid | ||
| from typing import Optional, Union | ||
| from typing import Optional, Union, Iterator | ||
| import re | ||
|
|
||
| from kubeflow.trainer.constants import constants | ||
| from kubeflow.trainer.types import types | ||
|
|
@@ -173,7 +173,7 @@ def print_packages(): | |
| ) | ||
|
|
||
| self.wait_for_job_status(job_name) | ||
| print(self.get_job_logs(job_name)["node-0"]) | ||
| print("\n".join(self.get_job_logs(name=job_name))) | ||
| self.delete_job(job_name) | ||
|
|
||
| def train( | ||
|
|
@@ -328,92 +328,48 @@ def get_job_logs( | |
| self, | ||
| name: str, | ||
| follow: Optional[bool] = False, | ||
| step: str = constants.NODE, | ||
| node_rank: int = 0, | ||
| ) -> dict[str, str]: | ||
| """Get the logs from TrainJob""" | ||
|
|
||
| step: str = constants.NODE + "-0", | ||
| ) -> Iterator[str]: | ||
| """Get the TrainJob logs""" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need this docstring here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not necessary, but this is just a reminder of how this API is used, for developers and AI tools 🙂 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see, sounds good to me! |
||
| # Get the TrainJob Pod name. | ||
| pod_name = None | ||
| for c in self.get_job(name).steps: | ||
| if c.status != constants.POD_PENDING: | ||
| if c.name == step or c.name == f"{step}-{node_rank}": | ||
| pod_name = c.pod_name | ||
| if c.status != constants.POD_PENDING and c.name == step: | ||
| pod_name = c.pod_name | ||
| break | ||
| if pod_name is None: | ||
| return {} | ||
|
|
||
| # Dict where key is the Pod type and value is the Pod logs. | ||
| logs_dict = {} | ||
|
|
||
| # TODO (andreyvelich): Potentially, refactor this. | ||
| # Support logging of multiple Pods. | ||
| # TODO (andreyvelich): Currently, follow is supported only for node container. | ||
| if follow and step == constants.NODE: | ||
| log_streams = [] | ||
| log_streams.append( | ||
| watch.Watch().stream( | ||
| self.core_api.read_namespaced_pod_log, | ||
| name=pod_name, | ||
| namespace=self.namespace, | ||
| container=constants.NODE, | ||
| ) | ||
| ) | ||
| finished = [False] * len(log_streams) | ||
|
|
||
| # Create thread and queue per stream, for non-blocking iteration. | ||
| log_queue_pool = utils.get_log_queue_pool(log_streams) | ||
|
|
||
| # Iterate over every watching pods' log queue | ||
| while True: | ||
| for index, log_queue in enumerate(log_queue_pool): | ||
| if all(finished): | ||
| break | ||
| if finished[index]: | ||
| continue | ||
| # grouping the every 50 log lines of the same pod. | ||
| for _ in range(50): | ||
| try: | ||
| logline = log_queue.get(timeout=1) | ||
| if logline is None: | ||
| finished[index] = True | ||
| break | ||
| # Print logs to the StdOut and update results dict. | ||
| print(f"[{step}-{node_rank}]: {logline}") | ||
| logs_dict[f"{step}-{node_rank}"] = ( | ||
| logs_dict.get(f"{step}-{node_rank}", "") + logline + "\n" | ||
| ) | ||
| except queue.Empty: | ||
| break | ||
| if all(finished): | ||
| return logs_dict | ||
| return | ||
|
|
||
| # Remove the number for the node step. | ||
| container_name = re.sub(r"-\d+$", "", step) | ||
| try: | ||
| if step == constants.DATASET_INITIALIZER: | ||
| logs_dict[constants.DATASET_INITIALIZER] = self.core_api.read_namespaced_pod_log( | ||
| name=pod_name, | ||
| namespace=self.namespace, | ||
| container=constants.DATASET_INITIALIZER, | ||
| ) | ||
| elif step == constants.MODEL_INITIALIZER: | ||
| logs_dict[constants.MODEL_INITIALIZER] = self.core_api.read_namespaced_pod_log( | ||
| if follow: | ||
| log_stream = watch.Watch().stream( | ||
| self.core_api.read_namespaced_pod_log, | ||
| name=pod_name, | ||
| namespace=self.namespace, | ||
| container=constants.MODEL_INITIALIZER, | ||
| container=container_name, | ||
| follow=True, | ||
| ) | ||
|
|
||
| # Stream logs incrementally. | ||
| for logline in log_stream: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are we sure each item is an entire line? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, log_stream is […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've checked and it does indeed yield items line by line: https://github.com/kubernetes-client/python/blob/6e7c539f52dec4e993d2c32a4408920d8522f47e/kubernetes/base/watch/watch.py#L54-L83 I wasn't sure whether we had to do it ourselves or not. |
||
| yield logline # type:ignore | ||
| else: | ||
| logs_dict[f"{step}-{node_rank}"] = self.core_api.read_namespaced_pod_log( | ||
| logs = self.core_api.read_namespaced_pod_log( | ||
| name=pod_name, | ||
| namespace=self.namespace, | ||
| container=constants.NODE, | ||
| container=container_name, | ||
| ) | ||
|
|
||
| for line in logs.splitlines(): | ||
| yield line | ||
|
|
||
| except Exception as e: | ||
| raise RuntimeError( | ||
| f"Failed to read logs for the pod {self.namespace}/{pod_name}" | ||
| ) from e | ||
|
|
||
| return logs_dict | ||
|
|
||
| def wait_for_job_status( | ||
| self, | ||
| name: str, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth showing
`follow=True` here? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By default
`wait_for_job_status()` waits until the TrainJob is complete, so showing an example with follow is unnecessary here. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, missed this part