From 8af2190048ddf6da794922d9228be98e75440193 Mon Sep 17 00:00:00 2001
From: Giorgi Kikolashvili
Date: Mon, 17 Feb 2025 14:57:21 +0100
Subject: [PATCH 1/3] Add logic and tests for ListRuns

---
 databricks/sdk/mixins/jobs.py |  84 +++++++++++++
 tests/test_jobs_mixin.py      | 207 +++++++++++++++++++++++++++++++++
 2 files changed, 284 insertions(+), 7 deletions(-)

diff --git a/databricks/sdk/mixins/jobs.py b/databricks/sdk/mixins/jobs.py
index d5e2a1728..07b6baf6c 100644
--- a/databricks/sdk/mixins/jobs.py
+++ b/databricks/sdk/mixins/jobs.py
@@ -1,11 +1,91 @@
-from typing import Optional
+from typing import Iterator, Optional
 
 from databricks.sdk.service import jobs
-from databricks.sdk.service.jobs import Job
+from databricks.sdk.service.jobs import BaseRun, Job, RunType
 
 
 class JobsExt(jobs.JobsAPI):
 
+    def list_runs(self,
+                  *,
+                  active_only: Optional[bool] = None,
+                  completed_only: Optional[bool] = None,
+                  expand_tasks: Optional[bool] = None,
+                  job_id: Optional[int] = None,
+                  limit: Optional[int] = None,
+                  offset: Optional[int] = None,
+                  page_token: Optional[str] = None,
+                  run_type: Optional[RunType] = None,
+                  start_time_from: Optional[int] = None,
+                  start_time_to: Optional[int] = None) -> Iterator[BaseRun]:
+        """List job runs.
+
+        List runs in descending order by start time. If a run has multiple pages of tasks, job_clusters,
+        job_parameters or repair history, this method paginates through all pages and aggregates the
+        results.
+
+        :param active_only: bool (optional)
+          If active_only is `true`, only active runs are included in the results; otherwise, lists both
+          active and completed runs. An active run is a run in the `QUEUED`, `PENDING`, `RUNNING`, or
+          `TERMINATING` state. This field cannot be `true` when completed_only is `true`.
+        :param completed_only: bool (optional)
+          If completed_only is `true`, only completed runs are included in the results; otherwise, lists
+          both active and completed runs. This field cannot be `true` when active_only is `true`.
+        :param expand_tasks: bool (optional)
+          Whether to include task and cluster details in the response. Note that in API 2.2, only the
+          first 100 elements will be shown. Use :method:jobs/getRun to paginate through all tasks and
+          clusters.
+        :param job_id: int (optional)
+          The job for which to list runs. If omitted, the Jobs service lists runs from all jobs.
+        :param limit: int (optional)
+          The number of runs to return. This value must be greater than 0 and less than 25. The default
+          value is 20. If a request specifies a limit of 0, the service instead uses the maximum limit.
+        :param offset: int (optional)
+          The offset of the first run to return, relative to the most recent run. Deprecated since June
+          2023. Use `page_token` to iterate through the pages instead.
+        :param page_token: str (optional)
+          Use `next_page_token` or `prev_page_token` returned from the previous request to list the next
+          or previous page of runs respectively.
+        :param run_type: :class:`RunType` (optional)
+          The type of runs to return. For a description of run types, see :method:jobs/getRun.
+        :param start_time_from: int (optional)
+          Show runs that started _at or after_ this value. The value must be a UTC timestamp in
+          milliseconds. Can be combined with _start_time_to_ to filter by a time range.
+        :param start_time_to: int (optional)
+          Show runs that started _at or before_ this value. The value must be a UTC timestamp in
+          milliseconds. Can be combined with _start_time_from_ to filter by a time range.
+
+        :returns: Iterator over :class:`BaseRun`
+        """
+        # Fetch runs whose top-level arrays may be truncated to the first page.
+        runs_list = super().list_runs(active_only=active_only,
+                                      completed_only=completed_only,
+                                      expand_tasks=expand_tasks,
+                                      job_id=job_id,
+                                      limit=limit,
+                                      offset=offset,
+                                      page_token=page_token,
+                                      run_type=run_type,
+                                      start_time_from=start_time_from,
+                                      start_time_to=start_time_to)
+
+        if not expand_tasks:
+            yield from runs_list
+            return
+
+        # Fully fetch all top-level arrays for each run in the list.
+        for run in runs_list:
+            if run.has_more:
+                run_from_get_call = self.get_run(run.run_id)
+                run.tasks = run_from_get_call.tasks
+                run.job_clusters = run_from_get_call.job_clusters
+                run.job_parameters = run_from_get_call.job_parameters
+                run.repair_history = run_from_get_call.repair_history
+            # Remove the has_more field from each run. In Jobs API 2.2 it indicates whether a run has
+            # more than 100 tasks or job_clusters; since this method hides pagination from the caller,
+            # the field carries no useful information here.
+            if hasattr(run, 'has_more'):
+                delattr(run, 'has_more')
+            yield run
+
     def get_run(self,
                 run_id: int,
                 *,
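A minimal usage sketch of the new method (illustrative only; the job ID and the workspace auth configuration are assumed):

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()
    # With expand_tasks=True, the mixin transparently issues jobs/runs/get calls
    # for any run whose has_more flag is set, so tasks, job_clusters,
    # job_parameters and repair_history arrive fully aggregated and no page
    # tokens need to be handled by the caller.
    for run in w.jobs.list_runs(job_id=1337, expand_tasks=True):
        print(run.run_id, len(run.tasks or []))
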
diff --git a/tests/test_jobs_mixin.py b/tests/test_jobs_mixin.py
index 2c39d41d9..c29b08b3b 100644
--- a/tests/test_jobs_mixin.py
+++ b/tests/test_jobs_mixin.py
@@ -1,14 +1,18 @@
 import json
 import re
-from typing import Pattern
+from typing import Optional, Pattern
 
 from databricks.sdk import WorkspaceClient
 
 
-def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
-    return re.compile(
-        rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
-    )
+def make_getrun_path_pattern(run_id: int, page_token: Optional[str] = None) -> Pattern[str]:
+    if page_token:
+        return re.compile(
+            rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
+        )
+    else:
+        return re.compile(
+            rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?run_id={run_id}")}')
 
 
 def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
@@ -17,6 +21,12 @@ def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
     )
 
 
+def make_listruns_path_pattern(page_token: str) -> Pattern[str]:
+    return re.compile(
+        rf'{re.escape("http://localhost/api/")}2.\d{re.escape("/jobs/runs/list")}\?(?:expand_tasks=(?:true|false)&)?page_token={re.escape(page_token)}'
+    )
+
+
 def test_get_run_with_no_pagination(config, requests_mock):
     run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
     requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
@@ -261,3 +271,190 @@ def test_get_job_pagination_with_tasks(config, requests_mock):
         }]
     }
 }
+
+
+def test_list_runs_without_task_expansion(config, requests_mock):
+    listruns_page1 = {
+        "runs": [{
+            "run_id": 100,
+            "run_name": "run100",
+        }, {
+            "run_id": 200,
+            "run_name": "run200",
+            "job_parameters": [{
+                "name": "param1",
+                "default": "default1"
+            }, {
+                "name": "param2",
+                "default": "default2"
+            }]
+        }, {
+            "run_id": 300,
+            "run_name": "run300",
+        }],
+        "next_page_token": "tokenToSecondPage"
+    }
+    listruns_page2 = {
+        "runs": [{
+            "run_id": 400,
+            "run_name": "run400",
+            "repair_history": [{
+                "id": "repair400_1",
+            }, {
+                "id": "repair400_2",
+            }]
+        }]
+    }
+
+    requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(listruns_page1))
+    requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"),
+                      text=json.dumps(listruns_page2))
+    w = WorkspaceClient(config=config)
+
+    runs_list = list(w.jobs.list_runs(expand_tasks=False, page_token="initialToken"))
+    runs_dict = [run.as_dict() for run in runs_list]
+
+    assert runs_dict == [{
+        "run_id": 100,
+        "run_name": "run100",
+    }, {
+        "run_id": 200,
+        "run_name": "run200",
+        "job_parameters": [{
+            "name": "param1",
+            "default": "default1"
+        }, {
+            "name": "param2",
+            "default": "default2"
+        }]
+    }, {
+        "run_id": 300,
+        "run_name": "run300",
+    }, {
+        "run_id": 400,
+        "run_name": "run400",
+        "repair_history": [{
+            "id": "repair400_1",
+        }, {
+            "id": "repair400_2",
+        }]
+    }]
+
+    # Only two requests should be made, both to the jobs/runs/list endpoint.
+    assert requests_mock.call_count == 2
+
+
+def test_list_runs(config, requests_mock):
+    listruns_page1 = {
+        "runs": [{
+            "run_id": 100,
+            "tasks": [{
+                "task_key": "taskkey101"
+            }, {
+                "task_key": "taskkey102"
+            }],
+            "has_more": True
+        }, {
+            "run_id": 200,
+            "tasks": [{
+                "task_key": "taskkey201"
+            }]
+        }, {
+            "run_id": 300,
+            "tasks": [{
+                "task_key": "taskkey301"
+            }]
+        }],
+        "next_page_token": "tokenToSecondPage"
+    }
+    listruns_page2 = {
+        "runs": [{
+            "run_id": 400,
+            "tasks": [{
+                "task_key": "taskkey401"
+            }, {
+                "task_key": "taskkey402"
+            }],
+            "has_more": True
+        }]
+    }
+
+    getrun_100_page1 = {
+        "run_id": 100,
+        "tasks": [{
+            "task_key": "taskkey101"
+        }, {
+            "task_key": "taskkey102"
+        }],
+        "next_page_token": "tokenToSecondPage_100"
+    }
+    getrun_100_page2 = {"run_id": 100, "tasks": [{"task_key": "taskkey103"}]}
+    getrun_400_page1 = {
+        "run_id": 400,
+        "tasks": [{
+            "task_key": "taskkey401"
+        }, {
+            "task_key": "taskkey403"
+        }],
+        "next_page_token": "tokenToSecondPage_400"
+    }
+    getrun_400_page2 = {"run_id": 400, "tasks": [{"task_key": "taskkey402"}, {"task_key": "taskkey404"}]}
+
+    requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(listruns_page1))
+    requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"), text=json.dumps(listruns_page2))
+
+    requests_mock.get(make_getrun_path_pattern(100), text=json.dumps(getrun_100_page1))
+    requests_mock.get(make_getrun_path_pattern(100, "tokenToSecondPage_100"),
+                      text=json.dumps(getrun_100_page2))
+
+    requests_mock.get(make_getrun_path_pattern(400), text=json.dumps(getrun_400_page1))
+    requests_mock.get(make_getrun_path_pattern(400, "tokenToSecondPage_400"),
+                      text=json.dumps(getrun_400_page2))
+    w = WorkspaceClient(config=config)
+
+    runs_list = list(w.jobs.list_runs(expand_tasks=True, page_token="initialToken"))
+    runs_dict = [run.as_dict() for run in runs_list]
+
+    assert runs_dict == [{
+        "run_id": 100,
+        "tasks": [{
+            "task_key": "taskkey101",
+        }, {
+            "task_key": "taskkey102",
+        }, {
+            "task_key": "taskkey103",
+        }],
+    }, {
+        "run_id": 200,
+        "tasks": [{
+            "task_key": "taskkey201",
+        }],
+    }, {
+        "run_id": 300,
+        "tasks": [{
+            "task_key": "taskkey301",
+        }],
+    }, {
+        "run_id": 400,
+        "tasks": [{
+            "task_key": "taskkey401",
+        }, {
+            "task_key": "taskkey403",
+        }, {
+            "task_key": "taskkey402",
+        }, {
+            "task_key": "taskkey404",
+        }],
+    }]
+
+    # Check that run_ids 200 and 300 were never used in a jobs/runs/get call.
+    history = requests_mock.request_history
+    assert all('300' not in request.qs.get("run_id", ['']) for request in history)
+    assert all('200' not in request.qs.get("run_id", ['']) for request in history)
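For reference, a quick sanity check of the URL pattern the list-runs tests register with requests_mock (a standalone sketch that inlines make_listruns_path_pattern("initialToken") rather than importing the test helper):

    import re

    # Mirrors make_listruns_path_pattern("initialToken"): the optional
    # non-capturing group lets a single mock match requests both with and
    # without the expand_tasks query parameter.
    pattern = re.compile(r'http://localhost/api/2\.\d/jobs/runs/list'
                         r'\?(?:expand_tasks=(?:true|false)&)?page_token=initialToken')

    assert pattern.match('http://localhost/api/2.2/jobs/runs/list'
                         '?expand_tasks=true&page_token=initialToken')
    assert pattern.match('http://localhost/api/2.2/jobs/runs/list?page_token=initialToken')
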
From fdd513820494e2b9da184dd2778359360593183c Mon Sep 17 00:00:00 2001
From: Giorgi Kikolashvili
Date: Thu, 20 Feb 2025 16:48:30 +0100
Subject: [PATCH 2/3] Update changelog

---
 NEXT_CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md
index 44956e0ce..4b6edca54 100644
--- a/NEXT_CHANGELOG.md
+++ b/NEXT_CHANGELOG.md
@@ -9,6 +9,7 @@
 ### Documentation
 
 ### Internal Changes
+* Update Jobs ListRuns API to support paginated responses ([#889](https://github.com/databricks/databricks-sdk-py/pull/889))
 * Introduce automated tagging ([#888](https://github.com/databricks/databricks-sdk-py/pull/888))
 * Update Jobs GetJob API to support paginated responses ([#869](https://github.com/databricks/databricks-sdk-py/pull/869)).
From b053f07fa0e257c99006e757ce971809163f2e06 Mon Sep 17 00:00:00 2001
From: Giorgi Kikolashvili
Date: Thu, 20 Feb 2025 17:09:00 +0100
Subject: [PATCH 3/3] Update changelog

---
 NEXT_CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md
index 4b6edca54..cdc78ffd3 100644
--- a/NEXT_CHANGELOG.md
+++ b/NEXT_CHANGELOG.md
@@ -9,7 +9,7 @@
 ### Documentation
 
 ### Internal Changes
-* Update Jobs ListRuns API to support paginated responses ([#889](https://github.com/databricks/databricks-sdk-py/pull/889))
+* Update Jobs ListRuns API to support paginated responses ([#890](https://github.com/databricks/databricks-sdk-py/pull/890))
 * Introduce automated tagging ([#888](https://github.com/databricks/databricks-sdk-py/pull/888))
 * Update Jobs GetJob API to support paginated responses ([#869](https://github.com/databricks/databricks-sdk-py/pull/869)).