@@ -60,7 +60,11 @@ def _interval_to_seconds(interval: str) -> int:
60
60
61
61
def _set_sql_query (sql_query : str ) -> str :
62
62
return (
63
- sql_query .format (start_time = "$start_time" , end_time = "$end_time" )
63
+ (
64
+ sql_query .replace ("{start_time}" , "$start_time" ).replace (
65
+ "{end_time}" , "$end_time"
66
+ )
67
+ )
64
68
or "SELECT * FROM {measurement_name} " # noqa: S608
65
69
"WHERE time >= $start_time "
66
70
"AND time < $end_time"
@@ -166,17 +170,6 @@ def _measurement_names(self) -> list[str]:
166
170
self ._measurements = self ._get_measurements ()
167
171
return self ._measurements
168
172
169
- @with_retry
170
- def _produce_records (self , records : list [dict ], measurement_name : str ):
171
- for record in records :
172
- # TODO: a key, value, and timestamp setter
173
- msg = self .serialize (
174
- key = f"{ measurement_name } _{ random .randint (1 , 1000 )} " , # noqa: S311
175
- value = record ,
176
- )
177
- self .produce (value = msg .value , key = msg .key )
178
- self .producer .flush ()
179
-
180
173
def _get_measurements (self ) -> list [str ]:
181
174
try :
182
175
result = self ._client .query (
@@ -192,6 +185,17 @@ def _get_measurements(self) -> list[str]:
192
185
)
193
186
return result ["name" ].tolist ()
194
187
188
+ @with_retry
189
+ def _produce_records (self , records : list [dict ], measurement_name : str ):
190
+ for record in records :
191
+ # TODO: a key, value, and timestamp setter
192
+ msg = self .serialize (
193
+ key = f"{ measurement_name } _{ random .randint (1 , 1000 )} " , # noqa: S311
194
+ value = record ,
195
+ )
196
+ self .produce (value = msg .value , key = msg .key )
197
+ self .producer .flush ()
198
+
195
199
@with_retry
196
200
def _query_data (self , measurement_name , start_time , end_time ):
197
201
logger .info (
0 commit comments