From c3b312f24d80e04a408e0c746c5d06d28beb8ed6 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Wed, 21 Aug 2024 13:09:33 +0530 Subject: [PATCH 1/3] Added provision to assert job status, updated HPO script for disconnected and removed local-queue parameter usage from cluster configuration to make it optional --- go.mod | 2 +- go.sum | 4 +- tests/kfto/core/kfto_kueue_sft_test.go | 4 +- .../kfto_kueue_sft_upgrade_training_test.go | 2 +- tests/odh/mnist_ray_test.go | 72 ++- tests/odh/mnist_raytune_hpo_test.go | 51 +- tests/odh/notebook.go | 27 +- tests/odh/resources/custom-nb-small.yaml | 16 +- .../resources/hpo_raytune_requirements.txt | 1 + tests/odh/resources/mnist.py | 9 + tests/odh/resources/mnist_hpo.py | 80 ++- tests/odh/resources/mnist_hpo_raytune.ipynb | 466 +++++++++--------- tests/odh/resources/mnist_ray_mini.ipynb | 466 +++++++++--------- tests/odh/resources/requirements.txt | 2 - tests/odh/support.go | 41 +- 15 files changed, 720 insertions(+), 523 deletions(-) diff --git a/go.mod b/go.mod index c50684b1..5cf1a4a4 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.21.5 require ( github.com/kubeflow/training-operator v1.7.0 github.com/onsi/gomega v1.31.1 - github.com/project-codeflare/codeflare-common v0.0.0-20240809123324-d44e319ba556 + github.com/project-codeflare/codeflare-common v0.0.0-20240827080155-9234d23ff47d github.com/prometheus/client_golang v1.18.0 github.com/prometheus/common v0.45.0 github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0 diff --git a/go.sum b/go.sum index 7e8390f4..2f46f88c 100644 --- a/go.sum +++ b/go.sum @@ -363,8 +363,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/project-codeflare/appwrapper v0.8.0 h1:vWHNtXUtHutN2EzYb6rryLdESnb8iDXsCokXOuNYXvg= github.com/project-codeflare/appwrapper v0.8.0/go.mod h1:FMQ2lI3fz6LakUVXgN1FTdpsc3BBkNIZZgtMmM9J5UM= -github.com/project-codeflare/codeflare-common v0.0.0-20240809123324-d44e319ba556 h1:4SI3d63CNZ+7sKQ1JEqLmNzGSgVXqz3aT3+aDXRgo18= -github.com/project-codeflare/codeflare-common v0.0.0-20240809123324-d44e319ba556/go.mod h1:unKTw+XoMANTES3WieG016im7rxZ7IR2/ph++L5Vp1Y= +github.com/project-codeflare/codeflare-common v0.0.0-20240827080155-9234d23ff47d h1:hbfF20rw/NHvXNXYLuxPjCnBS5Lotvt6rU0S9DLs0HU= +github.com/project-codeflare/codeflare-common v0.0.0-20240827080155-9234d23ff47d/go.mod h1:unKTw+XoMANTES3WieG016im7rxZ7IR2/ph++L5Vp1Y= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= diff --git a/tests/kfto/core/kfto_kueue_sft_test.go b/tests/kfto/core/kfto_kueue_sft_test.go index f191bbbc..6d9f0500 100644 --- a/tests/kfto/core/kfto_kueue_sft_test.go +++ b/tests/kfto/core/kfto_kueue_sft_test.go @@ -79,7 +79,7 @@ func runPytorchjobWithSFTtrainer(t *testing.T, modelConfigFile string) { } clusterQueue := CreateKueueClusterQueue(test, cqSpec) defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{}) - localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name) + localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue) // Create training PyTorch job tuningJob := createPyTorchJob(test, namespace.Name, 
localQueue.Name, *config) @@ -143,7 +143,7 @@ func TestPytorchjobUsingKueueQuota(t *testing.T) { } clusterQueue := CreateKueueClusterQueue(test, cqSpec) defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{}) - localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name) + localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue) // Create first training PyTorch job tuningJob := createPyTorchJob(test, namespace.Name, localQueue.Name, *config) diff --git a/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go b/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go index 282ad9d8..402686d8 100644 --- a/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go +++ b/tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go @@ -99,7 +99,7 @@ func TestSetupPytorchjob(t *testing.T) { clusterQueue, err = test.Client().Kueue().KueueV1beta1().ClusterQueues().Create(test.Ctx(), clusterQueue, metav1.CreateOptions{}) test.Expect(err).NotTo(HaveOccurred()) - localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name) + localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name, AsDefaultQueue) // Create training PyTorch job tuningJob := createPyTorchJob(test, namespaceName, localQueue.Name, *config) diff --git a/tests/odh/mnist_ray_test.go b/tests/odh/mnist_ray_test.go index 2286114c..e52a8fae 100644 --- a/tests/odh/mnist_ray_test.go +++ b/tests/odh/mnist_ray_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "testing" + "time" . "github.com/onsi/gomega" . "github.com/project-codeflare/codeflare-common/support" @@ -77,11 +78,11 @@ func mnistRay(t *testing.T, numGpus int) { } clusterQueue := CreateKueueClusterQueue(test, cqSpec) defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{}) - localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name) + CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue) // Test configuration jupyterNotebookConfigMapFileName := "mnist_ray_mini.ipynb" - mnist := readMnistPy(test) + mnist := readMnistScriptTemplate(test, "resources/mnist.py") if numGpus > 0 { mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1) } else { @@ -91,7 +92,7 @@ func mnistRay(t *testing.T, numGpus int) { // MNIST Ray Notebook jupyterNotebookConfigMapFileName: ReadFile(test, "resources/mnist_ray_mini.ipynb"), "mnist.py": mnist, - "requirements.txt": readRequirementsTxt(test), + "requirements.txt": ReadFile(test, "resources/requirements.txt"), }) // Define the regular(non-admin) user @@ -102,7 +103,7 @@ func mnistRay(t *testing.T, numGpus int) { CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin") // Create Notebook CR - createNotebook(test, namespace, userToken, localQueue.Name, config.Name, jupyterNotebookConfigMapFileName, numGpus) + createNotebook(test, namespace, userToken, config.Name, jupyterNotebookConfigMapFileName, numGpus) // Gracefully cleanup Notebook defer func() { @@ -111,7 +112,7 @@ func mnistRay(t *testing.T, numGpus int) { }() // Make sure the RayCluster is created and running - test.Eventually(rayClusters(test, namespace), TestTimeoutLong). + test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). 
Should( And( HaveLen(1), @@ -128,32 +129,53 @@ func mnistRay(t *testing.T, numGpus int) { ), ) - // Make sure the RayCluster finishes and is deleted - test.Eventually(rayClusters(test, namespace), TestTimeoutLong). - Should(HaveLen(0)) -} + time.Sleep(30 * time.Second) -func readRequirementsTxt(test Test) []byte { - // Read the requirements.txt from resources and perform replacements for custom values using go template - props := struct { - PipIndexUrl string - PipTrustedHost string - }{ - PipIndexUrl: "--index " + string(GetPipIndexURL()), - } + // Fetch created raycluster + rayClusterName := "mnisttest" + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) + test.Expect(err).ToNot(HaveOccurred()) - // Provide trusted host only if defined - if len(GetPipTrustedHost()) > 0 { - props.PipTrustedHost = "--trusted-host " + GetPipTrustedHost() + // Initialise raycluster client to interact with raycluster to get rayjob details using REST-API + dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) + rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} + rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken) + if err != nil { + test.T().Errorf("%s", err) } - template, err := files.ReadFile("resources/requirements.txt") - test.Expect(err).NotTo(HaveOccurred()) + jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) + test.Expect(jobID).ToNot(Equal(nil)) + + // Wait for the job to be succeeded or failed + var rayJobStatus string + fmt.Printf("Waiting for job to be Succeeded...\n") + test.Eventually(func() string { + resp, err := rayClient.GetJobDetails(jobID) + test.Expect(err).ToNot(HaveOccurred()) + rayJobStatusVal := resp.Status + if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" { + fmt.Printf("JobStatus : %s\n", rayJobStatusVal) + rayJobStatus = rayJobStatusVal + return rayJobStatus + } + if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" { + fmt.Printf("JobStatus : %s...\n", rayJobStatusVal) + rayJobStatus = rayJobStatusVal + } + return rayJobStatus + }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") + test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") + + // Store job logs in output directory + WriteRayJobAPILogs(test, rayClient, jobID) - return ParseTemplate(test, template, props) + // Make sure the RayCluster finishes and is deleted + test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). 
+ Should(HaveLen(0)) } -func readMnistPy(test Test) []byte { +func readMnistScriptTemplate(test Test, filePath string) []byte { // Read the mnist.py from resources and perform replacements for custom values using go template storage_bucket_endpoint, storage_bucket_endpoint_exists := GetStorageBucketDefaultEndpoint() storage_bucket_access_key_id, storage_bucket_access_key_id_exists := GetStorageBucketAccessKeyId() @@ -184,7 +206,7 @@ func readMnistPy(test Test) []byte { StorageBucketMnistDir: storage_bucket_mnist_dir, StorageBucketMnistDirExists: storage_bucket_mnist_dir_exists, } - template, err := files.ReadFile("resources/mnist.py") + template, err := files.ReadFile(filePath) test.Expect(err).NotTo(HaveOccurred()) return ParseTemplate(test, template, props) diff --git a/tests/odh/mnist_raytune_hpo_test.go b/tests/odh/mnist_raytune_hpo_test.go index 200bd6b4..e1e32a8d 100644 --- a/tests/odh/mnist_raytune_hpo_test.go +++ b/tests/odh/mnist_raytune_hpo_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "testing" + "time" . "github.com/onsi/gomega" . "github.com/project-codeflare/codeflare-common/support" @@ -76,11 +77,11 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { } clusterQueue := CreateKueueClusterQueue(test, cqSpec) defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{}) - localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name) + CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue) // Test configuration jupyterNotebookConfigMapFileName := "mnist_hpo_raytune.ipynb" - mnist_hpo := ReadFile(test, "resources/mnist_hpo.py") + mnist_hpo := readMnistScriptTemplate(test, "resources/mnist_hpo.py") if numGpus > 0 { mnist_hpo = bytes.Replace(mnist_hpo, []byte("gpu_value=\"has to be specified\""), []byte("gpu_value=\"1\""), 1) @@ -103,7 +104,7 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { CreateUserRoleBindingWithClusterRole(test, userName, namespace.Name, "admin") // Create Notebook CR - createNotebook(test, namespace, userToken, localQueue.Name, config.Name, jupyterNotebookConfigMapFileName, numGpus) + createNotebook(test, namespace, userToken, config.Name, jupyterNotebookConfigMapFileName, numGpus) // Gracefully cleanup Notebook defer func() { @@ -112,7 +113,7 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { }() // Make sure the RayCluster is created and running - test.Eventually(rayClusters(test, namespace), TestTimeoutLong). + test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). 
Should( And( HaveLen(1), @@ -128,8 +129,48 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { ContainElement(WithTransform(KueueWorkloadAdmitted, BeTrueBecause("Workload failed to be admitted"))), ), ) + time.Sleep(30 * time.Second) + + // Fetch created raycluster + rayClusterName := "mnisthpotest" + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) + test.Expect(err).ToNot(HaveOccurred()) + + // Initialise raycluster client to interact with raycluster to get rayjob details using REST-API + dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) + rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} + rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken) + if err != nil { + test.T().Errorf("%s", err) + } + + jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) + test.Expect(jobID).ToNot(Equal(nil)) + + // Wait for the job to be succeeded or failed + var rayJobStatus string + fmt.Printf("Waiting for job to be Succeeded...\n") + test.Eventually(func() string { + resp, err := rayClient.GetJobDetails(jobID) + test.Expect(err).ToNot(HaveOccurred()) + rayJobStatusVal := resp.Status + if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" { + fmt.Printf("JobStatus : %s\n", rayJobStatusVal) + rayJobStatus = rayJobStatusVal + return rayJobStatus + } + if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" { + fmt.Printf("JobStatus : %s...\n", rayJobStatusVal) + rayJobStatus = rayJobStatusVal + } + return rayJobStatus + }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") + test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") + + // Store job logs in output directory + WriteRayJobAPILogs(test, rayClient, jobID) // Make sure the RayCluster finishes and is deleted - test.Eventually(rayClusters(test, namespace), TestTimeoutLong). + test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). 
Should(HaveLen(0)) } diff --git a/tests/odh/notebook.go b/tests/odh/notebook.go index 32651615..12b8aa51 100644 --- a/tests/odh/notebook.go +++ b/tests/odh/notebook.go @@ -41,16 +41,32 @@ type NotebookProps struct { OpenDataHubNamespace string RayImage string NotebookImage string - LocalQueue string NotebookConfigMapName string NotebookConfigMapFileName string NotebookPVC string NumGpus int + PipIndexUrl string + PipTrustedHost string + S3BucketName string + S3AccessKeyId string + S3SecretAccessKey string + S3DefaultRegion string } -func createNotebook(test Test, namespace *corev1.Namespace, notebookUserToken, localQueue, jupyterNotebookConfigMapName, jupyterNotebookConfigMapFileName string, numGpus int) { +func createNotebook(test Test, namespace *corev1.Namespace, notebookUserToken, jupyterNotebookConfigMapName, jupyterNotebookConfigMapFileName string, numGpus int) { // Create PVC for Notebook notebookPVC := CreatePersistentVolumeClaim(test, namespace.Name, "10Gi", corev1.ReadWriteOnce) + s3BucketName, s3BucketNameExists := GetStorageBucketName() + s3AccessKeyId, _ := GetStorageBucketAccessKeyId() + s3SecretAccessKey, _ := GetStorageBucketSecretKey() + s3DefaultRegion, _ := GetStorageBucketDefaultRegion() + + if !s3BucketNameExists { + s3BucketName = "''" + s3AccessKeyId = "''" + s3SecretAccessKey = "''" + s3DefaultRegion = "''" + } // Read the Notebook CR from resources and perform replacements for custom values using go template notebookProps := NotebookProps{ @@ -61,11 +77,16 @@ func createNotebook(test Test, namespace *corev1.Namespace, notebookUserToken, l OpenDataHubNamespace: GetOpenDataHubNamespace(test), RayImage: GetRayImage(), NotebookImage: GetNotebookImage(test), - LocalQueue: localQueue, NotebookConfigMapName: jupyterNotebookConfigMapName, NotebookConfigMapFileName: jupyterNotebookConfigMapFileName, NotebookPVC: notebookPVC.Name, NumGpus: numGpus, + S3BucketName: s3BucketName, + S3AccessKeyId: s3AccessKeyId, + S3SecretAccessKey: s3SecretAccessKey, + S3DefaultRegion: s3DefaultRegion, + PipIndexUrl: GetPipIndexURL(), + PipTrustedHost: GetPipTrustedHost(), } notebookTemplate, err := files.ReadFile("resources/custom-nb-small.yaml") test.Expect(err).NotTo(gomega.HaveOccurred()) diff --git a/tests/odh/resources/custom-nb-small.yaml b/tests/odh/resources/custom-nb-small.yaml index 88a4e0a9..19067aec 100644 --- a/tests/odh/resources/custom-nb-small.yaml +++ b/tests/odh/resources/custom-nb-small.yaml @@ -50,8 +50,20 @@ spec: value: {{.NotebookImage}} - name: JUPYTER_NOTEBOOK_PORT value: "8888" + - name: AWS_ACCESS_KEY_ID + value: {{.S3AccessKeyId}} + - name: AWS_SECRET_ACCESS_KEY + value: {{.S3SecretAccessKey}} + - name: AWS_DEFAULT_REGION + value: {{.S3DefaultRegion}} + - name: AWS_S3_BUCKET + value: {{.S3BucketName}} + - name: PIP_INDEX_URL + value: {{.PipIndexUrl}} + - name: PIP_TRUSTED_HOST + value: {{.PipTrustedHost}} image: {{.NotebookImage}} - command: ["/bin/sh", "-c", "pip install papermill && papermill /opt/app-root/notebooks/{{.NotebookConfigMapFileName}} /opt/app-root/src/mcad-out.ipynb -p namespace {{.Namespace}} -p ray_image {{.RayImage}} -p local_queue {{.LocalQueue}} -p openshift_api_url {{.OpenShiftApiUrl}} -p kubernetes_user_bearer_token {{.KubernetesUserBearerToken}} -p num_gpus {{ .NumGpus }} --log-output && sleep infinity"] + command: ["/bin/sh", "-c", "pip install papermill && papermill /opt/app-root/notebooks/{{.NotebookConfigMapFileName}} /opt/app-root/src/mcad-out.ipynb -p namespace {{.Namespace}} -p ray_image {{.RayImage}} -p openshift_api_url 
{{.OpenShiftApiUrl}} -p kubernetes_user_bearer_token {{.KubernetesUserBearerToken}} -p num_gpus {{ .NumGpus }} --log-output && sleep infinity"] # args: ["pip install papermill && oc login --token=${OCP_TOKEN} --server=${OCP_SERVER} --insecure-skip-tls-verify=true && papermill /opt/app-root/notebooks/mcad.ipynb /opt/app-root/src/mcad-out.ipynb" ] imagePullPolicy: Always # livenessProbe: @@ -158,4 +170,4 @@ spec: secretName: jupyter-nb-kube-3aadmin-tls - name: {{.NotebookConfigMapName}} configMap: - name: {{.NotebookConfigMapName}} + name: {{.NotebookConfigMapName}} \ No newline at end of file diff --git a/tests/odh/resources/hpo_raytune_requirements.txt b/tests/odh/resources/hpo_raytune_requirements.txt index 4708805b..a71db2e2 100755 --- a/tests/odh/resources/hpo_raytune_requirements.txt +++ b/tests/odh/resources/hpo_raytune_requirements.txt @@ -1 +1,2 @@ torchvision==0.18.0 +minio \ No newline at end of file diff --git a/tests/odh/resources/mnist.py b/tests/odh/resources/mnist.py index 4eb08152..459d7467 100644 --- a/tests/odh/resources/mnist.py +++ b/tests/odh/resources/mnist.py @@ -133,11 +133,20 @@ def prepare_data(self): secret_key = "{{.StorageBucketSecretKey}}" bucket_name = "{{.StorageBucketName}}" + # remove prefix if specified in storage bucket endpoint url + secure = True + if endpoint.startswith("https://"): + endpoint = endpoint[len("https://") :] + elif endpoint.startswith("http://"): + endpoint = endpoint[len("http://") :] + secure = False + client = Minio( endpoint, access_key=access_key, secret_key=secret_key, cert_check=False, + secure=secure ) if not os.path.exists(dataset_dir): diff --git a/tests/odh/resources/mnist_hpo.py b/tests/odh/resources/mnist_hpo.py index 026b1914..9fa82f48 100755 --- a/tests/odh/resources/mnist_hpo.py +++ b/tests/odh/resources/mnist_hpo.py @@ -8,14 +8,24 @@ from filelock import FileLock from torchvision import datasets, transforms -import ray +import ray,gzip, shutil from ray import train, tune from ray.train import Checkpoint from ray.tune.schedulers import AsyncHyperBandScheduler +from minio import Minio EPOCH_SIZE = 128 TEST_SIZE = 64 +local_mnist_path = os.path.dirname(os.path.abspath(__file__)) +# %% + +STORAGE_BUCKET_EXISTS = "{{.StorageBucketDefaultEndpointExists}}" +print("STORAGE_BUCKET_EXISTS: ",STORAGE_BUCKET_EXISTS) +print(f"{'Storage_Bucket_Default_Endpoint : is {{.StorageBucketDefaultEndpoint}}' if '{{.StorageBucketDefaultEndpointExists}}' == 'true' else ''}") +print(f"{'Storage_Bucket_Name : is {{.StorageBucketName}}' if '{{.StorageBucketNameExists}}' == 'true' else ''}") +print(f"{'Storage_Bucket_Mnist_Directory : is {{.StorageBucketMnistDir}}' if '{{.StorageBucketMnistDirExists}}' == 'true' else ''}") + class ConvNet(nn.Module): def __init__(self): @@ -66,21 +76,83 @@ def get_data_loaders(batch_size=128): mnist_transforms = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))] ) + # download + print("Downloading MNIST dataset...") + + if "{{.StorageBucketDefaultEndpointExists}}" == "true" and "{{.StorageBucketDefaultEndpoint}}" != "": + print("Using storage bucket to download datasets...") + dataset_dir = os.path.join(local_mnist_path, "MNIST/raw") + endpoint = "{{.StorageBucketDefaultEndpoint}}" + access_key = "{{.StorageBucketAccessKeyId}}" + secret_key = "{{.StorageBucketSecretKey}}" + bucket_name = "{{.StorageBucketName}}" + + #remove https prefix incase provided in endpoint url + if endpoint.startswith("https://"): + endpoint=endpoint[len("https://"):] + + # remove prefix if specified in 
storage bucket endpoint url + secure = True + if endpoint.startswith("https://"): + endpoint = endpoint[len("https://") :] + elif endpoint.startswith("http://"): + endpoint = endpoint[len("http://") :] + secure = False + + client = Minio( + endpoint, + access_key=access_key, + secret_key=secret_key, + cert_check=False, + secure=secure, + ) + if not os.path.exists(dataset_dir): + os.makedirs(dataset_dir) + else: + print(f"Directory '{dataset_dir}' already exists") + + # To download datasets from storage bucket's specific directory, use prefix to provide directory name + prefix="{{.StorageBucketMnistDir}}" + # download all files from prefix folder of storage bucket recursively + for item in client.list_objects( + bucket_name, prefix=prefix, recursive=True + ): + file_name=item.object_name[len(prefix)+1:] + dataset_file_path = os.path.join(dataset_dir, file_name) + print(dataset_file_path) + if not os.path.exists(dataset_file_path): + client.fget_object( + bucket_name, item.object_name, dataset_file_path + ) + else: + print(f"File-path '{dataset_file_path}' already exists") + # Unzip files + with gzip.open(dataset_file_path, "rb") as f_in: + with open(dataset_file_path.split(".")[:-1][0], "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + # delete zip file + os.remove(dataset_file_path) + download_datasets = False + + else: + print("Using default MNIST mirror reference to download datasets...") + download_datasets = True + # We add FileLock here because multiple workers will want to # download data, and this may cause overwrites since # DataLoader is not threadsafe. with FileLock(os.path.expanduser("~/data.lock")): train_loader = torch.utils.data.DataLoader( datasets.MNIST( - "~/data", train=True, download=True, transform=mnist_transforms + local_mnist_path, train=True, download=download_datasets, transform=mnist_transforms ), batch_size=batch_size, shuffle=True, ) test_loader = torch.utils.data.DataLoader( datasets.MNIST( - "~/data", train=False, download=True, transform=mnist_transforms + local_mnist_path, train=False, download=download_datasets, transform=mnist_transforms ), batch_size=batch_size, shuffle=True, @@ -142,4 +214,4 @@ def train_mnist(config): print("Best hyperparameters config is:", results.get_best_result().config) - assert not results.errors + assert not results.errors \ No newline at end of file diff --git a/tests/odh/resources/mnist_hpo_raytune.ipynb b/tests/odh/resources/mnist_hpo_raytune.ipynb index 5e2d785a..83c42563 100644 --- a/tests/odh/resources/mnist_hpo_raytune.ipynb +++ b/tests/odh/resources/mnist_hpo_raytune.ipynb @@ -1,232 +1,236 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "import os\n", - "import yaml\n", - "\n", - "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n", - "from codeflare_sdk.job import RayJobClient\n", - "from time import sleep" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f6fb4c03", - "metadata": {}, - "outputs": [], - "source": [ - "%pip show codeflare-sdk" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "30888aed", - "metadata": { - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "#parameters\n", - "namespace = \"default\"\n", - "ray_image = \"has to be specified\"\n", - "local_queue = \"has to be specified\"\n", - "openshift_api_url = \"has to be specified\"\n", - 
"kubernetes_user_bearer_token = \"has to be specified\"\n", - "num_gpus = \"has to be specified\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a0538160", - "metadata": {}, - "outputs": [], - "source": [ - "auth = TokenAuthentication(\n", - " token=kubernetes_user_bearer_token,\n", - " server=openshift_api_url,\n", - " skip_tls=True,\n", - ")\n", - "auth.login()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0f4bc870-091f-4e11-9642-cba145710159", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Create ray cluster\n", - "cluster = Cluster(\n", - " ClusterConfiguration(\n", - " namespace=namespace,\n", - " name='mnisthpotest',\n", - " head_cpus=1,\n", - " head_memory=4,\n", - " head_extended_resource_requests={'nvidia.com/gpu':0},\n", - " num_workers=1,\n", - " worker_cpu_requests=1,\n", - " worker_cpu_limits=1,\n", - " worker_memory_requests=1,\n", - " worker_memory_limits=4,\n", - " worker_extended_resource_requests={'nvidia.com/gpu':int(num_gpus)},\n", - " image=ray_image,\n", - " local_queue=local_queue,\n", - " write_to_file=True,\n", - " verify_tls=False\n", - " )\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "04a5ea40", - "metadata": {}, - "outputs": [], - "source": [ - "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n", - "outfile = os.path.join(directory_path, \"mnisthpotest.yaml\")\n", - "cluster_yaml = None\n", - "with open(outfile) as f:\n", - " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", - "\n", - "# Add toleration for GPU nodes to Ray cluster worker pod\n", - "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", - "\n", - "with open(outfile, \"w\") as f:\n", - " yaml.dump(cluster_yaml, f, default_flow_style=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Bring up the cluster\n", - "cluster.up()\n", - "# Wait until status is updated\n", - "cluster.wait_ready()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "df71c1ed", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.status()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.details()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "47ca5c15", - "metadata": {}, - "outputs": [], - "source": [ - "ray_dashboard = cluster.cluster_dashboard_uri()\n", - "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", - "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n", - "\n", - "submission_id = client.submit_job(\n", - " entrypoint=\"python mnist_hpo.py\",\n", - " runtime_env={\n", - " \"working_dir\": \"/opt/app-root/notebooks/..data\",\n", - " \"pip\": \"/opt/app-root/notebooks/hpo_raytune_requirements.txt\",\n", - " },\n", - " # entrypoint_num_gpus is not required here as the mnist_hpo script executes in parallel and requires more GPUs for each iteration\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f63a178a", - "metadata": {}, - "outputs": [], - "source": [ - "finished = False\n", - "while not finished:\n", - " sleep(1)\n", - " status = 
client.get_job_status(submission_id)\n", - " finished = (status == \"SUCCEEDED\")\n", - "if finished:\n", - " print(\"Job completed Successfully !\")\n", - "else:\n", - " print(\"Job failed !\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6b099777", - "metadata": {}, - "outputs": [], - "source": [ - "cluster.down()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.18" - }, - "vscode": { - "interpreter": { - "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac" - } - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", + "import yaml\n", + "\n", + "# Import pieces from codeflare-sdk\n", + "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n", + "from codeflare_sdk.job import RayJobClient\n", + "from time import sleep" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f6fb4c03", + "metadata": {}, + "outputs": [], + "source": [ + "%pip show codeflare-sdk" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30888aed", + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "#parameters\n", + "namespace = \"default\"\n", + "ray_image = \"has to be specified\"\n", + "openshift_api_url = \"has to be specified\"\n", + "kubernetes_user_bearer_token = \"has to be specified\"\n", + "num_gpus = \"has to be specified\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a0538160", + "metadata": {}, + "outputs": [], + "source": [ + "auth = TokenAuthentication(\n", + " token=kubernetes_user_bearer_token,\n", + " server=openshift_api_url,\n", + " skip_tls=True,\n", + ")\n", + "auth.login()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f4bc870-091f-4e11-9642-cba145710159", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create ray cluster\n", + "cluster = Cluster(\n", + " ClusterConfiguration(\n", + " namespace=namespace,\n", + " name='mnisthpotest',\n", + " head_cpus=1,\n", + " head_memory=4,\n", + " head_gpus=0,\n", + " num_workers=1,\n", + " min_cpus=1,\n", + " max_cpus=1,\n", + " min_memory=1,\n", + " max_memory=4,\n", + " num_gpus=int(num_gpus),\n", + " image=ray_image,\n", + " write_to_file=True,\n", + " verify_tls=False\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04a5ea40", + "metadata": {}, + "outputs": [], + "source": [ + "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n", + "outfile = os.path.join(directory_path, \"mnisthpotest.yaml\")\n", + "cluster_yaml = None\n", + "with open(outfile) as f:\n", + " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", + "\n", + "# Add toleration for GPU nodes to Ray cluster worker pod\n", + "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", + "\n", + "with open(outfile, \"w\") as f:\n", + " yaml.dump(cluster_yaml, f, default_flow_style=False)" + 
] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Bring up the cluster\n", + "cluster.up()\n", + "# Wait until status is updated\n", + "cluster.wait_ready()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df71c1ed", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.status()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.details()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47ca5c15", + "metadata": {}, + "outputs": [], + "source": [ + "ray_dashboard = cluster.cluster_dashboard_uri()\n", + "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", + "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n", + "\n", + "submission_id = client.submit_job(\n", + " entrypoint=\"python mnist_hpo.py\",\n", + " runtime_env={\n", + " \"env_vars\": {\n", + " \"PIP_INDEX_URL\":os.environ.get(\"PIP_INDEX_URL\"),\n", + " \"PIP_TRUSTED_HOST\":os.environ.get(\"PIP_TRUSTED_HOST\"),\n", + " },\n", + " \"working_dir\": \"/opt/app-root/notebooks/..data\",\n", + " \"pip\": \"/opt/app-root/notebooks/hpo_raytune_requirements.txt\",\n", + " },\n", + " # entrypoint_num_gpus is not required here as the mnist_hpo script executes in parallel and requires more GPUs for each iteration\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f63a178a", + "metadata": {}, + "outputs": [], + "source": [ + "finished = False\n", + "while not finished:\n", + " sleep(1)\n", + " status = client.get_job_status(submission_id)\n", + " finished = (status == \"SUCCEEDED\")\n", + "if finished:\n", + " print(\"Job completed Successfully !\")\n", + "else:\n", + " print(\"Job failed !\")\n", + "\n", + "sleep(10) # For avoiding race condition(raycluster gets deleted as soon as notebook execution completes after rayjob submission gets succeeded) and to assert rayjob success status before raycluster gets deleted during test excution\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6b099777", + "metadata": {}, + "outputs": [], + "source": [ + "cluster.down()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + }, + "vscode": { + "interpreter": { + "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac" + } + } + }, + "nbformat": 4, + "nbformat_minor": 5 + } \ No newline at end of file diff --git a/tests/odh/resources/mnist_ray_mini.ipynb b/tests/odh/resources/mnist_ray_mini.ipynb index 58c30c46..61f6f24c 100644 --- a/tests/odh/resources/mnist_ray_mini.ipynb +++ b/tests/odh/resources/mnist_ray_mini.ipynb @@ -1,232 +1,236 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "import os\n", - "import yaml\n", - "\n", - "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, 
TokenAuthentication\n", - "from codeflare_sdk.job import RayJobClient\n", - "from time import sleep" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f6fb4c03", - "metadata": {}, - "outputs": [], - "source": [ - "%pip show codeflare-sdk" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "30888aed", - "metadata": { - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "#parameters\n", - "namespace = \"default\"\n", - "ray_image = \"has to be specified\"\n", - "local_queue = \"has to be specified\"\n", - "openshift_api_url = \"has to be specified\"\n", - "kubernetes_user_bearer_token = \"has to be specified\"\n", - "num_gpus = \"has to be specified\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a0538160", - "metadata": {}, - "outputs": [], - "source": [ - "auth = TokenAuthentication(\n", - " token=kubernetes_user_bearer_token,\n", - " server=openshift_api_url,\n", - " skip_tls=True,\n", - ")\n", - "auth.login()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0f4bc870-091f-4e11-9642-cba145710159", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Create our cluster and submit appwrapper\n", - "cluster = Cluster(\n", - " ClusterConfiguration(\n", - " namespace=namespace,\n", - " name='mnisttest',\n", - " head_cpus=1,\n", - " head_memory=4,\n", - " head_extended_resource_requests={'nvidia.com/gpu':0},\n", - " num_workers=1,\n", - " worker_cpu_requests=1,\n", - " worker_cpu_limits=1,\n", - " worker_memory_requests=1,\n", - " worker_memory_limits=4,\n", - " worker_extended_resource_requests={'nvidia.com/gpu': int(num_gpus)},\n", - " image=ray_image,\n", - " local_queue=local_queue,\n", - " write_to_file=True,\n", - " verify_tls=False\n", - " )\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "04a5ea40", - "metadata": {}, - "outputs": [], - "source": [ - "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n", - "outfile = os.path.join(directory_path, \"mnisttest.yaml\")\n", - "cluster_yaml = None\n", - "with open(outfile) as f:\n", - " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", - "\n", - "# Add toleration for GPU nodes to Ray cluster worker pod\n", - "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", - "\n", - "with open(outfile, \"w\") as f:\n", - " yaml.dump(cluster_yaml, f, default_flow_style=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Bring up the cluster\n", - "cluster.up()\n", - "# Wait until status is updated\n", - "cluster.wait_ready()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "df71c1ed", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.status()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.details()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "47ca5c15", - "metadata": {}, - "outputs": [], - "source": [ - "ray_dashboard = cluster.cluster_dashboard_uri()\n", - "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", - "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n", - 
"\n", - "submission_id = client.submit_job(\n", - " entrypoint=\"python mnist.py\",\n", - " runtime_env={\n", - " \"working_dir\": \"/opt/app-root/notebooks/..data\",\n", - " \"pip\": \"/opt/app-root/notebooks/requirements.txt\",\n", - " },\n", - " entrypoint_num_gpus=num_gpus\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f63a178a", - "metadata": {}, - "outputs": [], - "source": [ - "finished = False\n", - "while not finished:\n", - " sleep(1)\n", - " status = client.get_job_status(submission_id)\n", - " finished = (status == \"SUCCEEDED\")\n", - "if finished:\n", - " print(\"Job completed Successfully !\")\n", - "else:\n", - " print(\"Job failed !\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6b099777", - "metadata": {}, - "outputs": [], - "source": [ - "cluster.down()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.18" - }, - "vscode": { - "interpreter": { - "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac" - } - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", + "import yaml\n", + "\n", + "# Import pieces from codeflare-sdk\n", + "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n", + "from codeflare_sdk.job import RayJobClient\n", + "from time import sleep" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f6fb4c03", + "metadata": {}, + "outputs": [], + "source": [ + "%pip show codeflare-sdk" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30888aed", + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "#parameters\n", + "namespace = \"default\"\n", + "ray_image = \"has to be specified\"\n", + "openshift_api_url = \"has to be specified\"\n", + "kubernetes_user_bearer_token = \"has to be specified\"\n", + "num_gpus = \"has to be specified\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a0538160", + "metadata": {}, + "outputs": [], + "source": [ + "auth = TokenAuthentication(\n", + " token=kubernetes_user_bearer_token,\n", + " server=openshift_api_url,\n", + " skip_tls=True,\n", + ")\n", + "auth.login()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f4bc870-091f-4e11-9642-cba145710159", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create our cluster and submit appwrapper\n", + "cluster = Cluster(\n", + " ClusterConfiguration(\n", + " namespace=namespace,\n", + " name='mnisttest',\n", + " head_cpus=1,\n", + " head_memory=4,\n", + " head_gpus=0,\n", + " num_workers=1,\n", + " min_cpus=1,\n", + " max_cpus=1,\n", + " min_memory=1,\n", + " max_memory=4,\n", + " num_gpus=int(num_gpus),\n", + " image=ray_image,\n", + " write_to_file=True,\n", + " verify_tls=False\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04a5ea40", + "metadata": {}, + "outputs": [], + "source": [ + "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n", + "outfile = 
os.path.join(directory_path, \"mnisttest.yaml\")\n", + "cluster_yaml = None\n", + "with open(outfile) as f:\n", + " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", + "\n", + "# Add toleration for GPU nodes to Ray cluster worker pod\n", + "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", + "\n", + "with open(outfile, \"w\") as f:\n", + " yaml.dump(cluster_yaml, f, default_flow_style=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Bring up the cluster\n", + "cluster.up()\n", + "# Wait until status is updated\n", + "cluster.wait_ready()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df71c1ed", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.status()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.details()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47ca5c15", + "metadata": {}, + "outputs": [], + "source": [ + "ray_dashboard = cluster.cluster_dashboard_uri()\n", + "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", + "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n", + "\n", + "submission_id = client.submit_job(\n", + " entrypoint=\"python mnist.py\",\n", + " runtime_env={\n", + " \"env_vars\": {\n", + " \"PIP_INDEX_URL\":os.environ.get(\"PIP_INDEX_URL\"),\n", + " \"PIP_TRUSTED_HOST\":os.environ.get(\"PIP_TRUSTED_HOST\"),\n", + " },\n", + " \"working_dir\": \"/opt/app-root/notebooks/..data\",\n", + " \"pip\": \"/opt/app-root/notebooks/requirements.txt\",\n", + " },\n", + " entrypoint_num_gpus=num_gpus\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f63a178a", + "metadata": {}, + "outputs": [], + "source": [ + "finished = False\n", + "while not finished:\n", + " sleep(1)\n", + " status = client.get_job_status(submission_id)\n", + " finished = (status == \"SUCCEEDED\")\n", + "if finished:\n", + " print(\"Job completed Successfully !\")\n", + "else:\n", + " print(\"Job failed !\")\n", + "\n", + "sleep(10) # For avoiding race condition(raycluster gets deleted as soon as notebook execution completes after rayjob submission gets succeeded) and to assert rayjob success status before raycluster gets deleted during test excution\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6b099777", + "metadata": {}, + "outputs": [], + "source": [ + "cluster.down()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + }, + "vscode": { + "interpreter": { + "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac" + } + } + }, + "nbformat": 4, + "nbformat_minor": 5 + } \ No newline at end of file diff --git a/tests/odh/resources/requirements.txt b/tests/odh/resources/requirements.txt index 087acfc5..aa805d62 100644 --- a/tests/odh/resources/requirements.txt +++ 
b/tests/odh/resources/requirements.txt @@ -1,5 +1,3 @@ -{{.PipIndexUrl}} -{{.PipTrustedHost}} pytorch_lightning==1.9.5 ray_lightning torchmetrics==0.9.1 diff --git a/tests/odh/support.go b/tests/odh/support.go index d524c868..b84fce3c 100644 --- a/tests/odh/support.go +++ b/tests/odh/support.go @@ -18,15 +18,15 @@ package odh import ( "embed" + "net/http" + "net/url" - "github.com/onsi/gomega" . "github.com/onsi/gomega" + gomega "github.com/onsi/gomega" "github.com/project-codeflare/codeflare-common/support" . "github.com/project-codeflare/codeflare-common/support" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/api/core/v1" ) //go:embed resources/* @@ -39,17 +39,30 @@ func ReadFile(t support.Test, fileName string) []byte { return file } -// TODO: This belongs on codeflare-common/support/ray.go -func rayClusters(t Test, namespace *corev1.Namespace) func(g Gomega) []*rayv1.RayCluster { - return func(g Gomega) []*rayv1.RayCluster { - rcs, err := t.Client().Ray().RayV1().RayClusters(namespace.Name).List(t.Ctx(), metav1.ListOptions{}) - g.Expect(err).NotTo(HaveOccurred()) +func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *rayv1.RayCluster) *url.URL { + dashboardName := "ray-dashboard-" + rayCluster.Name + test.T().Logf("Raycluster created : %s\n", rayCluster.Name) + route := GetRoute(test, namespace.Name, dashboardName) + hostname := route.Status.Ingress[0].Host + dashboardUrl, _ := url.Parse("https://" + hostname) + test.T().Logf("Ray-dashboard route : %s\n", dashboardUrl.String()) + + return dashboardUrl +} + +func GetTestJobId(test Test, rayClient RayClusterClient, hostName string) string { + listJobsReq, err := http.NewRequest("GET", "https://"+hostName+"/api/jobs/", nil) + if err != nil { + test.T().Errorf("failed to do get request: %s\n", err) + } + listJobsReq.Header.Add("Authorization", "Bearer "+test.Config().BearerToken) - rcsp := []*rayv1.RayCluster{} - for _, v := range rcs.Items { - rcsp = append(rcsp, &v) - } + allJobsData, err := rayClient.GetJobs() + test.Expect(err).ToNot(HaveOccurred()) - return rcsp + jobID := (*allJobsData)[0].SubmissionID + if len(*allJobsData) > 0 { + test.T().Logf("Ray job has been successfully submitted to the raycluster with Submission-ID : %s\n", jobID) } + return jobID } From 872fa9c6ae8e55bd86f4249614780682408ee328 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Tue, 27 Aug 2024 18:16:13 +0530 Subject: [PATCH 2/3] Replaced 30 seconds time buffer with polling logic to wait until rayjob exists --- tests/odh/mnist_ray_test.go | 32 ++++++++++++++++------------ tests/odh/mnist_raytune_hpo_test.go | 33 +++++++++++++++++------------ tests/odh/support.go | 8 ------- 3 files changed, 39 insertions(+), 34 deletions(-) diff --git a/tests/odh/mnist_ray_test.go b/tests/odh/mnist_ray_test.go index e52a8fae..728b51e7 100644 --- a/tests/odh/mnist_ray_test.go +++ b/tests/odh/mnist_ray_test.go @@ -129,10 +129,9 @@ func mnistRay(t *testing.T, numGpus int) { ), ) - time.Sleep(30 * time.Second) - // Fetch created raycluster rayClusterName := "mnisttest" + // Wait until raycluster is up and running rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) test.Expect(err).ToNot(HaveOccurred()) @@ -140,39 +139,46 @@ func mnistRay(t *testing.T, numGpus int) { dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) rayClusterClientConfig := 
RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken) - if err != nil { - test.T().Errorf("%s", err) - } + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err)) + // wait until rayjob exists + test.Eventually(func() []RayJobDetailsResponse { + rayJobs, err := rayClient.GetJobs() + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err)) + return *rayJobs + }, TestTimeoutMedium, 2*time.Second).Should(HaveLen(1), "Ray job not found") + + // Get test job-id jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) - test.Expect(jobID).ToNot(Equal(nil)) + test.Expect(jobID).ToNot(BeEmpty()) // Wait for the job to be succeeded or failed var rayJobStatus string - fmt.Printf("Waiting for job to be Succeeded...\n") + test.T().Logf("Waiting for job to be Succeeded...\n") test.Eventually(func() string { resp, err := rayClient.GetJobDetails(jobID) - test.Expect(err).ToNot(HaveOccurred()) + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err)) rayJobStatusVal := resp.Status if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" { - fmt.Printf("JobStatus : %s\n", rayJobStatusVal) + test.T().Logf("JobStatus - %s\n", rayJobStatusVal) rayJobStatus = rayJobStatusVal return rayJobStatus } if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" { - fmt.Printf("JobStatus : %s...\n", rayJobStatusVal) + test.T().Logf("JobStatus - %s...\n", rayJobStatusVal) rayJobStatus = rayJobStatusVal } return rayJobStatus }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") - test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") - // Store job logs in output directory WriteRayJobAPILogs(test, rayClient, jobID) + // Assert ray-job status after job execution + test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") + // Make sure the RayCluster finishes and is deleted test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). 
- Should(HaveLen(0)) + Should(BeEmpty()) } func readMnistScriptTemplate(test Test, filePath string) []byte { diff --git a/tests/odh/mnist_raytune_hpo_test.go b/tests/odh/mnist_raytune_hpo_test.go index e1e32a8d..11d3e02e 100644 --- a/tests/odh/mnist_raytune_hpo_test.go +++ b/tests/odh/mnist_raytune_hpo_test.go @@ -129,10 +129,10 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { ContainElement(WithTransform(KueueWorkloadAdmitted, BeTrueBecause("Workload failed to be admitted"))), ), ) - time.Sleep(30 * time.Second) // Fetch created raycluster rayClusterName := "mnisthpotest" + // Wait until raycluster is up and running rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) test.Expect(err).ToNot(HaveOccurred()) @@ -140,37 +140,44 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken) - if err != nil { - test.T().Errorf("%s", err) - } + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err)) + + // Wait until the rayjob is created and running + test.Eventually(func() []RayJobDetailsResponse { + rayJobs, err := rayClient.GetJobs() + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err)) + return *rayJobs + }, TestTimeoutMedium, 2*time.Second).Should(HaveLen(1), "Ray job not found") + // Get rayjob-ID jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) - test.Expect(jobID).ToNot(Equal(nil)) + test.Expect(jobID).ToNot(BeEmpty()) - // Wait for the job to be succeeded or failed + // Wait for the job to either succeed or fail var rayJobStatus string - fmt.Printf("Waiting for job to be Succeeded...\n") + test.T().Logf("Waiting for job to be Succeeded...\n") test.Eventually(func() string { resp, err := rayClient.GetJobDetails(jobID) - test.Expect(err).ToNot(HaveOccurred()) + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err)) rayJobStatusVal := resp.Status if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" { - fmt.Printf("JobStatus : %s\n", rayJobStatusVal) + test.T().Logf("JobStatus - %s\n", rayJobStatusVal) rayJobStatus = rayJobStatusVal return rayJobStatus } if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" { - fmt.Printf("JobStatus : %s...\n", rayJobStatusVal) + test.T().Logf("JobStatus - %s...\n", rayJobStatusVal) rayJobStatus = rayJobStatusVal } return rayJobStatus }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") - test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") - // Store job logs in output directory WriteRayJobAPILogs(test, rayClient, jobID) + // Assert ray-job status after job execution + test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") + // Make sure the RayCluster finishes and is deleted test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). - Should(HaveLen(0)) + Should(BeEmpty()) } diff --git a/tests/odh/support.go b/tests/odh/support.go index b84fce3c..d73e4e91 100644 --- a/tests/odh/support.go +++ b/tests/odh/support.go @@ -18,7 +18,6 @@ package odh import ( "embed" - "net/http" "net/url" . 
"github.com/onsi/gomega" @@ -41,7 +40,6 @@ func ReadFile(t support.Test, fileName string) []byte { func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *rayv1.RayCluster) *url.URL { dashboardName := "ray-dashboard-" + rayCluster.Name - test.T().Logf("Raycluster created : %s\n", rayCluster.Name) route := GetRoute(test, namespace.Name, dashboardName) hostname := route.Status.Ingress[0].Host dashboardUrl, _ := url.Parse("https://" + hostname) @@ -51,12 +49,6 @@ func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *ray } func GetTestJobId(test Test, rayClient RayClusterClient, hostName string) string { - listJobsReq, err := http.NewRequest("GET", "https://"+hostName+"/api/jobs/", nil) - if err != nil { - test.T().Errorf("failed to do get request: %s\n", err) - } - listJobsReq.Header.Add("Authorization", "Bearer "+test.Config().BearerToken) - allJobsData, err := rayClient.GetJobs() test.Expect(err).ToNot(HaveOccurred()) From 42983499dfdfebacef4271cabf8082abf2d94e02 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Tue, 27 Aug 2024 19:06:17 +0530 Subject: [PATCH 3/3] Update the test raycluster configuration parameters with the latest SDK options and reduced polling logic time interval to 1 sec --- tests/odh/mnist_ray_test.go | 4 +- tests/odh/mnist_raytune_hpo_test.go | 4 +- tests/odh/resources/mnist_hpo_raytune.ipynb | 470 ++++++++++---------- tests/odh/resources/mnist_ray_mini.ipynb | 470 ++++++++++---------- 4 files changed, 474 insertions(+), 474 deletions(-) diff --git a/tests/odh/mnist_ray_test.go b/tests/odh/mnist_ray_test.go index 728b51e7..0d74b2a5 100644 --- a/tests/odh/mnist_ray_test.go +++ b/tests/odh/mnist_ray_test.go @@ -146,7 +146,7 @@ func mnistRay(t *testing.T, numGpus int) { rayJobs, err := rayClient.GetJobs() test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err)) return *rayJobs - }, TestTimeoutMedium, 2*time.Second).Should(HaveLen(1), "Ray job not found") + }, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found") // Get test job-id jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) @@ -169,7 +169,7 @@ func mnistRay(t *testing.T, numGpus int) { rayJobStatus = rayJobStatusVal } return rayJobStatus - }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") + }, TestTimeoutDouble, 1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") // Store job logs in output directory WriteRayJobAPILogs(test, rayClient, jobID) diff --git a/tests/odh/mnist_raytune_hpo_test.go b/tests/odh/mnist_raytune_hpo_test.go index 11d3e02e..dbe436d4 100644 --- a/tests/odh/mnist_raytune_hpo_test.go +++ b/tests/odh/mnist_raytune_hpo_test.go @@ -147,7 +147,7 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { rayJobs, err := rayClient.GetJobs() test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err)) return *rayJobs - }, TestTimeoutMedium, 2*time.Second).Should(HaveLen(1), "Ray job not found") + }, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found") // Get rayjob-ID jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) @@ -170,7 +170,7 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { rayJobStatus = rayJobStatusVal } return rayJobStatus - }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") + }, TestTimeoutDouble, 
1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") // Store job logs in output directory WriteRayJobAPILogs(test, rayClient, jobID) diff --git a/tests/odh/resources/mnist_hpo_raytune.ipynb b/tests/odh/resources/mnist_hpo_raytune.ipynb index 83c42563..6179d213 100644 --- a/tests/odh/resources/mnist_hpo_raytune.ipynb +++ b/tests/odh/resources/mnist_hpo_raytune.ipynb @@ -1,236 +1,236 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "import os\n", - "import yaml\n", - "\n", - "# Import pieces from codeflare-sdk\n", - "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n", - "from codeflare_sdk.job import RayJobClient\n", - "from time import sleep" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f6fb4c03", - "metadata": {}, - "outputs": [], - "source": [ - "%pip show codeflare-sdk" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "30888aed", - "metadata": { - "tags": [ - "parameters" - ] - }, - "outputs": [], - "source": [ - "#parameters\n", - "namespace = \"default\"\n", - "ray_image = \"has to be specified\"\n", - "openshift_api_url = \"has to be specified\"\n", - "kubernetes_user_bearer_token = \"has to be specified\"\n", - "num_gpus = \"has to be specified\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a0538160", - "metadata": {}, - "outputs": [], - "source": [ - "auth = TokenAuthentication(\n", - " token=kubernetes_user_bearer_token,\n", - " server=openshift_api_url,\n", - " skip_tls=True,\n", - ")\n", - "auth.login()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0f4bc870-091f-4e11-9642-cba145710159", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Create ray cluster\n", - "cluster = Cluster(\n", - " ClusterConfiguration(\n", - " namespace=namespace,\n", - " name='mnisthpotest',\n", - " head_cpus=1,\n", - " head_memory=4,\n", - " head_gpus=0,\n", - " num_workers=1,\n", - " min_cpus=1,\n", - " max_cpus=1,\n", - " min_memory=1,\n", - " max_memory=4,\n", - " num_gpus=int(num_gpus),\n", - " image=ray_image,\n", - " write_to_file=True,\n", - " verify_tls=False\n", - " )\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "04a5ea40", - "metadata": {}, - "outputs": [], - "source": [ - "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n", - "outfile = os.path.join(directory_path, \"mnisthpotest.yaml\")\n", - "cluster_yaml = None\n", - "with open(outfile) as f:\n", - " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", - "\n", - "# Add toleration for GPU nodes to Ray cluster worker pod\n", - "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", - "\n", - "with open(outfile, \"w\") as f:\n", - " yaml.dump(cluster_yaml, f, default_flow_style=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Bring up the cluster\n", - "cluster.up()\n", - "# Wait until status is updated\n", - "cluster.wait_ready()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "df71c1ed", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.status()" 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.details()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "47ca5c15", - "metadata": {}, - "outputs": [], - "source": [ - "ray_dashboard = cluster.cluster_dashboard_uri()\n", - "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", - "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n", - "\n", - "submission_id = client.submit_job(\n", - " entrypoint=\"python mnist_hpo.py\",\n", - " runtime_env={\n", - " \"env_vars\": {\n", - " \"PIP_INDEX_URL\":os.environ.get(\"PIP_INDEX_URL\"),\n", - " \"PIP_TRUSTED_HOST\":os.environ.get(\"PIP_TRUSTED_HOST\"),\n", - " },\n", - " \"working_dir\": \"/opt/app-root/notebooks/..data\",\n", - " \"pip\": \"/opt/app-root/notebooks/hpo_raytune_requirements.txt\",\n", - " },\n", - " # entrypoint_num_gpus is not required here as the mnist_hpo script executes in parallel and requires more GPUs for each iteration\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f63a178a", - "metadata": {}, - "outputs": [], - "source": [ - "finished = False\n", - "while not finished:\n", - " sleep(1)\n", - " status = client.get_job_status(submission_id)\n", - " finished = (status == \"SUCCEEDED\")\n", - "if finished:\n", - " print(\"Job completed Successfully !\")\n", - "else:\n", - " print(\"Job failed !\")\n", - "\n", - "sleep(10) # For avoiding race condition(raycluster gets deleted as soon as notebook execution completes after rayjob submission gets succeeded) and to assert rayjob success status before raycluster gets deleted during test excution\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6b099777", - "metadata": {}, - "outputs": [], - "source": [ - "cluster.down()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.18" - }, - "vscode": { - "interpreter": { - "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac" - } - } - }, - "nbformat": 4, - "nbformat_minor": 5 - } \ No newline at end of file + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import os\n", + "import yaml\n", + "\n", + "# Import pieces from codeflare-sdk\n", + "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n", + "from codeflare_sdk.job import RayJobClient\n", + "from time import sleep" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f6fb4c03", + "metadata": {}, + "outputs": [], + "source": [ + "%pip show codeflare-sdk" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30888aed", + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "#parameters\n", + "namespace = \"default\"\n", + "ray_image = \"has to be specified\"\n", + "openshift_api_url = \"has to be specified\"\n", + "kubernetes_user_bearer_token = \"has to be specified\"\n", + "num_gpus = \"has to be specified\"" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "id": "a0538160", + "metadata": {}, + "outputs": [], + "source": [ + "auth = TokenAuthentication(\n", + " token=kubernetes_user_bearer_token,\n", + " server=openshift_api_url,\n", + " skip_tls=True,\n", + ")\n", + "auth.login()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f4bc870-091f-4e11-9642-cba145710159", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create ray cluster\n", + "cluster = Cluster(\n", + " ClusterConfiguration(\n", + " namespace=namespace,\n", + " name='mnisthpotest',\n", + " head_cpus=1,\n", + " head_memory=4,\n", + " head_extended_resource_requests={'nvidia.com/gpu':0},\n", + " num_workers=1,\n", + " worker_cpu_requests=1,\n", + " worker_cpu_limits=1,\n", + " worker_memory_requests=1,\n", + " worker_memory_limits=4,\n", + " worker_extended_resource_requests={'nvidia.com/gpu':int(num_gpus)},\n", + " image=ray_image,\n", + " write_to_file=True,\n", + " verify_tls=False\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04a5ea40", + "metadata": {}, + "outputs": [], + "source": [ + "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n", + "outfile = os.path.join(directory_path, \"mnisthpotest.yaml\")\n", + "cluster_yaml = None\n", + "with open(outfile) as f:\n", + " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", + "\n", + "# Add toleration for GPU nodes to Ray cluster worker pod\n", + "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", + "\n", + "with open(outfile, \"w\") as f:\n", + " yaml.dump(cluster_yaml, f, default_flow_style=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Bring up the cluster\n", + "cluster.up()\n", + "# Wait until status is updated\n", + "cluster.wait_ready()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df71c1ed", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.status()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.details()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47ca5c15", + "metadata": {}, + "outputs": [], + "source": [ + "ray_dashboard = cluster.cluster_dashboard_uri()\n", + "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", + "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n", + "\n", + "submission_id = client.submit_job(\n", + " entrypoint=\"python mnist_hpo.py\",\n", + " runtime_env={\n", + " \"env_vars\": {\n", + " \"PIP_INDEX_URL\":os.environ.get(\"PIP_INDEX_URL\"),\n", + " \"PIP_TRUSTED_HOST\":os.environ.get(\"PIP_TRUSTED_HOST\"),\n", + " },\n", + " \"working_dir\": \"/opt/app-root/notebooks/..data\",\n", + " \"pip\": \"/opt/app-root/notebooks/hpo_raytune_requirements.txt\",\n", + " },\n", + " # entrypoint_num_gpus is not required here as the mnist_hpo script executes in parallel and requires more GPUs for each iteration\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f63a178a", + "metadata": {}, + "outputs": [], + "source": [ + "finished = False\n", + "while not finished:\n", + " sleep(1)\n", + " status = 
client.get_job_status(submission_id)\n",
+ "    finished = (status == \"SUCCEEDED\")\n",
+ "if finished:\n",
+ "    print(\"Job completed successfully!\")\n",
+ "else:\n",
+ "    print(\"Job failed!\")\n",
+ "\n",
+ "sleep(10) # Avoid a race condition (the RayCluster is deleted as soon as notebook execution completes after the RayJob submission succeeds) and allow the RayJob success status to be asserted before the RayCluster is deleted during test execution"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "6b099777",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "cluster.down()"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.18"
+ },
+ "vscode": {
+ "interpreter": {
+ "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac"
+ }
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/tests/odh/resources/mnist_ray_mini.ipynb b/tests/odh/resources/mnist_ray_mini.ipynb
index 61f6f24c..fbabd227 100644
--- a/tests/odh/resources/mnist_ray_mini.ipynb
+++ b/tests/odh/resources/mnist_ray_mini.ipynb
@@ -1,236 +1,236 @@
 {
- "cells": [
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a",
- "metadata": {
- "tags": []
- },
- "outputs": [],
- "source": [
- "import os\n",
- "import yaml\n",
- "\n",
- "# Import pieces from codeflare-sdk\n",
- "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n",
- "from codeflare_sdk.job import RayJobClient\n",
- "from time import sleep"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "f6fb4c03",
- "metadata": {},
- "outputs": [],
- "source": [
- "%pip show codeflare-sdk"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "30888aed",
- "metadata": {
- "tags": [
- "parameters"
- ]
- },
- "outputs": [],
- "source": [
- "#parameters\n",
- "namespace = \"default\"\n",
- "ray_image = \"has to be specified\"\n",
- "openshift_api_url = \"has to be specified\"\n",
- "kubernetes_user_bearer_token = \"has to be specified\"\n",
- "num_gpus = \"has to be specified\""
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a0538160",
- "metadata": {},
- "outputs": [],
- "source": [
- "auth = TokenAuthentication(\n",
- "    token=kubernetes_user_bearer_token,\n",
- "    server=openshift_api_url,\n",
- "    skip_tls=True,\n",
- ")\n",
- "auth.login()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "0f4bc870-091f-4e11-9642-cba145710159",
- "metadata": {
- "tags": []
- },
- "outputs": [],
- "source": [
- "# Create our cluster and submit appwrapper\n",
- "cluster = Cluster(\n",
- "    ClusterConfiguration(\n",
- "        namespace=namespace,\n",
- "        name='mnisttest',\n",
- "        head_cpus=1,\n",
- "        head_memory=4,\n",
- "        head_gpus=0,\n",
- "        num_workers=1,\n",
- "        min_cpus=1,\n",
- "        max_cpus=1,\n",
- "        min_memory=1,\n",
- "        max_memory=4,\n",
- "        num_gpus=int(num_gpus),\n",
- "        image=ray_image,\n",
- "        write_to_file=True,\n",
- "        verify_tls=False\n",
- "    )\n",
- ")"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "04a5ea40",
- "metadata": {},
- "outputs": [],
- "source": [
- "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n",
- "outfile = 
os.path.join(directory_path, \"mnisttest.yaml\")\n", - "cluster_yaml = None\n", - "with open(outfile) as f:\n", - " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", - "\n", - "# Add toleration for GPU nodes to Ray cluster worker pod\n", - "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", - "\n", - "with open(outfile, \"w\") as f:\n", - " yaml.dump(cluster_yaml, f, default_flow_style=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "# Bring up the cluster\n", - "cluster.up()\n", - "# Wait until status is updated\n", - "cluster.wait_ready()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "df71c1ed", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.status()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", - "metadata": { - "tags": [] - }, - "outputs": [], - "source": [ - "cluster.details()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "47ca5c15", - "metadata": {}, - "outputs": [], - "source": [ - "ray_dashboard = cluster.cluster_dashboard_uri()\n", - "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", - "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n", - "\n", - "submission_id = client.submit_job(\n", - " entrypoint=\"python mnist.py\",\n", - " runtime_env={\n", - " \"env_vars\": {\n", - " \"PIP_INDEX_URL\":os.environ.get(\"PIP_INDEX_URL\"),\n", - " \"PIP_TRUSTED_HOST\":os.environ.get(\"PIP_TRUSTED_HOST\"),\n", - " },\n", - " \"working_dir\": \"/opt/app-root/notebooks/..data\",\n", - " \"pip\": \"/opt/app-root/notebooks/requirements.txt\",\n", - " },\n", - " entrypoint_num_gpus=num_gpus\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f63a178a", - "metadata": {}, - "outputs": [], - "source": [ - "finished = False\n", - "while not finished:\n", - " sleep(1)\n", - " status = client.get_job_status(submission_id)\n", - " finished = (status == \"SUCCEEDED\")\n", - "if finished:\n", - " print(\"Job completed Successfully !\")\n", - "else:\n", - " print(\"Job failed !\")\n", - "\n", - "sleep(10) # For avoiding race condition(raycluster gets deleted as soon as notebook execution completes after rayjob submission gets succeeded) and to assert rayjob success status before raycluster gets deleted during test excution\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6b099777", - "metadata": {}, - "outputs": [], - "source": [ - "cluster.down()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.18" - }, - "vscode": { - "interpreter": { - "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac" - } - } - }, - "nbformat": 4, - "nbformat_minor": 5 - } \ No newline at end of file + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "b55bc3ea-4ce3-49bf-bb1f-e209de8ca47a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ 
+ "import os\n", + "import yaml\n", + "\n", + "# Import pieces from codeflare-sdk\n", + "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n", + "from codeflare_sdk.job import RayJobClient\n", + "from time import sleep" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f6fb4c03", + "metadata": {}, + "outputs": [], + "source": [ + "%pip show codeflare-sdk" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30888aed", + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "#parameters\n", + "namespace = \"default\"\n", + "ray_image = \"has to be specified\"\n", + "openshift_api_url = \"has to be specified\"\n", + "kubernetes_user_bearer_token = \"has to be specified\"\n", + "num_gpus = \"has to be specified\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a0538160", + "metadata": {}, + "outputs": [], + "source": [ + "auth = TokenAuthentication(\n", + " token=kubernetes_user_bearer_token,\n", + " server=openshift_api_url,\n", + " skip_tls=True,\n", + ")\n", + "auth.login()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f4bc870-091f-4e11-9642-cba145710159", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Create our cluster and submit appwrapper\n", + "cluster = Cluster(\n", + " ClusterConfiguration(\n", + " namespace=namespace,\n", + " name='mnisttest',\n", + " head_cpus=1,\n", + " head_memory=4,\n", + " head_extended_resource_requests={'nvidia.com/gpu':0},\n", + " num_workers=1,\n", + " worker_cpu_requests=1,\n", + " worker_cpu_limits=1,\n", + " worker_memory_requests=1,\n", + " worker_memory_limits=4,\n", + " worker_extended_resource_requests={'nvidia.com/gpu':int(num_gpus)},\n", + " image=ray_image,\n", + " write_to_file=True,\n", + " verify_tls=False\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04a5ea40", + "metadata": {}, + "outputs": [], + "source": [ + "directory_path = os.path.expanduser(\"~/.codeflare/resources/\")\n", + "outfile = os.path.join(directory_path, \"mnisttest.yaml\")\n", + "cluster_yaml = None\n", + "with open(outfile) as f:\n", + " cluster_yaml = yaml.load(f, yaml.FullLoader)\n", + "\n", + "# Add toleration for GPU nodes to Ray cluster worker pod\n", + "cluster_yaml[\"spec\"][\"workerGroupSpecs\"][0][\"template\"][\"spec\"][\"tolerations\"]=[{\"key\": \"nvidia.com/gpu\", \"value\": \"NONE\", \"effect\": \"NoSchedule\"}]\n", + "\n", + "with open(outfile, \"w\") as f:\n", + " yaml.dump(cluster_yaml, f, default_flow_style=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f0884bbc-c224-4ca0-98a0-02dfa09c2200", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Bring up the cluster\n", + "cluster.up()\n", + "# Wait until status is updated\n", + "cluster.wait_ready()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "df71c1ed", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.status()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7fd45bc5-03c0-4ae5-9ec5-dd1c30f1a084", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "cluster.details()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47ca5c15", + "metadata": {}, + "outputs": [], + "source": [ + "ray_dashboard = cluster.cluster_dashboard_uri()\n", + "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n", + "client = 
RayJobClient(address=ray_dashboard, headers=header, verify=False)\n",
+ "\n",
+ "submission_id = client.submit_job(\n",
+ "    entrypoint=\"python mnist.py\",\n",
+ "    runtime_env={\n",
+ "        \"env_vars\": {\n",
+ "            \"PIP_INDEX_URL\":os.environ.get(\"PIP_INDEX_URL\"),\n",
+ "            \"PIP_TRUSTED_HOST\":os.environ.get(\"PIP_TRUSTED_HOST\"),\n",
+ "        },\n",
+ "        \"working_dir\": \"/opt/app-root/notebooks/..data\",\n",
+ "        \"pip\": \"/opt/app-root/notebooks/requirements.txt\",\n",
+ "    },\n",
+ "    entrypoint_num_gpus=num_gpus\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "f63a178a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "finished = False\n",
+ "while not finished:\n",
+ "    sleep(1)\n",
+ "    status = client.get_job_status(submission_id)\n",
+ "    finished = (status == \"SUCCEEDED\")\n",
+ "if finished:\n",
+ "    print(\"Job completed successfully!\")\n",
+ "else:\n",
+ "    print(\"Job failed!\")\n",
+ "\n",
+ "sleep(10) # Avoid a race condition (the RayCluster is deleted as soon as notebook execution completes after the RayJob submission succeeds) and allow the RayJob success status to be asserted before the RayCluster is deleted during test execution"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "6b099777",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "cluster.down()"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.18"
+ },
+ "vscode": {
+ "interpreter": {
+ "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac"
+ }
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}