Merged
17 changes: 10 additions & 7 deletions README.md
@@ -5,6 +5,7 @@
[![Join Slack](https://img.shields.io/badge/Join_Slack-blue?logo=slack)](https://www.kubeflow.org/docs/about/community/#kubeflow-slack-channels)
[![Coverage Status](https://coveralls.io/repos/github/kubeflow/sdk/badge.svg?branch=main)](https://coveralls.io/github/kubeflow/sdk?branch=main)
[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/kubeflow/sdk)

<!-- TODO(kramaranya): update when release [![Python Supported Versions](https://img.shields.io/pypi/pyversions/kubeflow.svg?color=%2334D058)](https://pypi.org/project/kubeflow/) -->

## Overview
@@ -36,6 +37,7 @@ ML applications rather than managing complex infrastructure.
```bash
pip install git+https://github.com/kubeflow/sdk.git@main
```

<!-- TODO(kramaranya): update before release pip install -U kubeflow -->

### Run your first PyTorch distributed job
@@ -49,7 +51,7 @@ def get_torch_dist():
import torch.distributed as dist

dist.init_process_group(backend="gloo")
print(f"PyTorch Distributed Environment")
print("PyTorch Distributed Environment")
print(f"WORLD_SIZE: {dist.get_world_size()}")
print(f"RANK: {dist.get_rank()}")
print(f"LOCAL_RANK: {os.environ['LOCAL_RANK']}")
@@ -70,17 +72,17 @@ job_id = TrainerClient().train(
TrainerClient().wait_for_job_status(job_id)

# Print TrainJob logs
print(TrainerClient().get_job_logs(name=job_id, node_rank=0)["node-0"])
print("\n".join(TrainerClient().get_job_logs(name=job_id)))
**Contributor:** Is it worth showing `follow=True` here?

**Member Author:** By default `wait_for_job_status()` waits until the TrainJob is complete, so showing an example with `follow` is unnecessary here.

**Contributor:** Oh yeah, missed this part.
```
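The `follow=True` streaming variant discussed in the review thread can be sketched as below. The stub generator is a hypothetical stand-in for `TrainerClient().get_job_logs()` so the consuming pattern is runnable without a cluster; the log lines are made up:

```python
from typing import Iterator


def fake_job_logs(follow: bool = False) -> Iterator[str]:
    """Stand-in for TrainerClient().get_job_logs(): yields log lines one by one."""
    for rank in range(2):
        yield f"RANK: {rank}"


# With the iterator-based API, callers can stream lines as they arrive...
for logline in fake_job_logs(follow=True):
    print(logline)

# ...or join the whole iterator at once, as the README quickstart does.
print("\n".join(fake_job_logs()))
```

Against a real cluster, `follow=True` keeps the iterator open until the job's Pod stops producing logs, while the default returns whatever has been logged so far.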

## Supported Kubeflow Projects

| Project                     | Status           | Description                                                |
| --------------------------- | ---------------- | ---------------------------------------------------------- |
| **Kubeflow Trainer**        | ✅ **Available** | Train and fine-tune AI models with various frameworks      |
| **Kubeflow Katib**          | 🚧 Planned       | Hyperparameter optimization                                |
| **Kubeflow Pipelines**      | 🚧 Planned       | Build, run, and track AI workflows                         |
| **Kubeflow Model Registry** | 🚧 Planned       | Manage model artifacts, versions and ML artifacts metadata |

## Community

@@ -98,6 +100,7 @@ Kubeflow SDK is a community project and is still under active development. We we
## Documentation

<!-- TODO(kramaranya): add kubeflow sdk docs -->

- **[Design Document](https://docs.google.com/document/d/1rX7ELAHRb_lvh0Y7BK1HBYAbA0zi9enB0F_358ZC58w/edit)**: Kubeflow SDK design proposal
- **[Component Guides](https://www.kubeflow.org/docs/components/)**: Individual component documentation
- **[DeepWiki](https://deepwiki.com/kubeflow/sdk)**: AI-powered repository documentation
36 changes: 28 additions & 8 deletions kubeflow/trainer/api/trainer_client.py
@@ -13,7 +13,7 @@
# limitations under the License.

import logging
from typing import Optional, Union
from typing import Optional, Union, Iterator

from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types
@@ -120,8 +120,7 @@ def list_jobs(self, runtime: Optional[types.Runtime] = None) -> list[types.Train
runtime: Reference to one of the existing runtimes.

Returns:
List: List of created TrainJobs.
If no TrainJob exist, an empty list is returned.
List of created TrainJobs. If no TrainJobs exist, an empty list is returned.

Raises:
TimeoutError: Timeout to list TrainJobs.
@@ -148,12 +147,33 @@ def get_job(self, name: str) -> types.TrainJob:
def get_job_logs(
self,
name: str,
step: str = constants.NODE + "-0",
follow: Optional[bool] = False,
step: str = constants.NODE,
node_rank: int = 0,
) -> dict[str, str]:
"""Get the logs from TrainJob"""
return self.backend.get_job_logs(name=name, follow=follow, step=step, node_rank=node_rank)
) -> Iterator[str]:
"""Get logs from a specific step of a TrainJob.

You can watch the logs in real time as follows:
```python
from kubeflow.trainer import TrainerClient

for logline in TrainerClient().get_job_logs(name="s8d44aa4fb6d", follow=True):
print(logline)
```

Args:
name: Name of the TrainJob.
step: Step of the TrainJob to collect logs from, like dataset-initializer or node-0.
follow: Whether to stream logs in real time as they are produced.

Returns:
Iterator of log lines.


Raises:
TimeoutError: Timeout to get a TrainJob.
RuntimeError: Failed to get a TrainJob.
"""
return self.backend.get_job_logs(name=name, follow=follow, step=step)

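The signature change above alters how callers consume logs: the old API returned a dict keyed by step name, the new one returns an iterator of lines. A minimal before/after sketch, with stubs and made-up log lines in place of a live backend:

```python
from typing import Iterator

# Old shape: a dict keyed by step name, assembled after the logs were fetched.
old_logs: dict[str, str] = {"node-0": "line1\nline2"}
print(old_logs["node-0"])


# New shape: an iterator of lines, which also makes follow=True streaming natural.
def new_logs() -> Iterator[str]:
    yield from ["line1", "line2"]


print("\n".join(new_logs()))
```

Callers that previously indexed the dict (`logs["node-0"]`) now iterate or join, as the updated README and `print_packages` examples in this PR do.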
def wait_for_job_status(
self,
7 changes: 3 additions & 4 deletions kubeflow/trainer/backends/base.py
@@ -14,7 +14,7 @@

import abc

from typing import Optional, Union
from typing import Optional, Union, Iterator
from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types

@@ -47,9 +47,8 @@ def get_job_logs(
self,
name: str,
follow: Optional[bool] = False,
step: str = constants.NODE,
node_rank: int = 0,
) -> dict[str, str]:
step: str = constants.NODE + "-0",
) -> Iterator[str]:
raise NotImplementedError()

def wait_for_job_status(
96 changes: 26 additions & 70 deletions kubeflow/trainer/backends/kubernetes/backend.py
@@ -15,12 +15,12 @@
import copy
import logging
import multiprocessing
import queue
import random
import string
import time
import uuid
from typing import Optional, Union
from typing import Optional, Union, Iterator
import re

from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types
@@ -173,7 +173,7 @@ def print_packages():
)

self.wait_for_job_status(job_name)
print(self.get_job_logs(job_name)["node-0"])
print("\n".join(self.get_job_logs(name=job_name)))
self.delete_job(job_name)

def train(
@@ -328,92 +328,48 @@ def get_job_logs(
self,
name: str,
follow: Optional[bool] = False,
step: str = constants.NODE,
node_rank: int = 0,
) -> dict[str, str]:
"""Get the logs from TrainJob"""

step: str = constants.NODE + "-0",
) -> Iterator[str]:
"""Get the TrainJob logs"""
**Contributor:** Do we need this docstring here?

**Member Author:** Not necessary, but this is just a reminder for developers and AI tools of how this API is used 🙂

**Contributor:** I see, sounds good to me!
# Get the TrainJob Pod name.
pod_name = None
for c in self.get_job(name).steps:
if c.status != constants.POD_PENDING:
if c.name == step or c.name == f"{step}-{node_rank}":
pod_name = c.pod_name
if c.status != constants.POD_PENDING and c.name == step:
pod_name = c.pod_name
break
if pod_name is None:
return {}

# Dict where key is the Pod type and value is the Pod logs.
logs_dict = {}

# TODO (andreyvelich): Potentially, refactor this.
# Support logging of multiple Pods.
# TODO (andreyvelich): Currently, follow is supported only for node container.
if follow and step == constants.NODE:
log_streams = []
log_streams.append(
watch.Watch().stream(
self.core_api.read_namespaced_pod_log,
name=pod_name,
namespace=self.namespace,
container=constants.NODE,
)
)
finished = [False] * len(log_streams)

# Create thread and queue per stream, for non-blocking iteration.
log_queue_pool = utils.get_log_queue_pool(log_streams)

# Iterate over every watching pods' log queue
while True:
for index, log_queue in enumerate(log_queue_pool):
if all(finished):
break
if finished[index]:
continue
# grouping the every 50 log lines of the same pod.
for _ in range(50):
try:
logline = log_queue.get(timeout=1)
if logline is None:
finished[index] = True
break
# Print logs to the StdOut and update results dict.
print(f"[{step}-{node_rank}]: {logline}")
logs_dict[f"{step}-{node_rank}"] = (
logs_dict.get(f"{step}-{node_rank}", "") + logline + "\n"
)
except queue.Empty:
break
if all(finished):
return logs_dict
return

# Remove the number for the node step.
container_name = re.sub(r"-\d+$", "", step)
try:
if step == constants.DATASET_INITIALIZER:
logs_dict[constants.DATASET_INITIALIZER] = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.DATASET_INITIALIZER,
)
elif step == constants.MODEL_INITIALIZER:
logs_dict[constants.MODEL_INITIALIZER] = self.core_api.read_namespaced_pod_log(
if follow:
log_stream = watch.Watch().stream(
self.core_api.read_namespaced_pod_log,
name=pod_name,
namespace=self.namespace,
container=constants.MODEL_INITIALIZER,
container=container_name,
follow=True,
)

# Stream logs incrementally.
for logline in log_stream:
**Contributor:** Are we sure each item is an entire line?

**Member Author:** Yes, `log_stream` is a `<generator object Watch.stream at 0x1073c6340>` object, and we can return it line by line.

**Contributor (@astefanutti, Sep 3, 2025):** I've checked and it indeed does yield items line by line: https://github.com/kubernetes-client/python/blob/6e7c539f52dec4e993d2c32a4408920d8522f47e/kubernetes/base/watch/watch.py#L54-L83

I wasn't sure whether we had to do it ourselves or not.

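The line-by-line behavior the reviewers verified can be approximated with a simplified sketch: a buffer that reassembles arbitrary byte chunks into whole lines, which is roughly what the kubernetes client's `Watch.stream` does internally for log endpoints (the real implementation also handles decoding and resource-version bookkeeping):

```python
from typing import Iterable, Iterator


def stream_lines(chunks: Iterable[bytes]) -> Iterator[str]:
    """Reassemble arbitrary byte chunks into whole log lines."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            yield line.decode("utf-8")
    if buffer:
        # Flush a trailing partial line once the stream closes.
        yield buffer.decode("utf-8")


chunks = [b"RANK: 0\nRA", b"NK: 1\nWORLD", b"_SIZE: 2"]
print(list(stream_lines(chunks)))  # → ['RANK: 0', 'RANK: 1', 'WORLD_SIZE: 2']
```

Because the generator only yields at newline boundaries, consumers of `get_job_logs(follow=True)` never see a half-line, even when the HTTP chunks split mid-line.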
yield logline # type:ignore
else:
logs_dict[f"{step}-{node_rank}"] = self.core_api.read_namespaced_pod_log(
logs = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.NODE,
container=container_name,
)

for line in logs.splitlines():
yield line

except Exception as e:
raise RuntimeError(
f"Failed to read logs for the pod {self.namespace}/{pod_name}"
) from e

return logs_dict

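The `re.sub(r"-\d+$", "", step)` call introduced above maps a step name to its container name by stripping a trailing numeric rank suffix; its behavior can be checked in isolation (`container_for` is a hypothetical wrapper for illustration):

```python
import re


def container_for(step: str) -> str:
    # Strip a trailing "-<number>" rank suffix, e.g. "node-0" -> "node".
    return re.sub(r"-\d+$", "", step)


print(container_for("node-0"))               # → node
print(container_for("node-12"))              # → node
print(container_for("dataset-initializer"))  # → dataset-initializer
```

Note that names without a numeric suffix, like `dataset-initializer`, pass through unchanged, so the one regex covers both initializer and node steps.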
def wait_for_job_status(
self,
name: str,