diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 53d90ede..0a94af2f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,16 +1,16 @@ repos: - repo: https://github.com/pycqa/isort - rev: 5.13.2 + rev: 6.0.1 hooks: - id: isort args: ["--filter-files"] - repo: https://github.com/psf/black - rev: 24.8.0 + rev: 25.1.0 hooks: - id: black args: [--safe] - repo: https://github.com/PyCQA/flake8 - rev: 7.1.1 + rev: 7.2.0 hooks: - id: flake8 args: ["--config=.flake8"] diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index 9b4294d7..4489c040 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -317,6 +317,7 @@ def __init__( self._task_stop_measurement_values = {} self._tasks: Dict[str, Task] = {} self._active_task: Optional[str] = None + self._active_task_emissions_at_start: Optional[EmissionsData] = None # Tracking mode detection ressource_tracker = ResourceTracker(self) @@ -474,7 +475,7 @@ def start_task(self, task_name=None) -> None: self._scheduler_monitor_power.start() if self._active_task: - logger.info("A task is already under measure") + logger.warning("A task is already under measure") return if not task_name: task_name = uuid.uuid4().__str__() @@ -484,8 +485,13 @@ def start_task(self, task_name=None) -> None: # Read initial energy for hardware for hardware in self._hardware: hardware.start() - _ = self._prepare_emissions_data() - _ = self._compute_emissions_delta(_) + prepared_data_for_task_start = self._prepare_emissions_data() + self._active_task_emissions_at_start = dataclasses.replace( + prepared_data_for_task_start + ) + # The existing call to _compute_emissions_delta uses the result of _prepare_emissions_data. + # Let's make sure it uses the same one we captured. + self._compute_emissions_delta(prepared_data_for_task_start) self._tasks.update( { @@ -506,20 +512,51 @@ def stop_task(self, task_name: str = None) -> EmissionsData: self._scheduler_monitor_power.stop() task_name = task_name if task_name else self._active_task + if self._tasks.get(task_name) is None: + logger.warning("stop_task : No active task to stop.") + return None self._measure_power_and_energy() + emissions_data = ( + self._prepare_emissions_data() + ) # This is emissions_data_at_stop + + if self._active_task_emissions_at_start is None: + logger.error( + f"Task {task_name}: _active_task_emissions_at_start was None. " + "This indicates an issue, possibly start_task was not called or was corrupted. " + "Reporting zero delta for this task to avoid errors." + ) + emissions_data_delta = dataclasses.replace(emissions_data) + # Zero out energy fields for the delta + emissions_data_delta.emissions = 0.0 + emissions_data_delta.emissions_rate = 0.0 + emissions_data_delta.cpu_energy = 0.0 + emissions_data_delta.gpu_energy = 0.0 + emissions_data_delta.ram_energy = 0.0 + emissions_data_delta.energy_consumed = 0.0 + else: + emissions_data_delta = dataclasses.replace(emissions_data) + emissions_data_delta.compute_delta_emission( + self._active_task_emissions_at_start + ) - emissions_data = self._prepare_emissions_data() - emissions_data_delta = self._compute_emissions_delta(emissions_data) + # Update global _previous_emissions state using the current totals at task stop. + self._compute_emissions_delta(emissions_data) task_duration = Time.from_seconds( time.perf_counter() - self._tasks[task_name].start_time ) + # task_emission_data is the final delta object to be returned and stored task_emission_data = emissions_data_delta - task_emission_data.duration = task_duration.seconds + task_emission_data.duration = ( + task_duration.seconds + ) # Set the correct duration for the task + self._tasks[task_name].emissions_data = task_emission_data self._tasks[task_name].is_active = False self._active_task = None + self._active_task_emissions_at_start = None # Clear task-specific start data return task_emission_data @@ -625,7 +662,8 @@ def _persist_data( def _prepare_emissions_data(self) -> EmissionsData: """ - :delta: If 'True', return only the delta comsumption since the last call. + Prepare the emissions data to be sent to the API or written to a file. + :return: EmissionsData object with the total emissions data. """ cloud: CloudMetadata = self._get_cloud_metadata() duration: Time = Time.from_seconds(time.perf_counter() - self._start_time) @@ -688,9 +726,14 @@ def _prepare_emissions_data(self) -> EmissionsData: return total_emissions def _compute_emissions_delta(self, total_emissions: EmissionsData) -> EmissionsData: - delta_emissions: EmissionsData = total_emissions + """ + Compute the delta emissions since the last call to this method. + :param total_emissions: The total emissions data to compute the delta from. + :return: EmissionsData with the delta emissions. + """ if self._previous_emissions is None: self._previous_emissions = total_emissions + delta_emissions: EmissionsData = total_emissions else: # Create a copy delta_emissions = dataclasses.replace(total_emissions) diff --git a/examples/task_loop_same_task.py b/examples/task_loop_same_task.py new file mode 100644 index 00000000..812ab2a3 --- /dev/null +++ b/examples/task_loop_same_task.py @@ -0,0 +1,74 @@ +import time + +from codecarbon import EmissionsTracker + +tracker = EmissionsTracker( + project_name="ZeroEnergyTestLoop", + measure_power_secs=1, # Or your desired interval + log_level="debug", # Set to debug to get all codecarbon logs + our new ones +) + + +def busy_task(duration_secs=4): + print(f" Task: Starting busy work for ~{duration_secs} seconds...") + start_time = time.perf_counter() + while time.perf_counter() - start_time < duration_secs: + # Simulate some CPU work + # for _ in range(100000): # Adjust complexity as needed + # pass + time.sleep(2) + end_time = time.perf_counter() + print(f" Task: Finished busy work in {end_time - start_time:.2f} seconds.") + + +max_rounds = 20 # Safety break for the loop + +print("Starting tracking loop. Will stop if energy_consumed is 0.0 for a task.") + +try: + for current_round in range(max_rounds): + print(f"Round {current_round + 1}:") + task_name = f"round_{current_round + 1}_task" + + tracker.start_task(task_name) + print(f" Tracker: Started task '{task_name}'") + + busy_task(duration_secs=1) # Simulate work for about 1 second + + emissions_data = tracker.stop_task() + print(f" Tracker: Stopped task '{task_name}'") + + if emissions_data: + print(f" EmissionsData for {task_name}:") + print(f" Duration: {emissions_data.duration:.4f}s") + print(f" CPU Energy: {emissions_data.cpu_energy:.6f} kWh") + print(f" GPU Energy: {emissions_data.gpu_energy:.6f} kWh") + print(f" RAM Energy: {emissions_data.ram_energy:.6f} kWh") + print( + f" Total Energy Consumed: {emissions_data.energy_consumed:.6f} kWh" + ) + print(f" Emissions: {emissions_data.emissions:.6f} kg CO2eq") + + if emissions_data.energy_consumed == 0.0: + print("###########################################################") + print( + f"INFO: energy_consumed is 0.0 in round {current_round + 1}. Stopping loop." + ) + print("###########################################################") + break + else: + print(f" WARNING: tracker.stop_task() returned None for {task_name}") + + # Small pause between rounds, can be adjusted or removed + time.sleep(1) + + else: # Executed if the loop completes without break + print( + f"Loop completed {max_rounds} rounds without encountering zero energy consumption." + ) + +except Exception as e: + print(f"An error occurred: {e}") +finally: + tracker.stop_task() + print("Script finished.") diff --git a/tests/test_emissions_tracker.py b/tests/test_emissions_tracker.py index cac50bc5..a07e7458 100644 --- a/tests/test_emissions_tracker.py +++ b/tests/test_emissions_tracker.py @@ -10,6 +10,7 @@ import requests import responses +from codecarbon.core.units import Energy, Power from codecarbon.emissions_tracker import ( EmissionsTracker, OfflineEmissionsTracker, @@ -407,6 +408,99 @@ def test_carbon_tracker_online_context_manager_TWO_GPU_PRIVATE_INFRA_CANADA( self.assertIsInstance(tracker.final_emissions, float) self.assertAlmostEqual(tracker.final_emissions, 6.262572537957655e-05, places=2) + @mock.patch( + "codecarbon.external.ram.RAM.measure_power_and_energy" + ) # Corrected path for RAM + @mock.patch( + "codecarbon.external.hardware.CPU.measure_power_and_energy" + ) # Path for CPU is likely correct + def test_task_energy_with_live_update_interference( + self, + mock_cpu_measure, # Method decorator (innermost) + mock_ram_measure, # Method decorator (outermost) + mock_setup_intel_cli, # Class decorator (innermost) + mock_log_values, # Class decorator + mocked_env_cloud_details, # Class decorator + mocked_get_gpu_details, # Class decorator + mocked_is_gpu_details_available, # Class decorator (outermost relevant one) + ): + # --- Test Setup --- + # Configure mocks to return specific, non-zero energy values + cpu_energy_val_task = 0.0001 + ram_energy_val_task = 0.00005 + mock_cpu_measure.return_value = ( + Power.from_watts(10), + Energy.from_energy(kWh=cpu_energy_val_task), + ) + mock_ram_measure.return_value = ( + Power.from_watts(5), + Energy.from_energy(kWh=ram_energy_val_task), + ) + + tracker = EmissionsTracker( + project_name="TestLiveUpdateInterference", + measure_power_secs=1, + api_call_interval=1, # Trigger live update on first opportunity + output_handlers=[], # Clear any default handlers like FileOutput + save_to_file=False, # Ensure no file is created by default + save_to_api=False, + # Config file is mocked by get_custom_mock_open in setUp + ) + + # --- Test Logic --- + tracker.start_task("my_test_task") + # Simulate some work or time passing if necessary, though energy is mocked. + # time.sleep(0.1) # Not strictly needed due to mocking + + task_data = tracker.stop_task() + # In stop_task: + # 1. _measure_power_and_energy() is called MANUALLY. + # - mock_cpu_measure and mock_ram_measure are called. + # - _total_energies get cpu_energy_val_task and ram_energy_val_task added. + # - _measure_occurrence becomes 1. + # - Since api_call_interval is 1, live update path IS triggered if _measure_occurrence >= api_call_interval: + # - _prepare_emissions_data() called (gets totals including task energy). + # - _compute_emissions_delta() called. This updates _previous_emissions. + # 2. Back in stop_task, after _measure_power_and_energy(): + # - _prepare_emissions_data() called again (gets same totals). + # - The NEW logic computes delta using _active_task_emissions_at_start. + # - The global _previous_emissions is then updated again using current totals by another _compute_emissions_delta call. + + # --- Assertions --- + self.assertIsNotNone(task_data, "Task data should not be None") + + self.assertGreater(task_data.cpu_energy, 0, "CPU energy should be non-zero") + self.assertAlmostEqual( + task_data.cpu_energy, + cpu_energy_val_task, + places=7, + msg="CPU energy does not match expected task energy", + ) + + self.assertGreater(task_data.ram_energy, 0, "RAM energy should be non-zero") + self.assertAlmostEqual( + task_data.ram_energy, + ram_energy_val_task, + places=7, + msg="RAM energy does not match expected task energy", + ) + + expected_total_energy = cpu_energy_val_task + ram_energy_val_task + self.assertGreater( + task_data.energy_consumed, 0, "Total energy consumed should be non-zero" + ) + self.assertAlmostEqual( + task_data.energy_consumed, + expected_total_energy, + places=7, + msg="Total energy consumed does not match sum of components", + ) + + # Verify mocks were called as expected + # They are called once in _measure_power_and_energy inside stop_task + mock_cpu_measure.assert_called_once() + mock_ram_measure.assert_called_once() + @responses.activate def test_carbon_tracker_offline_context_manager( self,