From 030e22d80c66f45ded91cf394b2576bdcf5754b5 Mon Sep 17 00:00:00 2001
From: Giorgi Kikolashvili
Date: Mon, 17 Feb 2025 13:42:38 +0100
Subject: [PATCH 1/3] Add logic and tests for ListJobs

---
 databricks/sdk/mixins/jobs.py |  58 +++++-
 tests/test_jobs_mixin.py      | 354 +++++++++++++++++++++++++++++++++-
 2 files changed, 407 insertions(+), 5 deletions(-)

diff --git a/databricks/sdk/mixins/jobs.py b/databricks/sdk/mixins/jobs.py
index d5e2a1728..6dd06a13d 100644
--- a/databricks/sdk/mixins/jobs.py
+++ b/databricks/sdk/mixins/jobs.py
@@ -1,11 +1,65 @@
-from typing import Optional
+from typing import Iterator, Optional
 
 from databricks.sdk.service import jobs
-from databricks.sdk.service.jobs import Job
+from databricks.sdk.service.jobs import BaseJob, Job
 
 
 class JobsExt(jobs.JobsAPI):
 
+    def list(self,
+             *,
+             expand_tasks: Optional[bool] = None,
+             limit: Optional[int] = None,
+             name: Optional[str] = None,
+             offset: Optional[int] = None,
+             page_token: Optional[str] = None) -> Iterator[BaseJob]:
+        """List jobs.
+
+        Retrieves a list of jobs. If a job has multiple pages of tasks, job_clusters, parameters, or
+        environments, this method paginates through all pages and aggregates the results.
+
+        :param expand_tasks: bool (optional)
+          Whether to include task and cluster details in the response. Note that in API 2.2, only the first
+          100 elements will be shown. Use :method:jobs/get to paginate through all tasks and clusters.
+        :param limit: int (optional)
+          The number of jobs to return. This value must be greater than 0 and less than or equal to 100. The
+          default value is 20.
+        :param name: str (optional)
+          A filter on the list based on the exact (case insensitive) job name.
+        :param offset: int (optional)
+          The offset of the first job to return, relative to the most recently created job. Deprecated since
+          June 2023. Use `page_token` to iterate through the pages instead.
+        :param page_token: str (optional)
+          Use `next_page_token` or `prev_page_token` returned from the previous request to list the next or
+          previous page of jobs respectively.
+
+        :returns: Iterator over :class:`BaseJob`
+        """
+        # fetch jobs whose top-level arrays may be truncated to the first page
+        jobs_list = super().list(expand_tasks=expand_tasks,
+                                 limit=limit,
+                                 name=name,
+                                 offset=offset,
+                                 page_token=page_token)
+        if not expand_tasks:
+            yield from jobs_list
+            return
+
+        # fully fetch all top-level arrays for each job in the list
+        for job in jobs_list:
+            if job.has_more:
+                job_from_get_call = self.get(job.job_id)
+                job.settings.tasks = job_from_get_call.settings.tasks
+                job.settings.job_clusters = job_from_get_call.settings.job_clusters
+                job.settings.parameters = job_from_get_call.settings.parameters
+                job.settings.environments = job_from_get_call.settings.environments
+            # Remove the has_more field from each job in the list. In Jobs API 2.2 it indicates
+            # that the job has more than 100 tasks or job_clusters, i.e. that further pages exist.
+            # Since this method aggregates all pages before yielding, the field is of no use to the caller.
+            if hasattr(job, 'has_more'):
+                delattr(job, 'has_more')
+            yield job
+
     def get_run(self,
                 run_id: int,
                 *,
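With this mixin in place, callers of `w.jobs.list` no longer have to follow the `has_more` marker themselves: when `expand_tasks=True`, every yielded job already carries its complete `tasks`, `job_clusters`, `parameters`, and `environments` arrays. A minimal usage sketch (assuming an already-configured workspace; the loop body is purely illustrative):

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()
    # Jobs are yielded lazily; a job with more than 100 tasks transparently
    # triggers extra paginated jobs/get calls before it is yielded.
    for job in w.jobs.list(expand_tasks=True):
        print(job.job_id, len(job.settings.tasks or []))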
diff --git a/tests/test_jobs_mixin.py b/tests/test_jobs_mixin.py
index 2c39d41d9..ebe9f9fa3 100644
--- a/tests/test_jobs_mixin.py
+++ b/tests/test_jobs_mixin.py
@@ -1,6 +1,6 @@
 import json
 import re
-from typing import Pattern
+from typing import Optional, Pattern
 
 from databricks.sdk import WorkspaceClient
 
@@ -10,10 +10,18 @@ def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
         rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
     )
 
+def make_getjob_path_pattern(job_id: int, page_token: Optional[str] = None) -> Pattern[str]:
+    if page_token:
+        return re.compile(
+            rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/get?job_id={job_id}&page_token={page_token}")}'
+        )
+    else:
+        return re.compile(
+            rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/get?job_id={job_id}")}')
 
-def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
+def make_listjobs_path_pattern(page_token: str) -> Pattern[str]:
     return re.compile(
-        rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/get?job_id={job_id}&page_token={page_token}")}'
+        rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/list")}\?(?:expand_tasks=(?:true|false)&)?page_token={re.escape(page_token)}'
     )
 
@@ -261,3 +269,343 @@ def test_get_job_pagination_with_tasks(config, requests_mock):
         }]
     }
 }
+
+def test_list_jobs_without_task_expansion(config, requests_mock):
+    listjobs_page1 = {
+        "jobs": [{
+            "job_id": 100,
+            "settings": {
+                "name": "job100",
+            },
+        }, {
+            "job_id": 200,
+            "settings": {
+                "name": "job200",
+            }
+        }, {
+            "job_id": 300,
+            "settings": {
+                "name": "job300",
+            }
+        }],
+        "next_page_token":
+        "tokenToSecondPage"
+    }
+    listjobs_page2 = {
+        "jobs": [{
+            "job_id": 400,
+            "settings": {
+                "name": "job400",
+            }
+        }, {
+            "job_id": 500,
+            "settings": {
+                "name": "job500",
+            }
+        }]
+    }
+
+    requests_mock.get(make_listjobs_path_pattern("initialToken"), text=json.dumps(listjobs_page1))
+    requests_mock.get(make_listjobs_path_pattern("tokenToSecondPage"), text=json.dumps(listjobs_page2))
+    w = WorkspaceClient(config=config)
+
+    # Convert the iterator to a list in order to compare the results
+    jobs_list = list(w.jobs.list(expand_tasks=False, page_token="initialToken"))
+    jobs_dict = [job.as_dict() for job in jobs_list]
+
+    assert jobs_dict == [{
+        "job_id": 100,
+        "settings": {
+            "name": "job100",
+        }
+    }, {
+        "job_id": 200,
+        "settings": {
+            "name": "job200",
+        }
+    }, {
+        "job_id": 300,
+        "settings": {
+            "name": "job300",
+        }
+    }, {
+        "job_id": 400,
+        "settings": {
+            "name": "job400",
+        }
+    }, {
+        "job_id": 500,
+        "settings": {
+            "name": "job500",
+        }
+    }]
+
+    # Only two requests should be made, both of them jobs/list calls.
+    assert requests_mock.call_count == 2
+
+def test_list_jobs_with_many_tasks(config, requests_mock):
+    from databricks.sdk.service import compute, jobs
+    cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12",
+                                       custom_tags={"ResourceClass": "SingleNode"},
+                                       num_workers=0,
+                                       node_type_id="Standard_DS3_v2",
+                                       )
+    cluster1 = jobs.JobCluster(job_cluster_key="cluster1", new_cluster=cluster_spec)
+    cluster2 = jobs.JobCluster(job_cluster_key="cluster2", new_cluster=cluster_spec)
+    cluster3 = jobs.JobCluster(job_cluster_key="cluster3", new_cluster=cluster_spec)
+    cluster4 = jobs.JobCluster(job_cluster_key="cluster4", new_cluster=cluster_spec)
+    listjobs_page1 = {
+        "jobs": [{
"job_id": 100, + "settings": { + "tasks": [{ + "task_key": "taskkey105" + }, { + "task_key": "taskkey103" + }], + "job_clusters": [cluster1.as_dict(), cluster2.as_dict()], + "parameters": [{ + "name": "param1", + "default": "default1" + }], + "environments": [{ + "environment_key": "key1" + }, { + "environment_key": "key2" + }] + }, + "has_more": True + }, { + "job_id": 200, + "settings": { + "tasks": [{ + "task_key": "taskkey201" + }, { + "task_key": "taskkey202" + }] + }, + "has_more": True + }, { + "job_id": 300, + "settings": { + "tasks": [{ + "task_key": "taskkey301" + }] + } + }], + "next_page_token": "tokenToSecondPage" + } + listjobs_page2 = { + "jobs": [{ + "job_id": 400, + "settings": { + "tasks": [{ + "task_key": "taskkey401" + }, { + "task_key": "taskkey402" + }], + "job_clusters": [cluster1.as_dict()], + }, + "has_more": True + }] + } + + getjob_100_page1 = { + "job_id": 100, + "settings": { + "tasks": [{ + "task_key": "taskkey101" + }, { + "task_key": "taskkey102" + }], + "job_clusters": [cluster1.as_dict(), cluster2.as_dict()], + "parameters": [{ + "name": "param1", + "default": "default1" + }], + "environments": [{ + "environment_key": "key1" + }, { + "environment_key": "key2" + }] + }, + "next_page_token": "tokenToSecondPage_100" + } + getjob_100_page2 = { + "job_id": 100, + "settings": { + "tasks": [{ + "task_key": "taskkey103" + }, { + "task_key": "taskkey104" + }], + "job_clusters": [cluster3.as_dict(), cluster4.as_dict()], + "parameters": [{ + "name": "param2", + "default": "default2" + }], + "environments": [{ + "environment_key": "key3" + }, { + "environment_key": "key4" + }] + }, + "next_page_token": "tokenToThirdPage_100" + } + getjob_100_page3 = { + "job_id": 100, + "settings": { + "tasks": [{ + "task_key": "taskkey105" + }], + "environments": [{ + "environment_key": "key5" + }] + } + } + + getjob_200_page1 = { + "job_id": 200, + "settings": { + "tasks": [{ + "task_key": "taskkey201" + }, { + "task_key": "taskkey202" + }] + }, + "next_page_token": "tokenToSecondPage_200" + } + getjob_200_page2 = { + "job_id": 200, + "settings": { + "tasks": [{ + "task_key": "taskkey203" + }, { + "task_key": "taskkey204" + }] + } + } + getjob_300_page1 = {"job_id": 300, "settings": {"tasks": [{"task_key": "taskkey301"}]}} + getjob_400_page1 = { + "job_id": 400, + "settings": { + "tasks": [{ + "task_key": "taskkey401" + }, { + "task_key": + "taskkey403" # jobs/get returns tasks in different order. 
jobs/get order is the ground truth
+            }],
+            "job_clusters": [cluster1.as_dict()]
+        },
+        "next_page_token": "tokenToSecondPage_400"
+    }
+    getjob_400_page2 = {
+        "job_id": 400,
+        "settings": {
+            "tasks": [{
+                "task_key": "taskkey402"
+            }],
+            "job_clusters": [cluster2.as_dict()]
+        }
+    }
+
+    requests_mock.get(make_listjobs_path_pattern("initialToken"), text=json.dumps(listjobs_page1))
+    requests_mock.get(make_listjobs_path_pattern("tokenToSecondPage"), text=json.dumps(listjobs_page2))
+
+    requests_mock.get(make_getjob_path_pattern(100), text=json.dumps(getjob_100_page1))
+    requests_mock.get(make_getjob_path_pattern(100, "tokenToSecondPage_100"),
+                      text=json.dumps(getjob_100_page2))
+    requests_mock.get(make_getjob_path_pattern(100, "tokenToThirdPage_100"),
+                      text=json.dumps(getjob_100_page3))
+
+    requests_mock.get(make_getjob_path_pattern(200), text=json.dumps(getjob_200_page1))
+    requests_mock.get(make_getjob_path_pattern(200, "tokenToSecondPage_200"),
+                      text=json.dumps(getjob_200_page2))
+
+    requests_mock.get(make_getjob_path_pattern(300), text=json.dumps(getjob_300_page1))
+
+    requests_mock.get(make_getjob_path_pattern(400), text=json.dumps(getjob_400_page1))
+    requests_mock.get(make_getjob_path_pattern(400, "tokenToSecondPage_400"),
+                      text=json.dumps(getjob_400_page2))
+    w = WorkspaceClient(config=config)
+
+    # Convert the iterator to a list in order to compare the results
+    jobs_list = list(w.jobs.list(expand_tasks=True, page_token="initialToken"))
+    jobs_dict = [job.as_dict() for job in jobs_list]
+
+    assert jobs_dict == [{
+        "job_id": 100,
+        "settings": {
+            "tasks": [{
+                "task_key": "taskkey101"
+            }, {
+                "task_key": "taskkey102"
+            }, {
+                "task_key": "taskkey103"
+            }, {
+                "task_key": "taskkey104"
+            }, {
+                "task_key": "taskkey105"
+            }],
+            "job_clusters": [cluster1.as_dict(),
+                             cluster2.as_dict(),
+                             cluster3.as_dict(),
+                             cluster4.as_dict()],
+            "parameters": [{
+                "name": "param1",
+                "default": "default1"
+            }, {
+                "name": "param2",
+                "default": "default2"
+            }],
+            "environments": [{
+                "environment_key": "key1"
+            }, {
+                "environment_key": "key2"
+            }, {
+                "environment_key": "key3"
+            }, {
+                "environment_key": "key4"
+            }, {
+                "environment_key": "key5"
+            }]
+        }
+    }, {
+        "job_id": 200,
+        "settings": {
+            "tasks": [{
+                "task_key": "taskkey201"
+            }, {
+                "task_key": "taskkey202"
+            }, {
+                "task_key": "taskkey203"
+            }, {
+                "task_key": "taskkey204"
+            }]
+        }
+    }, {
+        "job_id": 300,
+        "settings": {
+            "tasks": [{
+                "task_key": "taskkey301"
+            }]
+        }
+    }, {
+        "job_id": 400,
+        "settings": {
+            "tasks": [{
+                "task_key": "taskkey401"
+            }, {
+                "task_key": "taskkey403"
+            }, {
+                "task_key": "taskkey402"
+            }],
+            "job_clusters": [cluster1.as_dict(), cluster2.as_dict()]
+        }
+    }]
+
+    # Nine requests in total: two jobs/list calls plus seven paginated jobs/get calls.
+    assert requests_mock.call_count == 9
+    # Job 300 reports no has_more, so it must never be fetched via jobs/get.
+    history = requests_mock.request_history
+    assert all('300' not in request.qs.get("job_id", ['']) for request in history)
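A note on the aggregation step above: the four per-field assignments in `JobsExt.list` could equally be driven by a single loop over the field names. A sketch of that hypothetical refactor (not part of this patch; `_merge_paginated_fields` and `_PAGINATED_FIELDS` are illustrative names):

    _PAGINATED_FIELDS = ("tasks", "job_clusters", "parameters", "environments")

    def _merge_paginated_fields(job, full_job):
        # Copy each fully paginated top-level array from the jobs/get result
        # onto the partially populated job returned by jobs/list.
        for field in _PAGINATED_FIELDS:
            setattr(job.settings, field, getattr(full_job.settings, field))

Keeping the explicit assignments, as the patch does, trades a little repetition for grep-ability, so either choice is reasonable.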
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}' ) + def make_getjob_path_pattern(job_id: int, page_token: Optional[str] = None) -> Pattern[str]: if page_token: return re.compile( @@ -19,6 +20,7 @@ def make_getjob_path_pattern(job_id: int, page_token: Optional[str] = None) -> P return re.compile( rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/get?job_id={job_id}")}') + def make_listjobs_path_pattern(page_token: str) -> Pattern[str]: return re.compile( rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/list")}\?(?:expand_tasks=(?:true|false)&)?page_token={re.escape(page_token)}' @@ -270,6 +272,7 @@ def test_get_job_pagination_with_tasks(config, requests_mock): } } + def test_list_jobs_without_task_expansion(config, requests_mock): listjobs_page1 = { "jobs": [{ @@ -343,6 +346,7 @@ def test_list_jobs_without_task_expansion(config, requests_mock): # only two requests should be made which are jobs/list requests assert requests_mock.call_count == 2 + def test_list_jobs_with_many_tasks(config, requests_mock): from databricks.sdk.service import compute, jobs cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12", @@ -393,7 +397,8 @@ def test_list_jobs_with_many_tasks(config, requests_mock): }] } }], - "next_page_token": "tokenToSecondPage" + "next_page_token": + "tokenToSecondPage" } listjobs_page2 = { "jobs": [{ From a18a76af5e4dfafd978fdaae7f20ab2ef8b2cba5 Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Thu, 20 Feb 2025 17:05:50 +0100 Subject: [PATCH 3/3] Add changelog --- NEXT_CHANGELOG.md | 1 + tests/test_jobs_mixin.py | 15 +++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 44956e0ce..97692cd9e 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -9,6 +9,7 @@ ### Documentation ### Internal Changes +* Update Jobs ListJobs API to support paginated responses ([#896](https://github.com/databricks/databricks-sdk-py/pull/896)) * Introduce automated tagging ([#888](https://github.com/databricks/databricks-sdk-py/pull/888)) * Update Jobs GetJob API to support paginated responses ([#869](https://github.com/databricks/databricks-sdk-py/pull/869)). diff --git a/tests/test_jobs_mixin.py b/tests/test_jobs_mixin.py index 2c3c300f1..9f6a5feb7 100644 --- a/tests/test_jobs_mixin.py +++ b/tests/test_jobs_mixin.py @@ -494,12 +494,15 @@ def test_list_jobs_with_many_tasks(config, requests_mock): getjob_400_page1 = { "job_id": 400, "settings": { - "tasks": [{ - "task_key": "taskkey401" - }, { - "task_key": - "taskkey403" # jobs/get returns tasks in different order. jobs/get order is the ground truth - }], + "tasks": [ + { + "task_key": "taskkey401" + }, + { + "task_key": + "taskkey403" # jobs/get returns tasks in different order. jobs/get order is the ground truth + } + ], "job_clusters": [cluster1.as_dict()] }, "next_page_token": "tokenToSecondPage_400"