2 changes: 1 addition & 1 deletion kubeflow/trainer/__init__.py
@@ -32,9 +32,9 @@
Initializer,
Loss,
Runtime,
RuntimeTrainer,
TorchTuneConfig,
TorchTuneInstructDataset,
RuntimeTrainer,
TrainerType,
)

1 change: 0 additions & 1 deletion kubeflow/trainer/api/__init__.py
@@ -1,4 +1,3 @@
# ruff: noqa

# import apis into api package

17 changes: 11 additions & 6 deletions kubeflow/trainer/api/trainer_client.py
@@ -15,11 +15,10 @@
import logging
from typing import Dict, List, Optional, Set, Union

from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types
from kubeflow.trainer.backends.kubernetes.backend import KubernetesBackend
from kubeflow.trainer.backends.kubernetes.types import KubernetesBackendConfig

from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types

logger = logging.getLogger(__name__)

@@ -104,9 +103,13 @@ def train(
TimeoutError: Timeout to create TrainJobs.
RuntimeError: Failed to create TrainJobs.
"""
return self.__backend.train(runtime=runtime, initializer=initializer, trainer=trainer)
return self.__backend.train(
runtime=runtime, initializer=initializer, trainer=trainer
)

def list_jobs(self, runtime: Optional[types.Runtime] = None) -> List[types.TrainJob]:
def list_jobs(
self, runtime: Optional[types.Runtime] = None
) -> List[types.TrainJob]:
"""List of all TrainJobs.

Returns:
@@ -131,7 +134,9 @@ def get_job_logs(
node_rank: int = 0,
) -> Dict[str, str]:
"""Get the logs from TrainJob"""
return self.__backend.get_job_logs(name=name, follow=follow, step=step, node_rank=node_rank)
return self.__backend.get_job_logs(
name=name, follow=follow, step=step, node_rank=node_rank
)

def wait_for_job_status(
self,
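For context, the wrapped calls in this file are thin pass-throughs to the backend. A minimal usage sketch of the public client follows, assuming `TrainerClient` is exported from `kubeflow.trainer` and that `types.TrainJob` exposes a `name` attribute (neither is shown in this diff); it only exercises the method signatures visible above.

```python
from kubeflow.trainer import TrainerClient

# Assumption: TrainerClient is the public entry point that delegates to the
# backend methods reformatted in this diff (train, list_jobs, get_job_logs).
client = TrainerClient()

# List existing TrainJobs; list_jobs() optionally filters by runtime.
for job in client.list_jobs():
    print(job.name)  # assumption: TrainJob has a `name` field

# Fetch logs for node rank 0 of a hypothetical job; returns Dict[str, str].
logs = client.get_job_logs(name="example-trainjob", follow=False, node_rank=0)
```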
6 changes: 4 additions & 2 deletions kubeflow/trainer/backends/base.py
@@ -13,8 +13,8 @@
# limitations under the License.

import abc

from typing import Dict, List, Optional, Set, Union

from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types

@@ -37,7 +37,9 @@ def train(
) -> str:
raise NotImplementedError()

def list_jobs(self, runtime: Optional[types.Runtime] = None) -> List[types.TrainJob]:
def list_jobs(
self, runtime: Optional[types.Runtime] = None
) -> List[types.TrainJob]:
raise NotImplementedError()

def get_job(self, name: str) -> types.TrainJob:
118 changes: 83 additions & 35 deletions kubeflow/trainer/backends/kubernetes/backend.py
@@ -20,15 +20,16 @@
import string
import time
import uuid
from typing import Dict, List, Optional, Union, Set
from typing import Dict, List, Optional, Set, Union

from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types
from kubeflow.trainer.utils import utils
from kubeflow_trainer_api import models
from kubernetes import client, config, watch

from kubeflow.trainer.backends.base import ExecutionBackend
from kubeflow.trainer.backends.kubernetes import types as k8s_types
from kubeflow.trainer.constants import constants
from kubeflow.trainer.types import types
from kubeflow.trainer.utils import utils

logger = logging.getLogger(__name__)

@@ -45,7 +46,9 @@ def __init__(
if cfg.client_configuration is None:
# Load kube-config or in-cluster config.
if cfg.config_file or not utils.is_running_in_k8s():
config.load_kube_config(config_file=cfg.config_file, context=cfg.context)
config.load_kube_config(
config_file=cfg.config_file, context=cfg.context
)
else:
config.load_incluster_config()

@@ -141,24 +144,28 @@ def get_runtime_packages(self, runtime: types.Runtime):
runtime_copy.trainer.set_command(tuple(mpi_command))

def print_packages():
import subprocess
import shutil
import subprocess
import sys

# Print Python version.
print(f"Python: {sys.version}")

# Print Python packages.
if shutil.which("pip"):
pip_list = subprocess.run(["pip", "list"], capture_output=True, text=True)
pip_list = subprocess.run(
["pip", "list"], capture_output=True, text=True
)
print(pip_list.stdout)
else:
print("Unable to get installed packages: pip command not found")

# Print nvidia-smi if GPUs are available.
if shutil.which("nvidia-smi"):
print("Available GPUs on the single training node")
nvidia_smi = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
nvidia_smi = subprocess.run(
["nvidia-smi"], capture_output=True, text=True
)
print(nvidia_smi.stdout)

# Create the TrainJob and wait until it completes.
@@ -168,7 +175,9 @@ def print_packages():
trainer=types.CustomTrainer(
func=print_packages,
num_nodes=1,
resources_per_node=({"cpu": 1} if runtime_copy.trainer.device != "gpu" else None),
resources_per_node=(
{"cpu": 1} if runtime_copy.trainer.device != "gpu" else None
),
),
)

@@ -196,13 +205,19 @@ def train(
# If users choose to use a custom training function.
if isinstance(trainer, types.CustomTrainer):
if runtime.trainer.trainer_type != types.TrainerType.CUSTOM_TRAINER:
raise ValueError(f"CustomTrainer can't be used with {runtime} runtime")
trainer_crd = utils.get_trainer_crd_from_custom_trainer(runtime, trainer)
raise ValueError(
f"CustomTrainer can't be used with {runtime} runtime"
)
trainer_crd = utils.get_trainer_crd_from_custom_trainer(
runtime, trainer
)

# If users choose to use a builtin trainer for post-training.
elif isinstance(trainer, types.BuiltinTrainer):
if runtime.trainer.trainer_type != types.TrainerType.BUILTIN_TRAINER:
raise ValueError(f"BuiltinTrainer can't be used with {runtime} runtime")
raise ValueError(
f"BuiltinTrainer can't be used with {runtime} runtime"
)
trainer_crd = utils.get_trainer_crd_from_builtin_trainer(
runtime, trainer, initializer
)
@@ -216,10 +231,16 @@
train_job = models.TrainerV1alpha1TrainJob(
apiVersion=constants.API_VERSION,
kind=constants.TRAINJOB_KIND,
metadata=models.IoK8sApimachineryPkgApisMetaV1ObjectMeta(name=train_job_name),
metadata=models.IoK8sApimachineryPkgApisMetaV1ObjectMeta(
name=train_job_name
),
spec=models.TrainerV1alpha1TrainJobSpec(
runtimeRef=models.TrainerV1alpha1RuntimeRef(name=runtime.name),
trainer=(trainer_crd if trainer_crd != models.TrainerV1alpha1Trainer() else None),
trainer=(
trainer_crd
if trainer_crd != models.TrainerV1alpha1Trainer()
else None
),
initializer=(
models.TrainerV1alpha1Initializer(
dataset=utils.get_dataset_initializer(initializer.dataset),
@@ -255,7 +276,9 @@ def train(

return train_job_name

def list_jobs(self, runtime: Optional[types.Runtime] = None) -> List[types.TrainJob]:
def list_jobs(
self, runtime: Optional[types.Runtime] = None
) -> List[types.TrainJob]:
result = []
try:
thread = self.custom_api.list_namespaced_custom_object(
@@ -314,9 +337,13 @@ def get_job(self, name: str) -> types.TrainJob:
)

except multiprocessing.TimeoutError:
raise TimeoutError(f"Timeout to get {constants.TRAINJOB_KIND}: {self.namespace}/{name}")
raise TimeoutError(
f"Timeout to get {constants.TRAINJOB_KIND}: {self.namespace}/{name}"
)
except Exception:
raise RuntimeError(f"Failed to get {constants.TRAINJOB_KIND}: {self.namespace}/{name}")
raise RuntimeError(
f"Failed to get {constants.TRAINJOB_KIND}: {self.namespace}/{name}"
)

return self.__get_trainjob_from_crd(trainjob) # type: ignore

@@ -376,7 +403,9 @@ def get_job_logs(
# Print logs to the StdOut and update results dict.
print(f"[{step}-{node_rank}]: {logline}")
logs_dict[f"{step}-{node_rank}"] = (
logs_dict.get(f"{step}-{node_rank}", "") + logline + "\n"
logs_dict.get(f"{step}-{node_rank}", "")
+ logline
+ "\n"
)
except queue.Empty:
break
@@ -385,26 +414,34 @@

try:
if step == constants.DATASET_INITIALIZER:
logs_dict[constants.DATASET_INITIALIZER] = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.DATASET_INITIALIZER,
logs_dict[constants.DATASET_INITIALIZER] = (
self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.DATASET_INITIALIZER,
)
)
elif step == constants.MODEL_INITIALIZER:
logs_dict[constants.MODEL_INITIALIZER] = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.MODEL_INITIALIZER,
logs_dict[constants.MODEL_INITIALIZER] = (
self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.MODEL_INITIALIZER,
)
)
else:
logs_dict[f"{step}-{node_rank}"] = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.NODE,
logs_dict[f"{step}-{node_rank}"] = (
self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=constants.NODE,
)
)

except Exception:
raise RuntimeError(f"Failed to read logs for the pod {self.namespace}/{pod_name}")
raise RuntimeError(
f"Failed to read logs for the pod {self.namespace}/{pod_name}"
)

return logs_dict

@@ -422,7 +459,9 @@ def wait_for_job_status(
constants.TRAINJOB_FAILED,
}
if not status.issubset(job_statuses):
raise ValueError(f"Expected status {status} must be a subset of {job_statuses}")
raise ValueError(
f"Expected status {status} must be a subset of {job_statuses}"
)

if polling_interval > timeout:
raise ValueError(
@@ -447,7 +486,9 @@

time.sleep(polling_interval)

raise TimeoutError(f"Timeout waiting for TrainJob {name} to reach status: {status} status")
raise TimeoutError(
f"Timeout waiting for TrainJob {name} to reach status: {status} status"
)

def delete_job(self, name: str):
try:
@@ -467,7 +508,9 @@ def delete_job(self, name: str):
f"Failed to delete {constants.TRAINJOB_KIND}: {self.namespace}/{name}"
)

logger.debug(f"{constants.TRAINJOB_KIND} {self.namespace}/{name} has been deleted")
logger.debug(
f"{constants.TRAINJOB_KIND} {self.namespace}/{name} has been deleted"
)

def __get_runtime_from_crd(
self,
@@ -550,7 +593,12 @@ def __get_trainjob_from_crd(
for pod in pod_list.items:
# Pod must have labels to detect the TrainJob step.
# Every Pod always has a single TrainJob step.
if not (pod.metadata and pod.metadata.name and pod.metadata.labels and pod.spec):
if not (
pod.metadata
and pod.metadata.name
and pod.metadata.labels
and pod.spec
):
raise Exception(f"TrainJob Pod is invalid: {pod}")

# Get the Initializer step.
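For reference, the constructor reformatted near the top of this file loads a kube-config based on the backend configuration. A minimal sketch of standalone use follows, assuming `KubernetesBackendConfig` accepts the fields read as `cfg.*` in `__init__` (config_file, context, client_configuration) as keyword arguments; that constructor shape is an assumption, not shown in this diff.

```python
from kubeflow.trainer.backends.kubernetes.backend import KubernetesBackend
from kubeflow.trainer.backends.kubernetes.types import KubernetesBackendConfig

# Assumption: these keyword arguments mirror the cfg.* attributes read in
# __init__ above; any fields not listed keep their defaults.
cfg = KubernetesBackendConfig(
    config_file=None,  # fall back to the default kube-config location
    context=None,      # use the currently active kube-context
)
backend = KubernetesBackend(cfg)

# The backend exposes the same job operations that TrainerClient delegates to.
jobs = backend.list_jobs()
```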