Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sdk/python/v1beta1/kubeflow/katib/api/katib_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def tune(
packages_to_install: List[str] = None,
pip_index_url: str = "https://pypi.org/simple",
metrics_collector_config: Dict[str, Any] = {"kind": "StdOut"},
trial_active_deadline_seconds: Optional[int] = None,
):
"""
Create HyperParameter Tuning Katib Experiment using one of the following
Expand Down Expand Up @@ -355,6 +356,11 @@ class name in this argument.
metrics_collector_config: Specify the config of metrics collector,
for example, `metrics_collector_config = {"kind": "Push"}`.
Currently, we only support `StdOut` and `Push` metrics collector.
trial_active_deadline_seconds: Optional timeout in seconds for each trial.
If None, no timeout is applied. For Job-based trials, this sets the
activeDeadlineSeconds field on the Job spec. For PyTorchJob-based trials,
this sets the activeDeadlineSeconds field on the job's runPolicy. This
prevents individual trials from running indefinitely and consuming resources.
Raises:
ValueError: Function arguments have incorrect type or value.
Expand Down Expand Up @@ -511,13 +517,15 @@ class name in this argument.
resources_per_trial,
training_utils.get_pod_template_spec(containers=[container_spec]),
training_utils.get_pod_template_spec(containers=[container_spec]),
trial_active_deadline_seconds,
)
# Otherwise, Trial uses Job for model training.
else:
trial_template = utils.get_trial_template_with_job(
retain_trials,
trial_parameters,
training_utils.get_pod_template_spec(containers=[container_spec]),
trial_active_deadline_seconds,
)

# If users choose to use external models and datasets.
Expand Down Expand Up @@ -694,6 +702,7 @@ class name in this argument.
resources_per_trial,
master_pod_template_spec,
worker_pod_template_spec,
trial_active_deadline_seconds,
)

# Add parameters to the Katib Experiment.
Expand Down
54 changes: 54 additions & 0 deletions sdk/python/v1beta1/kubeflow/katib/api/katib_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,34 @@ def create_experiment(
},
TEST_RESULT_SUCCESS,
),
(
"valid flow with trial_active_deadline_seconds for Job-based trials",
{
"name": "tune_test",
"objective": lambda x: print(f"a={x}"),
"parameters": {"a": katib.search.int(min=10, max=100)},
"objective_metric_name": "a",
"resources_per_trial": {"gpu": "2"},
"trial_active_deadline_seconds": 3600, # 1 hour timeout
},
TEST_RESULT_SUCCESS,
),
(
"valid flow with trial_active_deadline_seconds for PyTorchJob-based trials",
{
"name": "tune_test",
"objective": lambda x: print(f"a={x}"),
"parameters": {"a": katib.search.int(min=10, max=100)},
"objective_metric_name": "a",
"resources_per_trial": types.TrainerResources(
num_workers=2,
num_procs_per_worker=2,
resources_per_worker={"gpu": "2"},
),
"trial_active_deadline_seconds": 7200, # 2 hours timeout
},
TEST_RESULT_SUCCESS,
),
]


Expand Down Expand Up @@ -667,6 +695,32 @@ def test_tune(katib_client, test_name, kwargs, expected_output):
KubeflowOrgV1PyTorchJob,
)

elif test_name == (
"valid flow with trial_active_deadline_seconds for Job-based trials"
):
# Verify trial_active_deadline_seconds is set on Job spec
job_spec = experiment.spec.trial_template.trial_spec.spec
assert job_spec.active_deadline_seconds == 3600
# Verify other Job-specific fields
assert isinstance(experiment.spec.trial_template.trial_spec, V1Job)
args_content = "".join(
experiment.spec.trial_template.trial_spec.spec.template.spec.containers[
0
].args
)
assert "'a': '${trialParameters.a}'" in args_content

elif (
test_name
== "valid flow with trial_active_deadline_seconds for PyTorchJob-based trials"
):
# Verify trial_active_deadline_seconds is set on PyTorchJob run_policy
pytorch_spec = experiment.spec.trial_template.trial_spec.spec
assert pytorch_spec.run_policy.active_deadline_seconds == 7200
# Verify PyTorchJob type
trial_spec = experiment.spec.trial_template.trial_spec
assert isinstance(trial_spec, KubeflowOrgV1PyTorchJob)

elif test_name == "valid flow with external model tuning":
# Verify input_params
args_content = "".join(
Expand Down
11 changes: 9 additions & 2 deletions sdk/python/v1beta1/kubeflow/katib/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def get_trial_template_with_job(
retain_trials: bool,
trial_parameters: List[models.V1beta1TrialParameterSpec],
pod_template_spec: client.V1PodTemplateSpec,
trial_active_deadline_seconds: Optional[int] = None,
) -> models.V1beta1TrialTemplate:
"""
Get Trial template with Job as a Trial's Worker
Expand All @@ -301,7 +302,9 @@ def get_trial_template_with_job(
job = client.V1Job(
api_version="batch/v1",
kind="Job",
spec=client.V1JobSpec(template=pod_template_spec),
spec=client.V1JobSpec(
template=pod_template_spec, active_deadline_seconds=trial_active_deadline_seconds
),
)

trial_template = models.V1beta1TrialTemplate(
Expand All @@ -319,6 +322,7 @@ def get_trial_template_with_pytorchjob(
resources_per_trial: types.TrainerResources,
master_pod_template_spec: models.V1PodTemplateSpec,
worker_pod_template_spec: models.V1PodTemplateSpec,
trial_active_deadline_seconds: Optional[int] = None,
) -> models.V1beta1TrialTemplate:
"""
Get Trial template with PyTorchJob as a Trial's Worker
Expand All @@ -329,7 +333,10 @@ def get_trial_template_with_pytorchjob(
api_version=API_VERSION,
kind=PYTORCHJOB_KIND,
spec=training_models.KubeflowOrgV1PyTorchJobSpec(
run_policy=training_models.KubeflowOrgV1RunPolicy(clean_pod_policy=None),
run_policy=training_models.KubeflowOrgV1RunPolicy(
clean_pod_policy=None,
active_deadline_seconds=trial_active_deadline_seconds,
),
nproc_per_node=str(resources_per_trial.num_procs_per_worker),
pytorch_replica_specs={
"Master": training_models.KubeflowOrgV1ReplicaSpec(
Expand Down