@@ -147,7 +147,7 @@ def save_to_file(self):
         logging.debug(f"{full_file_path} saved")
 
     def append(self, df):
-        self.df = self.df.append(df).reset_index(drop=True)
+        self.df = pd.concat([self.df, df], ignore_index=True)
 
     def get_newest_timestamp(self, module_mac):
         if ('module_mac' in self.df.columns):
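
Context for this hunk: `DataFrame.append` was deprecated in pandas 1.4 and removed in pandas 2.0, so `pd.concat` is the supported replacement. A minimal sketch of the equivalence, with illustrative column names:

```python
import pandas as pd

a = pd.DataFrame({"utc_time": [1, 2], "value": [10.0, 11.0]})
b = pd.DataFrame({"utc_time": [3], "value": [12.0]})

# pandas < 2.0:  a.append(b).reset_index(drop=True)
# pandas >= 2.0: ignore_index=True covers the reset_index(drop=True) step.
merged = pd.concat([a, b], ignore_index=True)
print(merged)  # three rows, index 0..2
```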
@@ -233,18 +233,17 @@ def _write_file(self, file_path):
 class SQLHandler(DataFrameHandler):
     def __init__(self, file_name, output_path):
         raise NotImplementedError("sql details not setup")
-        from sqlalchemy import create_engine
+        # from sqlalchemy import create_engine
 
-        super().__init__(df, file_name, output_path, file_format="sql", kwargs={"con": self.engine})
-        self.engine = create_engine("sqlite://", echo=False)
+        # super().__init__(file_name, output_path, file_format="sql", kwargs={"con": self.engine})
+        # self.engine = create_engine("sqlite://", echo=False)
 
     def _read_file(self, file_path):
         return pd.read_sql(file_path, **self.kwargs)
 
     def _write_file(self, file_path):
         raise NotImplementedError("sql details not setup")
-        engine = create_engine("sqlite://", echo=False)
-        df.to_sql(file_path, index=False, **self.kwargs)
+        self.df.to_sql(file_path, index=False, **self.kwargs)
 
 
 class FeatherHandler(DataFrameHandler):
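
`SQLHandler` stays stubbed out behind `NotImplementedError`, with the intended wiring left as comments. For reference, a sketch of the round-trip those comments appear to aim for, assuming an in-memory SQLite engine handed to pandas via the `con` keyword; the table name `samples` is made up here:

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite engine, as in the commented-out constructor.
engine = create_engine("sqlite://", echo=False)

df = pd.DataFrame({"utc_time": [1, 2], "value": [10.0, 11.0]})

# to_sql takes a *table name* (not a file path) plus the connection.
df.to_sql("samples", con=engine, index=False)

# read_sql accepts a table name or a query together with the same engine.
print(pd.read_sql("samples", con=engine))
```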
@@ -393,9 +392,9 @@ def _get_field_dict(self, station_id,module_id,data_type,start_date,end_date):
         """Returns a dict to be used when requesting data through the Netatmo API"""
 
         return {'device_id': station_id,
-                'module_id': module_id,
                 'scale': 'max',
                 'mtype': ','.join(data_type),
+                'module_id': module_id,
                 'date_begin': start_date,
                 'date_end': end_date}
 
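
Moving `'module_id'` within the literal is cosmetic: the request fields are read by key, so dict order does not change the API call. An illustrative instance of the returned dict, with hypothetical MACs and timestamps:

```python
# Hypothetical values for a Netatmo measurement request.
fields = {'device_id': '70:ee:50:00:00:01',  # station MAC
          'scale': 'max',                    # highest resolution
          'mtype': 'Temperature,Humidity',   # ','.join(data_type)
          'module_id': '02:00:00:00:00:01',  # module MAC
          'date_begin': 1600000000,          # unix timestamps
          'date_end': 1600100000}
```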
@@ -445,14 +444,12 @@ def get_module_df(self, newest_utctime, station_name, station_mac, module_data_o
         # Start with the oldest timestamp
         module_start_date_timestamp = module_data_overview['last_setup']
 
-        # Create an empty DataFrame to fill with new values
-        df_module = pd.DataFrame([])
-
+        # Collect the retrieved DataFrames in a list
+        data = []
 
         if (newest_utctime):
             # Found newer data! Change start time according to the newest value
 
-
             if (newest_utctime > module_start_date_timestamp):
                 module_start_date_timestamp = newest_utctime + 1
                 logging.info(f'Newer data found for {module_name}. Setting new start date to {self._get_date_from_timestamp(module_start_date_timestamp, tz=time_z)}')
@@ -483,16 +480,16 @@ def get_module_df(self, newest_utctime, station_name, station_mac, module_data_o
             try:
                 # Was there any data?
                 if (retreived_module_data['body']):
-                    # Yes! Append it with df_module
-                    df_module = df_module.append(self._to_dataframe(retreived_module_data['body'],
+                    new_df = self._to_dataframe(retreived_module_data['body'],
                                                  module_data_overview,
                                                  station_name,
                                                  station_mac,
                                                  dtype,
-                                                 time_z))
-                    logging.debug(f'{len(retreived_module_data["body"])} samples found for {module_data_overview["module_name"]}. {df_module.shape[0]} new samples collected so far.')
+                                                 time_z)
+                    data.append(new_df)
+                    logging.debug(f'{len(retreived_module_data["body"])} samples found for {module_data_overview["module_name"]}. {new_df["timestamp"].iloc[0]} - {new_df["timestamp"].iloc[-1]}')
                     # Now change the start_time
-                    module_start_date_timestamp = df_module['utc_time'].max() + 1
+                    module_start_date_timestamp = new_df['utc_time'].max() + 1
 
                 else:
                     keep_collecting_module_data = False
@@ -503,8 +500,15 @@
                 keep_collecting_module_data = False
                 logging.error(f'Something fishy is going on... Aborting collection for module {module_name}')
 
+
+        if data:
+            df_module = pd.concat(data, ignore_index=True)
+        else:
+            df_module = pd.DataFrame([])
+
+
         logging.info(f'Collected data from {module_name} contains {df_module.shape[0]} samples.')
-        return df_module.reset_index(drop=True)
+        return df_module
 
 def main():
 
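The three hunks above replace grow-by-append with collect-then-concat: each API page lands in a plain list, and one `pd.concat` at the end does a single copy instead of one per loop iteration. The `if data:` guard matters because `pd.concat([])` raises `ValueError`. A self-contained sketch with stand-in data:

```python
import pandas as pd

chunks = []
for page in range(3):  # stands in for the paged API responses
    chunks.append(pd.DataFrame({"utc_time": [2 * page, 2 * page + 1]}))

# One concat at the end; the guard covers the no-data case, where
# pd.concat([]) would raise ValueError.
df_module = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame([])
print(df_module.shape[0])  # 6
```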
@@ -602,17 +606,18 @@ def main():
                                          nr_previous_requests=args.previous_requests)
 
 
-    for station_mac, station_data_overview in rate_limit_handler.get_stations():
+    for station_name, station_data_overview in rate_limit_handler.get_stations():
 
-        station_name = station_data_overview['station_name']
+        station_mac = station_data_overview['_id']
 
         station_timezone = timezone(station_data_overview['place']['timezone'])
         logging.info(f'Timezone {station_timezone} extracted from data.')
 
         end_datetime_timestamp = np.floor(datetime.timestamp(station_timezone.localize(args.end_datetime)))
+        newest_utc = df_handler.get_newest_timestamp(station_data_overview['_id'])
         df_handler.append(
             rate_limit_handler.get_module_df(
-                df_handler.get_newest_timestamp(station_data_overview['_id']),
+                newest_utc,
                 station_name,
                 station_mac,
                 station_data_overview,
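
The final hunk relabels a mislabeled loop variable: `get_stations()` evidently yields `(station_name, station_data_overview)` pairs, so the old code bound the station name to a variable called `station_mac` and passed it where a MAC was expected; the MAC actually sits under the `'_id'` key. A minimal runnable sketch of the corrected unpacking, with a hypothetical payload:

```python
# Stand-in for the handler's get_stations(); the payload shape is assumed
# from the diff, and real Netatmo overviews carry many more fields.
def get_stations():
    yield 'Home', {'_id': '70:ee:50:00:00:01',
                   'place': {'timezone': 'Europe/Stockholm'}}

for station_name, station_data_overview in get_stations():
    station_mac = station_data_overview['_id']  # MAC lives under '_id'
    print(station_name, station_mac)            # Home 70:ee:50:00:00:01
```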