Skip to content

Added provision to assert job status, updated HPO script for disconne… #208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.21.5
require (
github.com/kubeflow/training-operator v1.7.0
github.com/onsi/gomega v1.31.1
github.com/project-codeflare/codeflare-common v0.0.0-20240809123324-d44e319ba556
github.com/project-codeflare/codeflare-common v0.0.0-20240827080155-9234d23ff47d
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/common v0.45.0
github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-codeflare/appwrapper v0.8.0 h1:vWHNtXUtHutN2EzYb6rryLdESnb8iDXsCokXOuNYXvg=
github.com/project-codeflare/appwrapper v0.8.0/go.mod h1:FMQ2lI3fz6LakUVXgN1FTdpsc3BBkNIZZgtMmM9J5UM=
github.com/project-codeflare/codeflare-common v0.0.0-20240809123324-d44e319ba556 h1:4SI3d63CNZ+7sKQ1JEqLmNzGSgVXqz3aT3+aDXRgo18=
github.com/project-codeflare/codeflare-common v0.0.0-20240809123324-d44e319ba556/go.mod h1:unKTw+XoMANTES3WieG016im7rxZ7IR2/ph++L5Vp1Y=
github.com/project-codeflare/codeflare-common v0.0.0-20240827080155-9234d23ff47d h1:hbfF20rw/NHvXNXYLuxPjCnBS5Lotvt6rU0S9DLs0HU=
github.com/project-codeflare/codeflare-common v0.0.0-20240827080155-9234d23ff47d/go.mod h1:unKTw+XoMANTES3WieG016im7rxZ7IR2/ph++L5Vp1Y=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
Expand Down
4 changes: 2 additions & 2 deletions tests/kfto/core/kfto_kueue_sft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func runPytorchjobWithSFTtrainer(t *testing.T, modelConfigFile string) {
}
clusterQueue := CreateKueueClusterQueue(test, cqSpec)
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name)
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)

// Create training PyTorch job
tuningJob := createPyTorchJob(test, namespace.Name, localQueue.Name, *config)
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestPytorchjobUsingKueueQuota(t *testing.T) {
}
clusterQueue := CreateKueueClusterQueue(test, cqSpec)
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name)
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)

// Create first training PyTorch job
tuningJob := createPyTorchJob(test, namespace.Name, localQueue.Name, *config)
Expand Down
2 changes: 1 addition & 1 deletion tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestSetupPytorchjob(t *testing.T) {
clusterQueue, err = test.Client().Kueue().KueueV1beta1().ClusterQueues().Create(test.Ctx(), clusterQueue, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())

localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name)
localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name, AsDefaultQueue)

// Create training PyTorch job
tuningJob := createPyTorchJob(test, namespaceName, localQueue.Name, *config)
Expand Down
86 changes: 57 additions & 29 deletions tests/odh/mnist_ray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
. "github.com/project-codeflare/codeflare-common/support"
Expand Down Expand Up @@ -77,11 +78,11 @@ func mnistRay(t *testing.T, numGpus int) {
}
clusterQueue := CreateKueueClusterQueue(test, cqSpec)
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name)
CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)

// Test configuration
jupyterNotebookConfigMapFileName := "mnist_ray_mini.ipynb"
mnist := readMnistPy(test)
mnist := readMnistScriptTemplate(test, "resources/mnist.py")
if numGpus > 0 {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
} else {
Expand All @@ -91,7 +92,7 @@ func mnistRay(t *testing.T, numGpus int) {
// MNIST Ray Notebook
jupyterNotebookConfigMapFileName: ReadFile(test, "resources/mnist_ray_mini.ipynb"),
"mnist.py": mnist,
"requirements.txt": readRequirementsTxt(test),
"requirements.txt": ReadFile(test, "resources/requirements.txt"),
})

// Define the regular(non-admin) user
Expand All @@ -102,7 +103,7 @@ func mnistRay(t *testing.T, numGpus int) {
CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin")

// Create Notebook CR
createNotebook(test, namespace, userToken, localQueue.Name, config.Name, jupyterNotebookConfigMapFileName, numGpus)
createNotebook(test, namespace, userToken, config.Name, jupyterNotebookConfigMapFileName, numGpus)

// Gracefully cleanup Notebook
defer func() {
Expand All @@ -111,7 +112,7 @@ func mnistRay(t *testing.T, numGpus int) {
}()

// Make sure the RayCluster is created and running
test.Eventually(rayClusters(test, namespace), TestTimeoutLong).
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
Should(
And(
HaveLen(1),
Expand All @@ -128,32 +129,59 @@ func mnistRay(t *testing.T, numGpus int) {
),
)

// Make sure the RayCluster finishes and is deleted
test.Eventually(rayClusters(test, namespace), TestTimeoutLong).
Should(HaveLen(0))
}

func readRequirementsTxt(test Test) []byte {
// Read the requirements.txt from resources and perform replacements for custom values using go template
props := struct {
PipIndexUrl string
PipTrustedHost string
}{
PipIndexUrl: "--index " + string(GetPipIndexURL()),
}
// Fetch created raycluster
rayClusterName := "mnisttest"
// Wait until raycluster is up and running
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

// Initialise raycluster client to interact with raycluster to get rayjob details using REST-API
dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true}
rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken)
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err))

// wait until rayjob exists
test.Eventually(func() []RayJobDetailsResponse {
rayJobs, err := rayClient.GetJobs()
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err))
return *rayJobs
}, TestTimeoutMedium, 2*time.Second).Should(HaveLen(1), "Ray job not found")

// Get test job-id
jobID := GetTestJobId(test, rayClient, dashboardUrl.Host)
test.Expect(jobID).ToNot(BeEmpty())

// Wait for the job to be succeeded or failed
var rayJobStatus string
test.T().Logf("Waiting for job to be Succeeded...\n")
test.Eventually(func() string {
resp, err := rayClient.GetJobDetails(jobID)
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err))
rayJobStatusVal := resp.Status
if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" {
test.T().Logf("JobStatus - %s\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
return rayJobStatus
}
if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" {
test.T().Logf("JobStatus - %s...\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
}
return rayJobStatus
}, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
// Store job logs in output directory
WriteRayJobAPILogs(test, rayClient, jobID)

// Assert ray-job status after job execution
test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !")

// Provide trusted host only if defined
if len(GetPipTrustedHost()) > 0 {
props.PipTrustedHost = "--trusted-host " + GetPipTrustedHost()
}

template, err := files.ReadFile("resources/requirements.txt")
test.Expect(err).NotTo(HaveOccurred())

return ParseTemplate(test, template, props)
// Make sure the RayCluster finishes and is deleted
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
Should(BeEmpty())
}

func readMnistPy(test Test) []byte {
func readMnistScriptTemplate(test Test, filePath string) []byte {
// Read the mnist.py from resources and perform replacements for custom values using go template
storage_bucket_endpoint, storage_bucket_endpoint_exists := GetStorageBucketDefaultEndpoint()
storage_bucket_access_key_id, storage_bucket_access_key_id_exists := GetStorageBucketAccessKeyId()
Expand Down Expand Up @@ -184,7 +212,7 @@ func readMnistPy(test Test) []byte {
StorageBucketMnistDir: storage_bucket_mnist_dir,
StorageBucketMnistDirExists: storage_bucket_mnist_dir_exists,
}
template, err := files.ReadFile("resources/mnist.py")
template, err := files.ReadFile(filePath)
test.Expect(err).NotTo(HaveOccurred())

return ParseTemplate(test, template, props)
Expand Down
60 changes: 54 additions & 6 deletions tests/odh/mnist_raytune_hpo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
. "github.com/project-codeflare/codeflare-common/support"
Expand Down Expand Up @@ -76,11 +77,11 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
}
clusterQueue := CreateKueueClusterQueue(test, cqSpec)
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name)
CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)

// Test configuration
jupyterNotebookConfigMapFileName := "mnist_hpo_raytune.ipynb"
mnist_hpo := ReadFile(test, "resources/mnist_hpo.py")
mnist_hpo := readMnistScriptTemplate(test, "resources/mnist_hpo.py")

if numGpus > 0 {
mnist_hpo = bytes.Replace(mnist_hpo, []byte("gpu_value=\"has to be specified\""), []byte("gpu_value=\"1\""), 1)
Expand All @@ -103,7 +104,7 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin")

// Create Notebook CR
createNotebook(test, namespace, userToken, localQueue.Name, config.Name, jupyterNotebookConfigMapFileName, numGpus)
createNotebook(test, namespace, userToken, config.Name, jupyterNotebookConfigMapFileName, numGpus)

// Gracefully cleanup Notebook
defer func() {
Expand All @@ -112,7 +113,7 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
}()

// Make sure the RayCluster is created and running
test.Eventually(rayClusters(test, namespace), TestTimeoutLong).
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
Should(
And(
HaveLen(1),
Expand All @@ -129,7 +130,54 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
),
)

// Fetch created raycluster
rayClusterName := "mnisthpotest"
// Wait until raycluster is up and running
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

// Initialise raycluster client to interact with raycluster to get rayjob details using REST-API
dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true}
rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken)
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err))

// Wait until the rayjob is created and running
test.Eventually(func() []RayJobDetailsResponse {
rayJobs, err := rayClient.GetJobs()
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err))
return *rayJobs
}, TestTimeoutMedium, 2*time.Second).Should(HaveLen(1), "Ray job not found")

// Get rayjob-ID
jobID := GetTestJobId(test, rayClient, dashboardUrl.Host)
test.Expect(jobID).ToNot(BeEmpty())

// Wait for the job to either succeed or fail
var rayJobStatus string
test.T().Logf("Waiting for job to be Succeeded...\n")
test.Eventually(func() string {
resp, err := rayClient.GetJobDetails(jobID)
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err))
rayJobStatusVal := resp.Status
if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" {
test.T().Logf("JobStatus - %s\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
return rayJobStatus
}
if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" {
test.T().Logf("JobStatus - %s...\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
}
return rayJobStatus
}, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
// Store job logs in output directory
WriteRayJobAPILogs(test, rayClient, jobID)

// Assert ray-job status after job execution
test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !")

// Make sure the RayCluster finishes and is deleted
test.Eventually(rayClusters(test, namespace), TestTimeoutLong).
Should(HaveLen(0))
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
Should(BeEmpty())
}
27 changes: 24 additions & 3 deletions tests/odh/notebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,32 @@ type NotebookProps struct {
OpenDataHubNamespace string
RayImage string
NotebookImage string
LocalQueue string
NotebookConfigMapName string
NotebookConfigMapFileName string
NotebookPVC string
NumGpus int
PipIndexUrl string
PipTrustedHost string
S3BucketName string
S3AccessKeyId string
S3SecretAccessKey string
S3DefaultRegion string
}

func createNotebook(test Test, namespace *corev1.Namespace, notebookUserToken, localQueue, jupyterNotebookConfigMapName, jupyterNotebookConfigMapFileName string, numGpus int) {
func createNotebook(test Test, namespace *corev1.Namespace, notebookUserToken, jupyterNotebookConfigMapName, jupyterNotebookConfigMapFileName string, numGpus int) {
// Create PVC for Notebook
notebookPVC := CreatePersistentVolumeClaim(test, namespace.Name, "10Gi", corev1.ReadWriteOnce)
s3BucketName, s3BucketNameExists := GetStorageBucketName()
s3AccessKeyId, _ := GetStorageBucketAccessKeyId()
s3SecretAccessKey, _ := GetStorageBucketSecretKey()
s3DefaultRegion, _ := GetStorageBucketDefaultRegion()

if !s3BucketNameExists {
s3BucketName = "''"
s3AccessKeyId = "''"
s3SecretAccessKey = "''"
s3DefaultRegion = "''"
}

// Read the Notebook CR from resources and perform replacements for custom values using go template
notebookProps := NotebookProps{
Expand All @@ -61,11 +77,16 @@ func createNotebook(test Test, namespace *corev1.Namespace, notebookUserToken, l
OpenDataHubNamespace: GetOpenDataHubNamespace(test),
RayImage: GetRayImage(),
NotebookImage: GetNotebookImage(test),
LocalQueue: localQueue,
NotebookConfigMapName: jupyterNotebookConfigMapName,
NotebookConfigMapFileName: jupyterNotebookConfigMapFileName,
NotebookPVC: notebookPVC.Name,
NumGpus: numGpus,
S3BucketName: s3BucketName,
S3AccessKeyId: s3AccessKeyId,
S3SecretAccessKey: s3SecretAccessKey,
S3DefaultRegion: s3DefaultRegion,
PipIndexUrl: GetPipIndexURL(),
PipTrustedHost: GetPipTrustedHost(),
}
notebookTemplate, err := files.ReadFile("resources/custom-nb-small.yaml")
test.Expect(err).NotTo(gomega.HaveOccurred())
Expand Down
16 changes: 14 additions & 2 deletions tests/odh/resources/custom-nb-small.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,20 @@ spec:
value: {{.NotebookImage}}
- name: JUPYTER_NOTEBOOK_PORT
value: "8888"
- name: AWS_ACCESS_KEY_ID
value: {{.S3AccessKeyId}}
- name: AWS_SECRET_ACCESS_KEY
value: {{.S3SecretAccessKey}}
- name: AWS_DEFAULT_REGION
value: {{.S3DefaultRegion}}
- name: AWS_S3_BUCKET
value: {{.S3BucketName}}
- name: PIP_INDEX_URL
value: {{.PipIndexUrl}}
- name: PIP_TRUSTED_HOST
value: {{.PipTrustedHost}}
image: {{.NotebookImage}}
command: ["/bin/sh", "-c", "pip install papermill && papermill /opt/app-root/notebooks/{{.NotebookConfigMapFileName}} /opt/app-root/src/mcad-out.ipynb -p namespace {{.Namespace}} -p ray_image {{.RayImage}} -p local_queue {{.LocalQueue}} -p openshift_api_url {{.OpenShiftApiUrl}} -p kubernetes_user_bearer_token {{.KubernetesUserBearerToken}} -p num_gpus {{ .NumGpus }} --log-output && sleep infinity"]
command: ["/bin/sh", "-c", "pip install papermill && papermill /opt/app-root/notebooks/{{.NotebookConfigMapFileName}} /opt/app-root/src/mcad-out.ipynb -p namespace {{.Namespace}} -p ray_image {{.RayImage}} -p openshift_api_url {{.OpenShiftApiUrl}} -p kubernetes_user_bearer_token {{.KubernetesUserBearerToken}} -p num_gpus {{ .NumGpus }} --log-output && sleep infinity"]
# args: ["pip install papermill && oc login --token=${OCP_TOKEN} --server=${OCP_SERVER} --insecure-skip-tls-verify=true && papermill /opt/app-root/notebooks/mcad.ipynb /opt/app-root/src/mcad-out.ipynb" ]
imagePullPolicy: Always
# livenessProbe:
Expand Down Expand Up @@ -158,4 +170,4 @@ spec:
secretName: jupyter-nb-kube-3aadmin-tls
- name: {{.NotebookConfigMapName}}
configMap:
name: {{.NotebookConfigMapName}}
name: {{.NotebookConfigMapName}}
1 change: 1 addition & 0 deletions tests/odh/resources/hpo_raytune_requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
torchvision==0.18.0
minio
9 changes: 9 additions & 0 deletions tests/odh/resources/mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,20 @@ def prepare_data(self):
secret_key = "{{.StorageBucketSecretKey}}"
bucket_name = "{{.StorageBucketName}}"

# remove prefix if specified in storage bucket endpoint url
secure = True
if endpoint.startswith("https://"):
endpoint = endpoint[len("https://") :]
elif endpoint.startswith("http://"):
endpoint = endpoint[len("http://") :]
secure = False

client = Minio(
endpoint,
access_key=access_key,
secret_key=secret_key,
cert_check=False,
secure=secure
)

if not os.path.exists(dataset_dir):
Expand Down
Loading