feat(trainer): Refactor get_job_logs() API with Iterator #83
    @@ -15,12 +15,12 @@
      import copy
      import logging
      import multiprocessing
      import queue
      import random
      import string
      import time
      import uuid
    - from typing import Optional, Union
    + from typing import Optional, Union, Iterator
    + import re

      from kubeflow.trainer.constants import constants
      from kubeflow.trainer.types import types
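The import changes support the new return type and the step-name handling: `Iterator` types the refactored method as a generator, and `re` is used further down to map a step name to its container. A toy example of the `Iterator[str]` generator pattern (names are illustrative, not from the PR):

```python
from typing import Iterator

def fake_logs() -> Iterator[str]:
    # A generator function satisfies Iterator[str]: each yield is one log line.
    yield "log line 1"
    yield "log line 2"

for line in fake_logs():
    print(line)
```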
    @@ -173,7 +173,7 @@ def print_packages():
          )

          self.wait_for_job_status(job_name)
    -     print(self.get_job_logs(job_name)["node-0"])
    +     print(self.get_job_logs(job_name))

          self.delete_job(job_name)

      def train(
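Since `get_job_logs()` now returns an `Iterator[str]` instead of a `dict[str, str]`, callers stream the logs by iterating rather than indexing a per-node key. A minimal usage sketch, assuming the usual `TrainerClient` entry point and an illustrative job name:

```python
from kubeflow.trainer import TrainerClient  # assumed import path

client = TrainerClient()

# Stream TrainJob logs line by line; follow=True keeps the stream open
# while the Pod is still running, similar to `kubectl logs -f`.
for line in client.get_job_logs("my-train-job", follow=True):  # job name is a placeholder
    print(line)
```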
    @@ -328,92 +328,48 @@ def get_job_logs(
          self,
          name: str,
          follow: Optional[bool] = False,
    -     step: str = constants.NODE,
    -     node_rank: int = 0,
    - ) -> dict[str, str]:
    -     """Get the logs from TrainJob"""
    +     step: str = constants.NODE + "-0",
    + ) -> Iterator[str]:
    +     """Get the TrainJob logs"""
Review comment: Do we need this docstring here?
Reply: Not necessary, but this is just a reminder for developers and AI tools of how this API is used 🙂
Reply: I see, sounds good to me!
      # Get the TrainJob Pod name.
      pod_name = None
      for c in self.get_job(name).steps:
    -     if c.status != constants.POD_PENDING:
    -         if c.name == step or c.name == f"{step}-{node_rank}":
    -             pod_name = c.pod_name
    +     if c.status != constants.POD_PENDING and c.name == step:
    +         pod_name = c.pod_name
              break
      if pod_name is None:
    -     return {}
    +     return iter([])

    - # Dict where key is the Pod type and value is the Pod logs.
    - logs_dict = {}
    -
    - # TODO (andreyvelich): Potentially, refactor this.
    - # Support logging of multiple Pods.
    - # TODO (andreyvelich): Currently, follow is supported only for node container.
    - if follow and step == constants.NODE:
    -     log_streams = []
    -     log_streams.append(
    -         watch.Watch().stream(
    + try:
    +     if follow:
    +         log_stream = watch.Watch().stream(
              self.core_api.read_namespaced_pod_log,
              name=pod_name,
              namespace=self.namespace,
    -         container=constants.NODE,
    +         container=re.sub(r"-\d+$", "", step),  # Remove the number for the node step.
              follow=True,
          )
    -     )
    -     finished = [False] * len(log_streams)
    -
    -     # Create thread and queue per stream, for non-blocking iteration.
    -     log_queue_pool = utils.get_log_queue_pool(log_streams)
    -
    -     # Iterate over every watching pods' log queue
    -     while True:
    -         for index, log_queue in enumerate(log_queue_pool):
    -             if all(finished):
    +         # Stream logs incrementally
    +         for logline in log_stream:
Review comment: Are we sure each item is an entire line?
Reply: Yes, log_stream is …
Reply: I've checked and it indeed does yield items line by line: https://github.com/kubernetes-client/python/blob/6e7c539f52dec4e993d2c32a4408920d8522f47e/kubernetes/base/watch/watch.py#L54-L83. I wasn't sure whether we had to do it ourselves or not.
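For context on the line-by-line claim above: the kubernetes Python client's `Watch.stream()` wraps a streaming call such as `read_namespaced_pod_log` and yields one decoded log line per iteration, so the generator can forward each item as-is. A standalone sketch of that behavior (Pod and namespace names are placeholders):

```python
from kubernetes import client, config, watch

config.load_kube_config()
core_api = client.CoreV1Api()

# Watch.stream() yields individual log lines, so no manual splitting is needed.
for line in watch.Watch().stream(
    core_api.read_namespaced_pod_log,
    name="my-train-job-node-0",  # placeholder Pod name
    namespace="default",
    follow=True,
):
    print(line)
```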
    +             if logline is None:
    +                 break
    -             if finished[index]:
    -                 continue
    -             # grouping the every 50 log lines of the same pod.
    -             for _ in range(50):
    -                 try:
    -                     logline = log_queue.get(timeout=1)
    -                     if logline is None:
    -                         finished[index] = True
    -                         break
    -                     # Print logs to the StdOut and update results dict.
    -                     print(f"[{step}-{node_rank}]: {logline}")
    -                     logs_dict[f"{step}-{node_rank}"] = (
    -                         logs_dict.get(f"{step}-{node_rank}", "") + logline + "\n"
    -                     )
    -                 except queue.Empty:
    -                     break
    -         if all(finished):
    -             return logs_dict
    -
    - try:
    -     if step == constants.DATASET_INITIALIZER:
    -         logs_dict[constants.DATASET_INITIALIZER] = self.core_api.read_namespaced_pod_log(
    -             name=pod_name,
    -             namespace=self.namespace,
    -             container=constants.DATASET_INITIALIZER,
    -         )
    -     elif step == constants.MODEL_INITIALIZER:
    -         logs_dict[constants.MODEL_INITIALIZER] = self.core_api.read_namespaced_pod_log(
    -             name=pod_name,
    -             namespace=self.namespace,
    -             container=constants.MODEL_INITIALIZER,
    -         )
    +             yield logline  # type:ignore
          else:
    -         logs_dict[f"{step}-{node_rank}"] = self.core_api.read_namespaced_pod_log(
    +         logs = self.core_api.read_namespaced_pod_log(
              name=pod_name,
              namespace=self.namespace,
    -         container=constants.NODE,
    +         container=re.sub(r"-\d+$", "", step),  # Remove the number for the node step.
          )

    +         for line in logs.splitlines():
    +             yield line

      except Exception as e:
          raise RuntimeError(
              f"Failed to read logs for the pod {self.namespace}/{pod_name}"
          ) from e

    - return logs_dict

      def wait_for_job_status(
          self,
          name: str,
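The `re.sub(r"-\d+$", "", step)` call above derives the container name from the step name by stripping a trailing numeric suffix, so `node-0`, `node-1`, and so on all map to the `node` container while initializer steps are left untouched. A small illustration (the step strings are examples, not an exhaustive list):

```python
import re

def step_to_container(step: str) -> str:
    """Strip a trailing "-<number>" so a node step maps to its container name."""
    return re.sub(r"-\d+$", "", step)

print(step_to_container("node-0"))               # -> "node"
print(step_to_container("node-12"))              # -> "node"
print(step_to_container("dataset-initializer"))  # -> "dataset-initializer" (no numeric suffix)
```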
    @@ -917,14 +917,12 @@ def test_list_jobs(trainer_client, test_case):
          name="valid flow with all defaults",
          expected_status=SUCCESS,
          config={"name": BASIC_TRAIN_JOB_NAME},
    -     expected_output={
    -         "node-0": "test log content",
    -     },
    +     expected_output=["test log content"],
      ),
      TestCase(
          name="runtime error when getting logs",
          expected_status=FAILED,
    -     config={"name": RUNTIME},
    +     config={"name": BASIC_TRAIN_JOB_NAME, "namespace": FAIL_LOGS},
          expected_error=RuntimeError,
      ),
      ],
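With `expected_output` now a flat list of log lines, the test presumably materializes the iterator before comparing it. A hypothetical assertion in that spirit (the test body below is illustrative, not the PR's actual code):

```python
def test_get_job_logs_returns_lines(trainer_client):
    # Consume the iterator into a list of log lines and compare it with the
    # expected output; the job name is a placeholder for BASIC_TRAIN_JOB_NAME.
    logs = list(trainer_client.get_job_logs(name="basic-train-job"))
    assert logs == ["test log content"]
```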
    @@ -933,10 +931,12 @@ def test_get_job_logs(trainer_client, test_case):
      """Test TrainerClient.get_job_logs with basic success path."""
      print("Executing test:", test_case.name)
      try:
    +     trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE)

Suggested change:

    - trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE)
    + trainer_client.backend.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE)
Review comment: Actually, Trainer Client here is the Kubernetes backend, not TrainerClient(). The fixture is:

    yield KubernetesBackend(KubernetesBackendConfig())

Let me rename it.
Reply: Ahh I see, thank you!