From d792ebc83116d740de802509c87252190cdba38a Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 30 Jul 2025 12:49:00 +0200 Subject: [PATCH 1/3] feat(trainer): Add environment variables argument to CustomTrainer Signed-off-by: Antonin Stefanutti --- .../trainer/api/trainer_client_test.py | 33 +++++++++++++++++-- python/kubeflow/trainer/types/types.py | 2 ++ python/kubeflow/trainer/utils/utils.py | 12 +++++++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/python/kubeflow/trainer/api/trainer_client_test.py b/python/kubeflow/trainer/api/trainer_client_test.py index 410dbfaf..b53b3ef7 100644 --- a/python/kubeflow/trainer/api/trainer_client_test.py +++ b/python/kubeflow/trainer/api/trainer_client_test.py @@ -69,6 +69,7 @@ class TestCase: TRAIN_JOBS = "trainjobs" TRAIN_JOB_WITH_BUILT_IN_TRAINER = "train-job-with-built-in-trainer" TRAIN_JOB_WITH_CUSTOM_TRAINER = "train-job-with-custom-trainer" +TRAIN_JOB_WITH_CUSTOM_TRAINER_ENV = "train-job-with-custom-trainer-env" # -------------------------- @@ -221,12 +222,12 @@ def get_resource_requirements() -> models.IoK8sApiCoreV1ResourceRequirements: ) -def get_custom_trainer() -> models.TrainerV1alpha1Trainer: +def get_custom_trainer(include_env: bool = False) -> models.TrainerV1alpha1Trainer: """ Get the custom trainer for the TrainJob. """ - return models.TrainerV1alpha1Trainer( + trainer = models.TrainerV1alpha1Trainer( command=["bash", "-c"], args=[ '\nif ! [ -x "$(command -v pip)" ]; then\n python -m ensurepip ' @@ -241,6 +242,13 @@ def get_custom_trainer() -> models.TrainerV1alpha1Trainer: numNodes=2, ) + if include_env: + trainer.env = [ + models.IoK8sApiCoreV1EnvVar(name="TEST_ENV", value="test_value"), + models.IoK8sApiCoreV1EnvVar(name="ANOTHER_ENV", value="another_value"), + ] + + return trainer def get_builtin_trainer() -> models.TrainerV1alpha1Trainer: """ @@ -695,6 +703,27 @@ def test_list_runtimes(training_client, test_case): train_job_trainer=get_custom_trainer(), ), ), + TestCase( + name="valid flow with custom trainer and env vars", + expected_status=SUCCESS, + config={ + "trainer": types.CustomTrainer( + func=lambda: print("Hello World"), + func_args={"learning_rate": 0.001, "batch_size": 32}, + packages_to_install=["torch", "numpy"], + pip_index_url=constants.DEFAULT_PIP_INDEX_URL, + num_nodes=2, + env={ + "TEST_ENV": "test_value", + "ANOTHER_ENV": "another_value", + }, + ) + }, + expected_output=get_train_job( + train_job_name=TRAIN_JOB_WITH_CUSTOM_TRAINER_ENV, + train_job_trainer=get_custom_trainer(include_env=True), + ), + ), TestCase( name="timeout error when deleting job", expected_status=FAILED, diff --git a/python/kubeflow/trainer/types/types.py b/python/kubeflow/trainer/types/types.py index 2220b97c..17e8ca06 100644 --- a/python/kubeflow/trainer/types/types.py +++ b/python/kubeflow/trainer/types/types.py @@ -35,6 +35,7 @@ class CustomTrainer: pip_index_url (`Optional[str]`): The PyPI URL from which to install Python packages. num_nodes (`Optional[int]`): The number of nodes to use for training. resources_per_node (`Optional[Dict]`): The computing resources to allocate per node. + env (`Optional[Dict[str, str]]`): Environment variables to set in the training containers. """ func: Callable @@ -43,6 +44,7 @@ class CustomTrainer: pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL num_nodes: Optional[int] = None resources_per_node: Optional[Dict] = None + env: Optional[Dict[str, str]] = None # TODO(Electronic-Waste): Add more loss functions. diff --git a/python/kubeflow/trainer/utils/utils.py b/python/kubeflow/trainer/utils/utils.py index 8837c5fc..66d882fb 100644 --- a/python/kubeflow/trainer/utils/utils.py +++ b/python/kubeflow/trainer/utils/utils.py @@ -413,6 +413,18 @@ def get_trainer_crd_from_custom_trainer( trainer.packages_to_install, ) + # Add environment variables to the Trainer. + if trainer.env: + env_vars = [] + for key, value in trainer.env.items(): + env_vars.append( + models.IoK8sApiCoreV1EnvVar( + name=key, + value=value + ) + ) + trainer_crd.env = env_vars + return trainer_crd From 440fcc200d555257d44d965dc286fd0725ffe719 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Fri, 1 Aug 2025 12:35:06 +0200 Subject: [PATCH 2/3] Review feedback Signed-off-by: Antonin Stefanutti --- python/kubeflow/trainer/types/types.py | 2 +- python/kubeflow/trainer/utils/utils.py | 13 ++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/python/kubeflow/trainer/types/types.py b/python/kubeflow/trainer/types/types.py index 17e8ca06..39ed8793 100644 --- a/python/kubeflow/trainer/types/types.py +++ b/python/kubeflow/trainer/types/types.py @@ -35,7 +35,7 @@ class CustomTrainer: pip_index_url (`Optional[str]`): The PyPI URL from which to install Python packages. num_nodes (`Optional[int]`): The number of nodes to use for training. resources_per_node (`Optional[Dict]`): The computing resources to allocate per node. - env (`Optional[Dict[str, str]]`): Environment variables to set in the training containers. + env (`Optional[Dict[str, str]]`): The environment variables to set in the training nodes. """ func: Callable diff --git a/python/kubeflow/trainer/utils/utils.py b/python/kubeflow/trainer/utils/utils.py index 66d882fb..98eb777d 100644 --- a/python/kubeflow/trainer/utils/utils.py +++ b/python/kubeflow/trainer/utils/utils.py @@ -415,15 +415,10 @@ def get_trainer_crd_from_custom_trainer( # Add environment variables to the Trainer. if trainer.env: - env_vars = [] - for key, value in trainer.env.items(): - env_vars.append( - models.IoK8sApiCoreV1EnvVar( - name=key, - value=value - ) - ) - trainer_crd.env = env_vars + trainer_crd.env = [ + models.IoK8sApiCoreV1EnvVar(name=key, value=value) + for key, value in trainer.env.items() + ] return trainer_crd From 359cfd21690441fb583b84c33bb4372e2ce76b17 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Fri, 1 Aug 2025 16:05:51 +0200 Subject: [PATCH 3/3] Add env argument to get_custom_trainer Signed-off-by: Antonin Stefanutti --- .../trainer/api/trainer_client_test.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/python/kubeflow/trainer/api/trainer_client_test.py b/python/kubeflow/trainer/api/trainer_client_test.py index b53b3ef7..da1609cd 100644 --- a/python/kubeflow/trainer/api/trainer_client_test.py +++ b/python/kubeflow/trainer/api/trainer_client_test.py @@ -222,12 +222,14 @@ def get_resource_requirements() -> models.IoK8sApiCoreV1ResourceRequirements: ) -def get_custom_trainer(include_env: bool = False) -> models.TrainerV1alpha1Trainer: +def get_custom_trainer( + env: Optional[list[models.IoK8sApiCoreV1EnvVar]] = None, +) -> models.TrainerV1alpha1Trainer: """ Get the custom trainer for the TrainJob. """ - trainer = models.TrainerV1alpha1Trainer( + return models.TrainerV1alpha1Trainer( command=["bash", "-c"], args=[ '\nif ! [ -x "$(command -v pip)" ]; then\n python -m ensurepip ' @@ -240,16 +242,9 @@ def get_custom_trainer(include_env: bool = False) -> models.TrainerV1alpha1Train '"$SCRIPT" > "trainer_client_test.py"\ntorchrun "trainer_client_test.py"' ], numNodes=2, + env=env, ) - if include_env: - trainer.env = [ - models.IoK8sApiCoreV1EnvVar(name="TEST_ENV", value="test_value"), - models.IoK8sApiCoreV1EnvVar(name="ANOTHER_ENV", value="another_value"), - ] - - return trainer - def get_builtin_trainer() -> models.TrainerV1alpha1Trainer: """ Get the builtin trainer for the TrainJob. @@ -721,7 +716,12 @@ def test_list_runtimes(training_client, test_case): }, expected_output=get_train_job( train_job_name=TRAIN_JOB_WITH_CUSTOM_TRAINER_ENV, - train_job_trainer=get_custom_trainer(include_env=True), + train_job_trainer=get_custom_trainer( + env = [ + models.IoK8sApiCoreV1EnvVar(name="TEST_ENV", value="test_value"), + models.IoK8sApiCoreV1EnvVar(name="ANOTHER_ENV", value="another_value"), + ], + ), ), ), TestCase(