Merged
sdk/python/kubeflow/training/api/training_client.py (12 additions & 0 deletions)
@@ -354,6 +354,8 @@ def create_job(
env_vars: Optional[
Union[Dict[str, str], List[Union[models.V1EnvVar, models.V1EnvFromSource]]]
] = None,
volumes: Optional[List[models.V1Volume]] = None,
volume_mounts: Optional[List[models.V1VolumeMount]] = None,
Comment on lines +357 to +358

@andreyvelich (Member) commented on Feb 25, 2025:

How can we make it easier to configure from the ML engineer's perspective?

I know that @truc0 has implemented it in a way that accepts a PVC, ConfigMap, or Secret as a volume for a Trial: kubeflow/katib#2508. But it still uses Kubernetes APIs.

@hbelmiro @rimolive @HumairAK Do we know if we have some sort of Kubernetes volume support in KFP V2?
I noticed that it is not intended to be implemented in V2: kubeflow/pipelines#8570
In that case, how can users attach volumes to their components in a workflow?

A Contributor replied:

In KFP, users can access the underlying k8s pod spec APIs, e.g. for mounting secrets/volumes/configmaps (example for volumes here).
This exposes k8s APIs in the KFP SDK, which raises the question of how friendly this is to the ML engineer persona, who is not so k8s-aware.

I'm not caught up on what use case this PR is looking to resolve, but in KFP we have a specific case of data passing between pipeline jobs. The storage of this data is abstracted via an object store, but we see that users sometimes want to use their own PVC for this instead, without having to manually configure these PVCs in the pipeline. The end result: you just declare what inputs/outputs you want in your Python pipeline SDK, and under the hood we automatically use the PVC that you provided when you deployed KFP. This is a feature we'd like to introduce to KFP in the future; you can track it here.
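For reference, mounting a PVC at the pod-spec level in KFP looks roughly like the following sketch, using the kfp-kubernetes extension package; the component body, PVC name, and mount path are illustrative placeholders:

```python
# Sketch: mounting an existing PVC onto a KFP component via the
# kfp-kubernetes extension (pip install kfp[kubernetes]).
# "my-pvc" and "/data" are placeholder values.
from kfp import dsl, kubernetes

@dsl.component
def train_step():
    # Anything written under /data lands on the mounted PVC.
    with open("/data/marker.txt", "w") as f:
        f.write("done")

@dsl.pipeline
def pipeline():
    task = train_step()
    kubernetes.mount_pvc(task, pvc_name="my-pvc", mount_path="/data")
```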

The Contributor Author replied:

The use case is for any distributed training job to be able to persist checkpoints. For example, this is what the InstructLab fine-tuning step does, and that can only be done via the PyTorchJob API at the moment, not via the SDK.
With this PR, it'll also be possible to do it with the SDK.

Stated differently, without a way to pass a PVC to the PyTorchJob via the SDK, the SDK is rather useless for any real distributed training jobs.
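To make the checkpoint use case concrete, here is a minimal sketch of what this PR enables through the SDK, assuming a pre-created PVC; the claim name, mount path, and training function are placeholders:

```python
# Sketch: persisting checkpoints from a distributed training job created
# via the SDK, using the volumes/volume_mounts parameters added here.
# "checkpoints-pvc" and "/checkpoints" are placeholder values.
from kubeflow.training import TrainingClient
from kubernetes.client import (
    V1PersistentVolumeClaimVolumeSource,
    V1Volume,
    V1VolumeMount,
)

def train_func():
    # Real training code would write its checkpoints under /checkpoints.
    ...

TrainingClient().create_job(
    name="train-with-checkpoints",
    train_func=train_func,
    num_workers=2,
    volumes=[
        V1Volume(
            name="checkpoints",
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name="checkpoints-pvc"
            ),
        )
    ],
    volume_mounts=[
        V1VolumeMount(name="checkpoints", mount_path="/checkpoints")
    ],
)
```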

A Member replied:

I agree with @astefanutti; I'm just wondering whether we should give MLEs control over the full Volume and VolumeMount APIs.
Maybe specifying the volumeName is sufficient.
Maybe we can improve this in the V2 SDK, given the separation between TrainJob and TrainingRuntime.
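To make that suggestion concrete, here is a hypothetical sketch (not part of this PR) where the user passes only a mapping of PVC claim names to mount paths and the SDK builds the Kubernetes objects itself:

```python
# Hypothetical alternative surface (not part of this PR): the user passes
# only {claim_name: mount_path}; the SDK builds the V1Volume/V1VolumeMount
# pairs itself, hiding the Kubernetes APIs from the ML engineer.
from typing import Dict, List, Tuple
from kubernetes.client import (
    V1PersistentVolumeClaimVolumeSource,
    V1Volume,
    V1VolumeMount,
)

def pvcs_to_volumes(
    pvcs: Dict[str, str],
) -> Tuple[List[V1Volume], List[V1VolumeMount]]:
    volumes, mounts = [], []
    for claim_name, mount_path in pvcs.items():
        volumes.append(
            V1Volume(
                name=claim_name,
                persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                    claim_name=claim_name
                ),
            )
        )
        mounts.append(V1VolumeMount(name=claim_name, mount_path=mount_path))
    return volumes, mounts

# Usage:
# volumes, volume_mounts = pvcs_to_volumes({"checkpoints-pvc": "/checkpoints"})
```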

):
"""Create the Training Job.
Job can be created using one of the following options:
@@ -418,6 +420,8 @@ def create_job(
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvVar.md)
or a kubernetes.client.models.V1EnvFromSource (documented here:
https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1EnvFromSource.md)
volumes: Volume(s) to be attached to the replicas.
volume_mounts: VolumeMount(s) specifying where to mount the volume(s) into the replicas.

Raises:
ValueError: Invalid input parameters.
@@ -448,6 +452,12 @@ def create_job(
f"Job kind must be one of these: {constants.JOB_PARAMETERS.keys()}"
)

if len(volumes or []) != len(volume_mounts or []):
raise ValueError(
"Volumes and VolumeMounts must be the same length: "
f"{len(volumes or [])} vs. {len(volume_mounts or [])}"
)

# If Training function or base image is set, configure Job template.
if job is None and (train_func is not None or base_image is not None):
# Job name must be set to configure Job template.
@@ -496,11 +506,13 @@ def create_job(
args=args,
resources=resources_per_worker,
env_vars=env_vars,
volume_mounts=volume_mounts,
)

# Get Pod template spec using the above container.
pod_template_spec = utils.get_pod_template_spec(
containers=[container_spec],
volumes=volumes,
)

# Configure template for different Jobs.
sdk/python/kubeflow/training/api/training_client_test.py (42 additions & 2 deletions)
@@ -22,6 +22,8 @@
V1ObjectMeta,
V1PodSpec,
V1PodTemplateSpec,
V1Volume,
V1VolumeMount,
)

TEST_NAME = "test"
@@ -142,6 +144,8 @@ def create_job(
args=None,
num_workers=2,
env_vars=None,
volumes=None,
volume_mounts=None,
):
# Handle env_vars as either a dict or a list
if env_vars:
@@ -158,6 +162,7 @@ def create_job(
command=command,
args=args,
env=env_vars,
volume_mounts=volume_mounts,
)

master = KubeflowOrgV1ReplicaSpec(
@@ -166,7 +171,10 @@ def create_job(
metadata=V1ObjectMeta(
annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
),
spec=V1PodSpec(containers=[container]),
spec=V1PodSpec(
containers=[container],
volumes=volumes,
),
),
)

@@ -180,7 +188,10 @@ def create_job(
metadata=V1ObjectMeta(
annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
),
spec=V1PodSpec(containers=[container]),
spec=V1PodSpec(
containers=[container],
volumes=volumes,
),
),
)

@@ -530,6 +541,35 @@ def __init__(self):
env_vars=[V1EnvVar(name="ENV_VAR", value="env_value")], num_workers=2
),
),
(
"create job with a volume and a volume mount",
{
"name": TEST_NAME,
"namespace": TEST_NAME,
"base_image": TEST_IMAGE,
"num_workers": 1,
"volumes": [V1Volume(name="vol")],
"volume_mounts": [V1VolumeMount(name="vol", mount_path="/mnt")],
},
SUCCESS,
create_job(
num_workers=1,
volumes=[V1Volume(name="vol")],
volume_mounts=[V1VolumeMount(name="vol", mount_path="/mnt")],
),
),
(
"invalid number of volume mount",
{
"name": TEST_NAME,
"namespace": TEST_NAME,
"base_image": TEST_IMAGE,
"num_workers": 1,
"volumes": [V1Volume(name="vol")],
},
ValueError,
None,
),
]

test_data_get_job_pods = [