Commit 1caaa7c: [Nucleus] Get job status (#379)
1 parent 64d6e50 commit 1caaa7c

8 files changed: +246 additions, -19 deletions

.flake8

Lines changed: 3 additions & 0 deletions

@@ -3,6 +3,9 @@ ignore = E203, E266, E501, W503, F403, F401
 max-line-length = 79
 max-complexity = 18
 select = B,C,E,F,W,T4,B9
+classmethod-decorators =
+    classmethod
+    validator
 exclude =
     # All of these excludes should mirror something in .gitignore
     .git,

CHANGELOG.md

Lines changed: 35 additions & 0 deletions

@@ -5,6 +5,41 @@ All notable changes to the [Nucleus Python Client](https://github.com/scaleapi/n
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.15.1](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.15.1) - 2023-01-16
+
+### Changed
+- Better filter tuning of `client.list_jobs(args)` method
+
+### Added
+- Dataset method to filter jobs, and statistics on running jobs
+
+Example:
+```python
+>>> client = nucleus.NucleusClient(API_KEY)
+>>> ds = client.get_dataset(ds_id)
+>>> ds.jobs(show_completed=True, stats_only=True)
+{'autotagInference': {'Cancelled': 1, 'Completed': 11},
+ 'modelRunCommit': {'Completed': 7, 'Errored_Server': 1, 'Running': 1},
+ 'sliceQuery': {'Completed': 40, 'Running': 2}}
+```
+
+Detailed Example
+```python
+>>> from nucleus.job import CustomerJobTypes
+>>> client = nucleus.NucleusClient(API_KEY)
+>>> ds = client.get_dataset(ds_id)
+>>> from_date = "2022-12-20"; to_date = "2023-01-15"
+>>> job_types = [CustomerJobTypes.MODEL_INFERENCE_RUN, CustomerJobTypes.UPLOAD_DATASET_ITEMS]
+>>> ds.jobs(
+        from_date=from_date,
+        to_date=to_date,
+        show_completed=True,
+        job_types=job_types,
+        limit=150
+    )
+# ... returns list of AsyncJob objects
+```
+
 ## [0.15.0](https://github.com/scaleapi/nucleus-python-client/releases/tag/v0.15.0) - 2022-12-19
 
 ### Changed

nucleus/__init__.py

Lines changed: 40 additions & 13 deletions

@@ -40,6 +40,7 @@
     "VideoScene",
 ]
 
+import datetime
 import os
 import warnings
 from typing import Any, Dict, List, Optional, Sequence, Union
@@ -105,6 +106,7 @@
 )
 from .data_transfer_object.dataset_details import DatasetDetails
 from .data_transfer_object.dataset_info import DatasetInfo
+from .data_transfer_object.job_status import JobInfoRequestPayload
 from .dataset import Dataset
 from .dataset_item import DatasetItem
 from .deprecation_warning import deprecated
@@ -116,6 +118,7 @@
     NotFoundError,
     NucleusAPIError,
 )
+from .job import CustomerJobTypes
 from .logger import logger
 from .model import Model
 from .model_run import ModelRun
@@ -251,23 +254,47 @@ def list_datasets(self) -> Dict[str, Union[str, List[str]]]:
         return self.make_request({}, "dataset/", requests.get)
 
     def list_jobs(
-        self, show_completed=None, date_limit=None
+        self,
+        show_completed: bool = False,
+        from_date: Optional[Union[str, datetime.datetime]] = None,
+        to_date: Optional[Union[str, datetime.datetime]] = None,
+        job_types: Optional[List[CustomerJobTypes]] = None,
+        limit: Optional[int] = None,
+        dataset_id: Optional[str] = None,
+        date_limit: Optional[str] = None,
     ) -> List[AsyncJob]:
         """Fetches all of your running jobs in Nucleus.
 
         Parameters:
-            show_completed: Whether to fetch completed and errored jobs or just
-                running jobs. Default behavior is False.
-            date_limit: Only fetch jobs that were started after this date. Default
-                behavior is 2 weeks prior to the current date.
-
-        Returns:
-            List[:class:`AsyncJob`]: List of running asynchronous jobs
-            associated with the client API key.
+            job_types: Filter on set of job types; if None, fetch all types
+            from_date: beginning of date range filter
+            to_date: end of date range filter
+            limit: number of results to fetch, max 50_000
+            show_completed: whether to also fetch jobs with Completed status (default False)
+            dataset_id: filter on a particular dataset
+            date_limit: deprecated, do not use
+
+        Returns:
+            List[:class:`AsyncJob`]: List of running asynchronous jobs
+            associated with the client API key.
         """
-        # TODO: What type is date_limit? Use pydantic ...
-        payload = {show_completed: show_completed, date_limit: date_limit}
-        job_objects = self.make_request(payload, "jobs/", requests.get)
+        if date_limit is not None:
+            warnings.warn(
+                "Argument `date_limit` is no longer supported. Consider using the `from_date` and `to_date` args."
+            )
+
+        payload = JobInfoRequestPayload(
+            dataset_id=dataset_id,
+            show_completed=show_completed,
+            from_date=from_date,
+            to_date=to_date,
+            limit=limit,
+            job_types=job_types,
+        ).dict()
+
+        job_objects = self.make_request(payload, "jobs/", requests.post)
         return [
             AsyncJob(
                 job_id=job[JOB_ID_KEY],
@@ -1032,7 +1059,7 @@ def make_request(
         route: str,
         requests_command=requests.post,
         return_raw_response: bool = False,
-    ) -> dict:
+    ) -> Union[dict, Any]:
         """Makes a request to a Nucleus API endpoint.
 
         Logs a warning if not successful.
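For orientation, here is a minimal usage sketch of the reworked `list_jobs` signature, assuming a placeholder `API_KEY`; the `job_id` and `job_last_known_status` attributes are the ones `AsyncJob` exposes elsewhere in this diff:

```python
import datetime

import nucleus
from nucleus.job import CustomerJobTypes

client = nucleus.NucleusClient(API_KEY)  # API_KEY is a placeholder

# Fetch model-inference jobs started in the last week, including completed ones.
week_ago = datetime.datetime.now() - datetime.timedelta(days=7)
jobs = client.list_jobs(
    show_completed=True,
    from_date=week_ago,
    job_types=[CustomerJobTypes.MODEL_INFERENCE_RUN],
    limit=100,
)
for job in jobs:
    print(job.job_id, job.job_last_known_status)
```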

nucleus/constants.py

Lines changed: 1 addition & 0 deletions

@@ -77,6 +77,7 @@
 K4_KEY = "k4"
 KEEP_HISTORY_KEY = "keep_history"
 LENGTH_KEY = "length"
+JOB_REQ_LIMIT = 50_000
 JOB_STATUS_KEY = "job_status"
 JOB_LAST_KNOWN_STATUS_KEY = "job_last_known_status"
 JOB_TYPE_KEY = "job_type"
nucleus/data_transfer_object/job_status.py

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+# pylint: disable=E0213
+
+from datetime import datetime
+from typing import List, Optional, Union
+
+from dateutil.parser import ParserError, parse
+from pydantic import validator
+
+from nucleus.constants import JOB_REQ_LIMIT
+from nucleus.job import CustomerJobTypes
+from nucleus.pydantic_base import ImmutableModel
+
+
+class JobInfoRequestPayload(ImmutableModel):
+    dataset_id: Optional[str]
+    job_types: Optional[List[CustomerJobTypes]]
+    from_date: Optional[Union[str, datetime]]
+    to_date: Optional[Union[str, datetime]]
+    limit: Optional[int]
+    show_completed: bool
+
+    @validator("from_date", "to_date")
+    def ensure_date_format(cls, date):
+        if date is None:
+            return None
+        if isinstance(date, datetime):
+            return str(date)
+        try:
+            parse(date)
+        except ParserError as err:
+            raise ValueError(
+                f"Date {date} not a valid date. Try using YYYY-MM-DD format."
+            ) from err
+        return date
+
+    @validator("limit")
+    def ensure_limit(cls, limit):
+        if limit is None:
+            return JOB_REQ_LIMIT
+        if limit > JOB_REQ_LIMIT:
+            raise ValueError(f"Max request limit is 50,000, but got: {limit}.")
+        return limit
+
+    @validator("job_types")
+    def ensure_job_type(cls, job_types):
+        if job_types is None:
+            return []
+        try:
+            assert all(t in CustomerJobTypes for t in job_types)
+        except AssertionError as badType:
+            raise ValueError(
+                f"Job types must be one of: {CustomerJobTypes.options()}"
+            ) from badType
+
+        return [t.value for t in job_types]
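To make the validator behavior concrete, here is a small sketch of how `JobInfoRequestPayload` normalizes its inputs, assuming the pydantic v1 semantics that `validator` implies (the values shown are hypothetical):

```python
from datetime import datetime

from nucleus.data_transfer_object.job_status import JobInfoRequestPayload

# Optional fields may be omitted; show_completed has no default, so it is required.
payload = JobInfoRequestPayload(
    show_completed=True,
    from_date=datetime(2022, 12, 20),  # datetime objects are stringified
    to_date="2023-01-15",              # date strings are checked with dateutil
    limit=None,                        # an explicit None is bumped to JOB_REQ_LIMIT
)
print(payload.dict())

# An unparseable date fails validation; pydantic surfaces the validator's
# message inside a ValidationError, which subclasses ValueError.
try:
    JobInfoRequestPayload(show_completed=False, from_date="not-a-date")
except ValueError as err:
    print(err)  # ... not a valid date. Try using YYYY-MM-DD format.
```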

nucleus/dataset.py

Lines changed: 54 additions & 5 deletions

@@ -1,5 +1,16 @@
+import datetime
 import os
-from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
 
 import requests
 
@@ -32,6 +43,7 @@
     EXPORTED_ROWS,
     FRAME_RATE_KEY,
     ITEMS_KEY,
+    JOB_REQ_LIMIT,
     KEEP_HISTORY_KEY,
     MESSAGE_KEY,
     NAME_KEY,
@@ -54,6 +66,7 @@
 from .dataset_item_uploader import DatasetItemUploader
 from .deprecation_warning import deprecated
 from .errors import NotFoundError, NucleusAPIError
+from .job import CustomerJobTypes, jobs_status_overview
 from .metadata_manager import ExportMetadataType, MetadataManager
 from .payload_constructor import (
     construct_append_scenes_payload,
@@ -70,6 +83,9 @@
 )
 from .upload_response import UploadResponse
 
+if TYPE_CHECKING:
+    from . import NucleusClient
+
 # TODO: refactor to reduce this file to under 1000 lines.
 # pylint: disable=C0302
 
@@ -107,7 +123,7 @@ class Dataset:
     existing_dataset = client.get_dataset("YOUR_DATASET_ID")
     """
 
-    def __init__(self, dataset_id, client, name=None):
+    def __init__(self, dataset_id, client: "NucleusClient", name=None):
         self.id = dataset_id
         self._client = client
         # NOTE: Optionally set name on creation such that the property access doesn't need to hit the server
@@ -144,7 +160,7 @@ def is_scene(self) -> bool:
             {}, f"dataset/{self.id}/is_scene", requests.get
         )[DATASET_IS_SCENE_KEY]
         self._is_scene = response
-        return self._is_scene
+        return self._is_scene  # type: ignore
 
     @property
     def model_runs(self) -> List[str]:
@@ -153,7 +169,7 @@ def model_runs(self) -> List[str]:
         response = self._client.make_request(
             {}, f"dataset/{self.id}/model_runs", requests.get
         )
-        return response
+        return response  # type: ignore
 
     @property
     def slices(self) -> List[Slice]:
@@ -885,7 +901,7 @@ def build_slice(
         sample_size: int,
         sample_method: Union[str, SliceBuilderMethods],
         filters: Optional[SliceBuilderFilters] = None,
-    ) -> Union[str, Tuple[AsyncJob, str]]:
+    ) -> Union[str, Tuple[AsyncJob, str], dict]:
         """Build a slice using Nucleus' Smart Sample tool. Allowing slices to be built
         based on certain criteria, and filters.
 
@@ -1926,3 +1942,36 @@ def delete_tracks(self, track_reference_ids: List[str]) -> None:
             route=f"dataset/{self.id}/tracks",
             requests_command=requests.delete,
         )
+
+    def jobs(
+        self,
+        job_types: Optional[List[CustomerJobTypes]] = None,
+        from_date: Optional[Union[str, datetime.datetime]] = None,
+        to_date: Optional[Union[str, datetime.datetime]] = None,
+        limit: int = JOB_REQ_LIMIT,
+        show_completed: bool = False,
+        stats_only: bool = False,
+    ):
+        """
+        Fetch jobs pertaining to this particular dataset.
+
+        Parameters:
+            job_types: Filter on set of job types; if None, fetch all types, e.g. ['uploadDatasetItems']
+            from_date: beginning of date range, as a string 'YYYY-MM-DD' or datetime object.
+                For example: '2021-11-05', parser.parse('Nov 5 2021'), or datetime(2021, 11, 5)
+            to_date: end of date range
+            limit: number of results to fetch, max 50_000
+            show_completed: whether to also fetch jobs with Completed status (default False)
+            stats_only: return an overview of jobs instead of a list of job objects
+        """
+        job_objects = self._client.list_jobs(
+            dataset_id=self.id,
+            show_completed=show_completed,
+            from_date=from_date,
+            to_date=to_date,
+            limit=limit,
+            job_types=job_types,
+        )
+        if stats_only:
+            return jobs_status_overview(job_objects)
+        return job_objects

nucleus/job.py

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+from collections import defaultdict
+from enum import Enum
+from typing import Any, Dict, List
+
+from .async_job import AsyncJob
+
+
+class CustomerJobTypes(str, Enum):
+    UPLOAD_DATASET_ITEMS = "uploadDatasetItems"
+    UPLOAD_PREDICTIONS = "uploadPredictions"
+    UPLOAD_ANNOTATIONS = "uploadAnnotations"
+    UPLOAD_LIDAR_SCENE = "uploadLidarScene"
+    UPLOAD_VIDEO_SCENE = "uploadVideoScene"
+    MODEL_INFERENCE_RUN = "modelInferenceRun"
+    INDEXING_IMAGE = "indexingImage"
+    INDEXING_OBJECT = "indexingObject"
+    ANNOTATION_DELETION = "annotationDeletion"
+    SEND_TO_LABELING = "sendToLabeling"
+    INGEST_TASKS = "ingestTasks"
+    CUSTOM_INDEXING = "customIndexing"
+    EMBEDDING_DELETE = "embeddingDelete"
+    TEST_EVALUATION = "testEvaluation"
+    VALIDATE_METRICS = "modelMetrics"
+    MODEL_RUN_COMMIT = "modelRunCommit"
+    AUTOTAG_INFERENCE = "autotagInference"
+    SLICE_QUERY = "sliceQuery"
+    CLONE_DATASET = "cloneDataset"
+    METADATA_UPDATE = "metadataUpdate"
+    TRIGGER_EVALUATE = "triggerEvaluate"
+
+    def __contains__(self, item):
+        try:
+            self(item)
+        except ValueError:
+            return False
+        return True
+
+    @staticmethod
+    def options():
+        return list(map(lambda c: c.value, CustomerJobTypes))
+
+
+def jobs_status_overview(jobs: List[AsyncJob]) -> Dict[str, Any]:
+    jobs_by_type = defaultdict(list)
+    for job in jobs:
+        jobs_by_type[job.job_type].append(job)
+
+    jobs_status = {}
+    for job_type, job_collection in jobs_by_type.items():
+        overview = defaultdict(int)  # type: Dict[str, int]
+        for job in job_collection:
+            overview[job.job_last_known_status] += 1
+        jobs_status[job_type] = dict(overview)
+
+    return jobs_status
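As a quick illustration of the aggregation above: `jobs_status_overview` only reads `.job_type` and `.job_last_known_status` from each job, so lightweight stand-ins are enough to sketch the output shape (the data is hypothetical; real calls pass `AsyncJob` instances, and this shape is what `Dataset.jobs(stats_only=True)` returns):

```python
from types import SimpleNamespace

from nucleus.job import jobs_status_overview

# Hypothetical jobs; only the two attributes the function reads are set.
jobs = [
    SimpleNamespace(job_type="sliceQuery", job_last_known_status="Running"),
    SimpleNamespace(job_type="sliceQuery", job_last_known_status="Completed"),
    SimpleNamespace(job_type="modelRunCommit", job_last_known_status="Completed"),
]

print(jobs_status_overview(jobs))
# {'sliceQuery': {'Running': 1, 'Completed': 1}, 'modelRunCommit': {'Completed': 1}}
```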
