1
1
import json
2
2
from dataclasses import dataclass
3
- from typing import Optional , Dict , List , Set
3
+ from typing import Optional , Union , Dict , List , Set
4
4
from enum import Enum
5
5
from nucleus .constants import (
6
6
CAMERA_PARAMS_KEY ,
7
7
FRAMES_KEY ,
8
+ INDEX_KEY ,
9
+ ITEMS_KEY ,
8
10
METADATA_KEY ,
9
11
REFERENCE_ID_KEY ,
10
12
TYPE_KEY ,
@@ -70,6 +72,7 @@ def to_json(self) -> str:
70
72
71
73
@dataclass
72
74
class Frame :
75
+ index : Union [int , None ] = None
73
76
items : Dict [str , SceneDatasetItem ] = {}
74
77
75
78
def __post_init__ (self ):
@@ -84,8 +87,11 @@ def add_item(self, item: SceneDatasetItem, sensor_name: str):
84
87
85
88
def to_payload (self ) -> dict :
86
89
return {
87
- sensor : scene_dataset_item .to_payload ()
88
- for sensor , scene_dataset_item in self .items .items ()
90
+ INDEX_KEY : self .index ,
91
+ ITEMS_KEY : {
92
+ sensor : scene_dataset_item .to_payload ()
93
+ for sensor , scene_dataset_item in self .items .items ()
94
+ },
89
95
}
90
96
91
97
@@ -95,8 +101,14 @@ class Scene:
95
101
frames : List [Frame ] = []
96
102
metadata : Optional [dict ] = None
97
103
98
- # TODO: move validation to scene upload
99
104
def __post_init__ (self ):
105
+ self .check_valid_frame_indices ()
106
+ if (all ([frame .index is not None for frame in self .frames ])):
107
+ self .frames_dict = {frame .index : frame for frame in self .frames }
108
+ else :
109
+ self .frames_dict = {i : frame for i , frame in enumerate (self .frames )}
110
+
111
+ # TODO: move validation to scene upload
100
112
assert isinstance (self .frames , List ), "frames must be a list"
101
113
for frame in self .frames :
102
114
assert isinstance (
@@ -107,13 +119,34 @@ def __post_init__(self):
107
119
self .reference_id , str
108
120
), "reference_id must be a string"
109
121
110
- def add_frame (self , frame : Frame ):
111
- self .frames .append (frame )
122
+ def check_valid_frame_indices (self ):
123
+ infer_from_list_position = all ([frame .index is None for frame in self .frames ])
124
+ explicit_frame_order = all ([frame .index is not None for frame in self .frames ])
125
+ assert infer_from_list_position or explicit_frame_order , "Must specify index explicitly for all frames or implicitly for all frames (inferred from list position)"
126
+
127
+ def add_item (self , item : SceneDatasetItem , index : int , sensor_name : str ):
128
+ if index not in self .frames_dict :
129
+ new_frame = Frame (index , {sensor_name : item })
130
+ self .frames_dict [index ] = new_frame
131
+ else :
132
+ self .frames_dict [index ].items [sensor_name ] = item
133
+
134
+ def add_frame (self , frame : Frame , update : bool = False ):
135
+ assert frame .index is not None , "Must specify index explicitly when calling add_frame"
136
+ if frame .index not in self .frames_dict or frame .index in self .frames_dict and update :
137
+ self .frames_dict [frame .index ] = frame
112
138
113
139
def to_payload (self ) -> dict :
140
+ frames_payload = [frame .to_payload () for frame in self .frames ]
141
+ if len (frames_payload ) > 0 and frames_payload [0 ].index is None :
142
+ for i in range (len (frames_payload )):
143
+ frames_payload [INDEX_KEY ] = i
144
+ else :
145
+ frames_payload .sort (lambda x : x [INDEX_KEY ])
146
+
114
147
return {
115
148
REFERENCE_ID_KEY : self .reference_id ,
116
- FRAMES_KEY : [ frame . to_payload () for frame in self . frames ] ,
149
+ FRAMES_KEY : frames_payload ,
117
150
METADATA_KEY : self .metadata ,
118
151
}
119
152
0 commit comments