From c14093562c46c179b8886b43687e2f62c69e904a Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Wed, 3 Sep 2025 02:08:34 +0100 Subject: [PATCH 1/7] feat(trainer): Refactor get_job_logs() API Signed-off-by: Andrey Velichkevich --- kubeflow/trainer/api/trainer_client.py | 36 +++++-- kubeflow/trainer/backends/base.py | 7 +- .../trainer/backends/kubernetes/backend.py | 93 +++++-------------- 3 files changed, 56 insertions(+), 80 deletions(-) diff --git a/kubeflow/trainer/api/trainer_client.py b/kubeflow/trainer/api/trainer_client.py index 833cc82c..073e1418 100644 --- a/kubeflow/trainer/api/trainer_client.py +++ b/kubeflow/trainer/api/trainer_client.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Optional, Union +from typing import Optional, Union, Iterator from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -120,8 +120,7 @@ def list_jobs(self, runtime: Optional[types.Runtime] = None) -> list[types.Train runtime: Reference to one of the existing runtimes. Returns: - List: List of created TrainJobs. - If no TrainJob exist, an empty list is returned. + List of created TrainJobs. If no TrainJob exist, an empty list is returned. Raises: TimeoutError: Timeout to list TrainJobs. @@ -148,12 +147,33 @@ def get_job(self, name: str) -> types.TrainJob: def get_job_logs( self, name: str, + step: str = constants.NODE + "-0", follow: Optional[bool] = False, - step: str = constants.NODE, - node_rank: int = 0, - ) -> dict[str, str]: - """Get the logs from TrainJob""" - return self.backend.get_job_logs(name=name, follow=follow, step=step, node_rank=node_rank) + ) -> Iterator[str]: + """Get logs from a specific step of a TrainJob. + + You can watch for the logs in realtime as follows: + ```python + from kubeflow.trainer import TrainerClient + + for logline in TrainerClient().get_job_logs(name="s8d44aa4fb6d", follow=True): + print(logline) + ``` + + Args: + name: Name of the TrainJob. + step: Step of the TrainJob to collect logs from, like dataset-initializer or node-0. + follow: Whether to stream logs in realtime as they produced. + + Returns: + Iterator of log lines. + + + Raises: + TimeoutError: Timeout to get a TrainJob. + RuntimeError: Failed to get a TrainJob. + """ + return self.backend.get_job_logs(name=name, follow=follow, step=step) def wait_for_job_status( self, diff --git a/kubeflow/trainer/backends/base.py b/kubeflow/trainer/backends/base.py index 359651b5..729f769c 100644 --- a/kubeflow/trainer/backends/base.py +++ b/kubeflow/trainer/backends/base.py @@ -14,7 +14,7 @@ import abc -from typing import Optional, Union +from typing import Optional, Union, Iterator from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -47,9 +47,8 @@ def get_job_logs( self, name: str, follow: Optional[bool] = False, - step: str = constants.NODE, - node_rank: int = 0, - ) -> dict[str, str]: + step: str = constants.NODE + "-0", + ) -> Iterator[str]: raise NotImplementedError() def wait_for_job_status( diff --git a/kubeflow/trainer/backends/kubernetes/backend.py b/kubeflow/trainer/backends/kubernetes/backend.py index b523dcf1..fdf3e890 100644 --- a/kubeflow/trainer/backends/kubernetes/backend.py +++ b/kubeflow/trainer/backends/kubernetes/backend.py @@ -15,12 +15,12 @@ import copy import logging import multiprocessing -import queue import random import string import time import uuid -from typing import Optional, Union +from typing import Optional, Union, Iterator +import re from kubeflow.trainer.constants import constants from kubeflow.trainer.types import types @@ -173,7 +173,7 @@ def print_packages(): ) self.wait_for_job_status(job_name) - print(self.get_job_logs(job_name)["node-0"]) + print(self.get_job_logs(job_name)) self.delete_job(job_name) def train( @@ -328,92 +328,49 @@ def get_job_logs( self, name: str, follow: Optional[bool] = False, - step: str = constants.NODE, - node_rank: int = 0, - ) -> dict[str, str]: - """Get the logs from TrainJob""" + step: str = constants.NODE + "-0", + ) -> Iterator[str]: + """Get the TrainJob logs""" # Get the TrainJob Pod name. pod_name = None for c in self.get_job(name).steps: - if c.status != constants.POD_PENDING: - if c.name == step or c.name == f"{step}-{node_rank}": - pod_name = c.pod_name + if c.status != constants.POD_PENDING and c.name == step: + pod_name = c.pod_name + break if pod_name is None: - return {} - - # Dict where key is the Pod type and value is the Pod logs. - logs_dict = {} - - # TODO (andreyvelich): Potentially, refactor this. - # Support logging of multiple Pods. - # TODO (andreyvelich): Currently, follow is supported only for node container. - if follow and step == constants.NODE: - log_streams = [] - log_streams.append( - watch.Watch().stream( + return iter([]) + + try: + if follow: + log_stream = watch.Watch().stream( self.core_api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, - container=constants.NODE, + container=re.sub(r"-\d+$", "", step), # Remove the number for the node step. + follow=True, ) - ) - finished = [False] * len(log_streams) - - # Create thread and queue per stream, for non-blocking iteration. - log_queue_pool = utils.get_log_queue_pool(log_streams) - # Iterate over every watching pods' log queue - while True: - for index, log_queue in enumerate(log_queue_pool): - if all(finished): + # Stream logs incrementally + for logline in log_stream: + if logline is None: break - if finished[index]: - continue - # grouping the every 50 log lines of the same pod. - for _ in range(50): - try: - logline = log_queue.get(timeout=1) - if logline is None: - finished[index] = True - break - # Print logs to the StdOut and update results dict. - print(f"[{step}-{node_rank}]: {logline}") - logs_dict[f"{step}-{node_rank}"] = ( - logs_dict.get(f"{step}-{node_rank}", "") + logline + "\n" - ) - except queue.Empty: - break - if all(finished): - return logs_dict - - try: - if step == constants.DATASET_INITIALIZER: - logs_dict[constants.DATASET_INITIALIZER] = self.core_api.read_namespaced_pod_log( - name=pod_name, - namespace=self.namespace, - container=constants.DATASET_INITIALIZER, - ) - elif step == constants.MODEL_INITIALIZER: - logs_dict[constants.MODEL_INITIALIZER] = self.core_api.read_namespaced_pod_log( - name=pod_name, - namespace=self.namespace, - container=constants.MODEL_INITIALIZER, - ) + yield logline else: - logs_dict[f"{step}-{node_rank}"] = self.core_api.read_namespaced_pod_log( + logs = self.core_api.read_namespaced_pod_log( name=pod_name, namespace=self.namespace, - container=constants.NODE, + container=re.sub(r"-\d+$", "", step), # Remove the number for the node step. ) + for line in logs.splitlines(): + yield line + except Exception as e: raise RuntimeError( f"Failed to read logs for the pod {self.namespace}/{pod_name}" ) from e - return logs_dict - def wait_for_job_status( self, name: str, From 8e10eed679cd3d07c8a5d46119aca6757822a48d Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Wed, 3 Sep 2025 02:36:37 +0100 Subject: [PATCH 2/7] Fix unit tests Signed-off-by: Andrey Velichkevich --- kubeflow/trainer/backends/kubernetes/backend.py | 1 - kubeflow/trainer/backends/kubernetes/backend_test.py | 12 ++++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/kubeflow/trainer/backends/kubernetes/backend.py b/kubeflow/trainer/backends/kubernetes/backend.py index fdf3e890..23d1edb2 100644 --- a/kubeflow/trainer/backends/kubernetes/backend.py +++ b/kubeflow/trainer/backends/kubernetes/backend.py @@ -331,7 +331,6 @@ def get_job_logs( step: str = constants.NODE + "-0", ) -> Iterator[str]: """Get the TrainJob logs""" - # Get the TrainJob Pod name. pod_name = None for c in self.get_job(name).steps: diff --git a/kubeflow/trainer/backends/kubernetes/backend_test.py b/kubeflow/trainer/backends/kubernetes/backend_test.py index 366c3c16..4c24404c 100644 --- a/kubeflow/trainer/backends/kubernetes/backend_test.py +++ b/kubeflow/trainer/backends/kubernetes/backend_test.py @@ -917,14 +917,12 @@ def test_list_jobs(trainer_client, test_case): name="valid flow with all defaults", expected_status=SUCCESS, config={"name": BASIC_TRAIN_JOB_NAME}, - expected_output={ - "node-0": "test log content", - }, + expected_output=["test log content"], ), TestCase( name="runtime error when getting logs", expected_status=FAILED, - config={"name": RUNTIME}, + config={"name": BASIC_TRAIN_JOB_NAME, "namespace": FAIL_LOGS}, expected_error=RuntimeError, ), ], @@ -933,10 +931,12 @@ def test_get_job_logs(trainer_client, test_case): """Test TrainerClient.get_job_logs with basic success path.""" print("Executing test:", test_case.name) try: + trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) logs = trainer_client.get_job_logs(test_case.config.get("name")) + # Convert iterator to list for comparison. + logs_list = list(logs) assert test_case.expected_status == SUCCESS - assert logs == test_case.expected_output - + assert logs_list == test_case.expected_output except Exception as e: assert type(e) is test_case.expected_error print("test execution complete") From 5ce8e7bf0836722629fec5a4d916648faf20f8d2 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Wed, 3 Sep 2025 02:42:44 +0100 Subject: [PATCH 3/7] Remove unused func Signed-off-by: Andrey Velichkevich --- .../trainer/backends/kubernetes/backend.py | 2 +- kubeflow/trainer/utils/utils.py | 21 ------------------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/kubeflow/trainer/backends/kubernetes/backend.py b/kubeflow/trainer/backends/kubernetes/backend.py index 23d1edb2..580502dc 100644 --- a/kubeflow/trainer/backends/kubernetes/backend.py +++ b/kubeflow/trainer/backends/kubernetes/backend.py @@ -354,7 +354,7 @@ def get_job_logs( for logline in log_stream: if logline is None: break - yield logline + yield logline # type:ignore else: logs = self.core_api.read_namespaced_pod_log( name=pod_name, diff --git a/kubeflow/trainer/utils/utils.py b/kubeflow/trainer/utils/utils.py index b90127b4..23079201 100644 --- a/kubeflow/trainer/utils/utils.py +++ b/kubeflow/trainer/utils/utils.py @@ -14,9 +14,7 @@ import inspect import os -import queue import textwrap -import threading from typing import Any, Callable, Optional from urllib.parse import urlparse @@ -571,22 +569,3 @@ def get_model_initializer( ) return model_initializer - - -def wrap_log_stream(q: queue.Queue, log_stream: Any): - while True: - try: - logline = next(log_stream) - q.put(logline) - except StopIteration: - q.put(None) - return - - -def get_log_queue_pool(log_streams: list[Any]) -> list[queue.Queue]: - pool = [] - for log_stream in log_streams: - q = queue.Queue(maxsize=100) - pool.append(q) - threading.Thread(target=wrap_log_stream, args=(q, log_stream)).start() - return pool From 3d9951a651169521dba1f8e6a890436ca82f028e Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Wed, 3 Sep 2025 12:02:50 +0100 Subject: [PATCH 4/7] Update kubeflow/trainer/api/trainer_client.py Co-authored-by: Anya Kramar Signed-off-by: Andrey Velichkevich --- kubeflow/trainer/api/trainer_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubeflow/trainer/api/trainer_client.py b/kubeflow/trainer/api/trainer_client.py index 073e1418..c57ff711 100644 --- a/kubeflow/trainer/api/trainer_client.py +++ b/kubeflow/trainer/api/trainer_client.py @@ -163,7 +163,7 @@ def get_job_logs( Args: name: Name of the TrainJob. step: Step of the TrainJob to collect logs from, like dataset-initializer or node-0. - follow: Whether to stream logs in realtime as they produced. + follow: Whether to stream logs in realtime as they are produced. Returns: Iterator of log lines. From 1b9b34747f01f632074cb9db91d06ba6e8e486ab Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Wed, 3 Sep 2025 12:44:54 +0100 Subject: [PATCH 5/7] Fix print logs Signed-off-by: Andrey Velichkevich --- README.md | 17 ++++++++++------- kubeflow/trainer/backends/kubernetes/backend.py | 12 +++++++----- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 3a845940..0dc7107c 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ [![Join Slack](https://img.shields.io/badge/Join_Slack-blue?logo=slack)](https://www.kubeflow.org/docs/about/community/#kubeflow-slack-channels) [![Coverage Status](https://coveralls.io/repos/github/kubeflow/sdk/badge.svg?branch=main)](https://coveralls.io/github/kubeflow/sdk?branch=main) [![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/kubeflow/sdk) + ## Overview @@ -36,6 +37,7 @@ ML applications rather than managing complex infrastrutcure. ```bash pip install git+https://github.com/kubeflow/sdk.git@main ``` + ### Run your first PyTorch distributed job @@ -49,7 +51,7 @@ def get_torch_dist(): import torch.distributed as dist dist.init_process_group(backend="gloo") - print(f"PyTorch Distributed Environment") + print("PyTorch Distributed Environment") print(f"WORLD_SIZE: {dist.get_world_size()}") print(f"RANK: {dist.get_rank()}") print(f"LOCAL_RANK: {os.environ['LOCAL_RANK']}") @@ -70,17 +72,17 @@ job_id = TrainerClient().train( TrainerClient().wait_for_job_status(job_id) # Print TrainJob logs -print(TrainerClient().get_job_logs(name=job_id, node_rank=0)["node-0"]) +print("\n".join(TrainerClient().get_job_logs(name=job_id))) ``` ## Supported Kubeflow Projects -| Project | Status | Description | -|-----------------------------|--------|------------------------------------------------------------| +| Project | Status | Description | +| --------------------------- | ---------------- | ---------------------------------------------------------- | | **Kubeflow Trainer** | ✅ **Available** | Train and fine-tune AI models with various frameworks | -| **Kubeflow Katib** | 🚧 Planned | Hyperparameter optimization | -| **Kubeflow Pipelines** | 🚧 Planned | Build, run, and track AI workflows | -| **Kubeflow Model Registry** | 🚧 Planned | Manage model artifacts, versions and ML artifacts metadata | +| **Kubeflow Katib** | 🚧 Planned | Hyperparameter optimization | +| **Kubeflow Pipelines** | 🚧 Planned | Build, run, and track AI workflows | +| **Kubeflow Model Registry** | 🚧 Planned | Manage model artifacts, versions and ML artifacts metadata | ## Community @@ -98,6 +100,7 @@ Kubeflow SDK is a community project and is still under active development. We we ## Documentation + - **[Design Document](https://docs.google.com/document/d/1rX7ELAHRb_lvh0Y7BK1HBYAbA0zi9enB0F_358ZC58w/edit)**: Kubeflow SDK design proposal - **[Component Guides](https://www.kubeflow.org/docs/components/)**: Individual component documentation - **[DeepWiki](https://deepwiki.com/kubeflow/sdk)**: AI-powered repository documentation diff --git a/kubeflow/trainer/backends/kubernetes/backend.py b/kubeflow/trainer/backends/kubernetes/backend.py index 580502dc..531f22f2 100644 --- a/kubeflow/trainer/backends/kubernetes/backend.py +++ b/kubeflow/trainer/backends/kubernetes/backend.py @@ -173,7 +173,7 @@ def print_packages(): ) self.wait_for_job_status(job_name) - print(self.get_job_logs(job_name)) + print("\n".join(self.get_job_logs(name=job_name))) self.delete_job(job_name) def train( @@ -338,28 +338,30 @@ def get_job_logs( pod_name = c.pod_name break if pod_name is None: - return iter([]) + return + # Remove the number for the node step. + container_name = re.sub(r"-\d+$", "", step) try: if follow: log_stream = watch.Watch().stream( self.core_api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, - container=re.sub(r"-\d+$", "", step), # Remove the number for the node step. + container=container_name, follow=True, ) # Stream logs incrementally for logline in log_stream: if logline is None: - break + return yield logline # type:ignore else: logs = self.core_api.read_namespaced_pod_log( name=pod_name, namespace=self.namespace, - container=re.sub(r"-\d+$", "", step), # Remove the number for the node step. + container=container_name, ) for line in logs.splitlines(): From 72a6b2e968302c8ac4092acad4279fd203ce19fb Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Wed, 3 Sep 2025 12:52:34 +0100 Subject: [PATCH 6/7] Rename TrainerClient to KubernetesBackend in tests Signed-off-by: Andrey Velichkevich --- .../backends/kubernetes/backend_test.py | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/kubeflow/trainer/backends/kubernetes/backend_test.py b/kubeflow/trainer/backends/kubernetes/backend_test.py index 4c24404c..e77d5169 100644 --- a/kubeflow/trainer/backends/kubernetes/backend_test.py +++ b/kubeflow/trainer/backends/kubernetes/backend_test.py @@ -13,10 +13,10 @@ # limitations under the License. """ -Unit tests for the TrainerClient class in the Kubeflow Trainer SDK. +Unit tests for the KubernetesBackend class in the Kubeflow Trainer SDK. This module uses pytest and unittest.mock to simulate Kubernetes API interactions. -It tests TrainerClient's behavior across job listing, resource creation etc +It tests KubernetesBackend's behavior across job listing, resource creation etc """ import datetime @@ -77,8 +77,8 @@ class TestCase: @pytest.fixture -def trainer_client(request): - """Provide a TrainerClient with mocked Kubernetes APIs.""" +def kubernetes_backend(request): + """Provide a KubernetesBackend with mocked Kubernetes APIs.""" with ( patch("kubernetes.config.load_kube_config", return_value=None), patch( @@ -598,11 +598,11 @@ def get_train_job_data_type( ), ], ) -def test_get_runtime(trainer_client, test_case): - """Test TrainerClient.get_runtime with basic success path.""" +def test_get_runtime(kubernetes_backend, test_case): + """Test KubernetesBackend.get_runtime with basic success path.""" print("Executing test:", test_case.name) try: - runtime = trainer_client.get_runtime(**test_case.config) + runtime = kubernetes_backend.get_runtime(**test_case.config) assert test_case.expected_status == SUCCESS assert isinstance(runtime, types.Runtime) @@ -627,12 +627,12 @@ def test_get_runtime(trainer_client, test_case): ), ], ) -def test_list_runtimes(trainer_client, test_case): - """Test TrainerClient.list_runtimes with basic success path.""" +def test_list_runtimes(kubernetes_backend, test_case): + """Test KubernetesBackend.list_runtimes with basic success path.""" print("Executing test:", test_case.name) try: - trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) - runtimes = trainer_client.list_runtimes() + kubernetes_backend.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) + runtimes = kubernetes_backend.list_runtimes() assert test_case.expected_status == SUCCESS assert isinstance(runtimes, list) @@ -671,12 +671,12 @@ def test_list_runtimes(trainer_client, test_case): ), ], ) -def test_get_runtime_packages(trainer_client, test_case): - """Test TrainerClient.get_runtime_packages with basic success path.""" +def test_get_runtime_packages(kubernetes_backend, test_case): + """Test KubernetesBackend.get_runtime_packages with basic success path.""" print("Executing test:", test_case.name) try: - trainer_client.get_runtime_packages(**test_case.config) + kubernetes_backend.get_runtime_packages(**test_case.config) except Exception as e: assert type(e) is test_case.expected_error @@ -790,14 +790,14 @@ def test_get_runtime_packages(trainer_client, test_case): ), ], ) -def test_train(trainer_client, test_case): - """Test TrainerClient.train with basic success path.""" +def test_train(kubernetes_backend, test_case): + """Test KubernetesBackend.train with basic success path.""" print("Executing test:", test_case.name) try: - trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) - runtime = trainer_client.get_runtime(test_case.config.get("runtime", TORCH_RUNTIME)) + kubernetes_backend.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) + runtime = kubernetes_backend.get_runtime(test_case.config.get("runtime", TORCH_RUNTIME)) - train_job_name = trainer_client.train( + train_job_name = kubernetes_backend.train( runtime=runtime, trainer=test_case.config.get("trainer", None) ) @@ -808,7 +808,7 @@ def test_train(trainer_client, test_case): expected_output = test_case.expected_output expected_output.metadata.name = train_job_name - trainer_client.custom_api.create_namespaced_custom_object.assert_called_with( + kubernetes_backend.custom_api.create_namespaced_custom_object.assert_called_with( constants.GROUP, constants.VERSION, DEFAULT_NAMESPACE, @@ -847,11 +847,11 @@ def test_train(trainer_client, test_case): ), ], ) -def test_get_job(trainer_client, test_case): - """Test TrainerClient.get_job with basic success path.""" +def test_get_job(kubernetes_backend, test_case): + """Test KubernetesBackend.get_job with basic success path.""" print("Executing test:", test_case.name) try: - job = trainer_client.get_job(**test_case.config) + job = kubernetes_backend.get_job(**test_case.config) assert test_case.expected_status == SUCCESS assert asdict(job) == asdict(test_case.expected_output) @@ -893,12 +893,12 @@ def test_get_job(trainer_client, test_case): ), ], ) -def test_list_jobs(trainer_client, test_case): - """Test TrainerClient.list_jobs with basic success path.""" +def test_list_jobs(kubernetes_backend, test_case): + """Test KubernetesBackend.list_jobs with basic success path.""" print("Executing test:", test_case.name) try: - trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) - jobs = trainer_client.list_jobs() + kubernetes_backend.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) + jobs = kubernetes_backend.list_jobs() assert test_case.expected_status == SUCCESS assert isinstance(jobs, list) @@ -927,12 +927,12 @@ def test_list_jobs(trainer_client, test_case): ), ], ) -def test_get_job_logs(trainer_client, test_case): - """Test TrainerClient.get_job_logs with basic success path.""" +def test_get_job_logs(kubernetes_backend, test_case): + """Test KubernetesBackend.get_job_logs with basic success path.""" print("Executing test:", test_case.name) try: - trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) - logs = trainer_client.get_job_logs(test_case.config.get("name")) + kubernetes_backend.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) + logs = kubernetes_backend.get_job_logs(test_case.config.get("name")) # Convert iterator to list for comparison. logs_list = list(logs) assert test_case.expected_status == SUCCESS @@ -1007,11 +1007,11 @@ def test_get_job_logs(trainer_client, test_case): ), ], ) -def test_wait_for_job_status(trainer_client, test_case): - """Test TrainerClient.wait_for_job_status with various scenarios.""" +def test_wait_for_job_status(kubernetes_backend, test_case): + """Test KubernetesBackend.wait_for_job_status with various scenarios.""" print("Executing test:", test_case.name) - original_get_job = trainer_client.get_job + original_get_job = kubernetes_backend.get_job # TrainJob has unexpected failed status. def mock_get_job(name): @@ -1020,10 +1020,10 @@ def mock_get_job(name): job.status = constants.TRAINJOB_FAILED return job - trainer_client.get_job = mock_get_job + kubernetes_backend.get_job = mock_get_job try: - job = trainer_client.wait_for_job_status(**test_case.config) + job = kubernetes_backend.wait_for_job_status(**test_case.config) assert test_case.expected_status == SUCCESS assert isinstance(job, types.TrainJob) @@ -1059,15 +1059,15 @@ def mock_get_job(name): ), ], ) -def test_delete_job(trainer_client, test_case): - """Test TrainerClient.delete_job with basic success path.""" +def test_delete_job(kubernetes_backend, test_case): + """Test KubernetesBackend.delete_job with basic success path.""" print("Executing test:", test_case.name) try: - trainer_client.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) - trainer_client.delete_job(test_case.config.get("name")) + kubernetes_backend.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE) + kubernetes_backend.delete_job(test_case.config.get("name")) assert test_case.expected_status == SUCCESS - trainer_client.custom_api.delete_namespaced_custom_object.assert_called_with( + kubernetes_backend.custom_api.delete_namespaced_custom_object.assert_called_with( constants.GROUP, constants.VERSION, test_case.config.get("namespace", DEFAULT_NAMESPACE), From 70fb14231dea5f75cc0107cc6075814712633be9 Mon Sep 17 00:00:00 2001 From: Andrey Velichkevich Date: Wed, 3 Sep 2025 15:19:29 +0100 Subject: [PATCH 7/7] Remove empty return from watch stream logs Signed-off-by: Andrey Velichkevich --- kubeflow/trainer/backends/kubernetes/backend.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kubeflow/trainer/backends/kubernetes/backend.py b/kubeflow/trainer/backends/kubernetes/backend.py index 531f22f2..5cd9af77 100644 --- a/kubeflow/trainer/backends/kubernetes/backend.py +++ b/kubeflow/trainer/backends/kubernetes/backend.py @@ -352,10 +352,8 @@ def get_job_logs( follow=True, ) - # Stream logs incrementally + # Stream logs incrementally. for logline in log_stream: - if logline is None: - return yield logline # type:ignore else: logs = self.core_api.read_namespaced_pod_log(