Skip to content

Fix: Prevent intermittent zero energy reporting for tasks #853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
repos:
- repo: https://github.com/pycqa/isort
rev: 5.13.2
rev: 6.0.1
hooks:
- id: isort
args: ["--filter-files"]
- repo: https://github.com/psf/black
rev: 24.8.0
rev: 25.1.0
hooks:
- id: black
args: [--safe]
- repo: https://github.com/PyCQA/flake8
rev: 7.1.1
rev: 7.2.0
hooks:
- id: flake8
args: ["--config=.flake8"]
Expand Down
59 changes: 51 additions & 8 deletions codecarbon/emissions_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ def __init__(
self._task_stop_measurement_values = {}
self._tasks: Dict[str, Task] = {}
self._active_task: Optional[str] = None
self._active_task_emissions_at_start: Optional[EmissionsData] = None

# Tracking mode detection
ressource_tracker = ResourceTracker(self)
Expand Down Expand Up @@ -474,7 +475,7 @@ def start_task(self, task_name=None) -> None:
self._scheduler_monitor_power.start()

if self._active_task:
logger.info("A task is already under measure")
logger.warning("A task is already under measure")
return
if not task_name:
task_name = uuid.uuid4().__str__()
Expand All @@ -484,8 +485,13 @@ def start_task(self, task_name=None) -> None:
# Read initial energy for hardware
for hardware in self._hardware:
hardware.start()
_ = self._prepare_emissions_data()
_ = self._compute_emissions_delta(_)
prepared_data_for_task_start = self._prepare_emissions_data()
self._active_task_emissions_at_start = dataclasses.replace(
prepared_data_for_task_start
)
# The existing call to _compute_emissions_delta uses the result of _prepare_emissions_data.
# Let's make sure it uses the same one we captured.
self._compute_emissions_delta(prepared_data_for_task_start)

self._tasks.update(
{
Expand All @@ -506,20 +512,51 @@ def stop_task(self, task_name: str = None) -> EmissionsData:
self._scheduler_monitor_power.stop()

task_name = task_name if task_name else self._active_task
if self._tasks.get(task_name) is None:
logger.warning("stop_task : No active task to stop.")
return None
self._measure_power_and_energy()
emissions_data = (
self._prepare_emissions_data()
) # This is emissions_data_at_stop

if self._active_task_emissions_at_start is None:
logger.error(
f"Task {task_name}: _active_task_emissions_at_start was None. "
"This indicates an issue, possibly start_task was not called or was corrupted. "
"Reporting zero delta for this task to avoid errors."
)
emissions_data_delta = dataclasses.replace(emissions_data)
# Zero out energy fields for the delta
emissions_data_delta.emissions = 0.0
emissions_data_delta.emissions_rate = 0.0
emissions_data_delta.cpu_energy = 0.0
emissions_data_delta.gpu_energy = 0.0
emissions_data_delta.ram_energy = 0.0
emissions_data_delta.energy_consumed = 0.0
else:
emissions_data_delta = dataclasses.replace(emissions_data)
emissions_data_delta.compute_delta_emission(
self._active_task_emissions_at_start
)

emissions_data = self._prepare_emissions_data()
emissions_data_delta = self._compute_emissions_delta(emissions_data)
# Update global _previous_emissions state using the current totals at task stop.
self._compute_emissions_delta(emissions_data)

task_duration = Time.from_seconds(
time.perf_counter() - self._tasks[task_name].start_time
)

# task_emission_data is the final delta object to be returned and stored
task_emission_data = emissions_data_delta
task_emission_data.duration = task_duration.seconds
task_emission_data.duration = (
task_duration.seconds
) # Set the correct duration for the task

self._tasks[task_name].emissions_data = task_emission_data
self._tasks[task_name].is_active = False
self._active_task = None
self._active_task_emissions_at_start = None # Clear task-specific start data

return task_emission_data

Expand Down Expand Up @@ -625,7 +662,8 @@ def _persist_data(

def _prepare_emissions_data(self) -> EmissionsData:
"""
:delta: If 'True', return only the delta comsumption since the last call.
Prepare the emissions data to be sent to the API or written to a file.
:return: EmissionsData object with the total emissions data.
"""
cloud: CloudMetadata = self._get_cloud_metadata()
duration: Time = Time.from_seconds(time.perf_counter() - self._start_time)
Expand Down Expand Up @@ -688,9 +726,14 @@ def _prepare_emissions_data(self) -> EmissionsData:
return total_emissions

def _compute_emissions_delta(self, total_emissions: EmissionsData) -> EmissionsData:
delta_emissions: EmissionsData = total_emissions
"""
Compute the delta emissions since the last call to this method.
:param total_emissions: The total emissions data to compute the delta from.
:return: EmissionsData with the delta emissions.
"""
if self._previous_emissions is None:
self._previous_emissions = total_emissions
delta_emissions: EmissionsData = total_emissions
else:
# Create a copy
delta_emissions = dataclasses.replace(total_emissions)
Expand Down
74 changes: 74 additions & 0 deletions examples/task_loop_same_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import time

from codecarbon import EmissionsTracker

tracker = EmissionsTracker(
project_name="ZeroEnergyTestLoop",
measure_power_secs=1, # Or your desired interval
log_level="debug", # Set to debug to get all codecarbon logs + our new ones
)


def busy_task(duration_secs=4):
print(f" Task: Starting busy work for ~{duration_secs} seconds...")
start_time = time.perf_counter()
while time.perf_counter() - start_time < duration_secs:
# Simulate some CPU work
# for _ in range(100000): # Adjust complexity as needed
# pass
time.sleep(2)
end_time = time.perf_counter()
print(f" Task: Finished busy work in {end_time - start_time:.2f} seconds.")


max_rounds = 20 # Safety break for the loop

print("Starting tracking loop. Will stop if energy_consumed is 0.0 for a task.")

try:
for current_round in range(max_rounds):
print(f"Round {current_round + 1}:")
task_name = f"round_{current_round + 1}_task"

tracker.start_task(task_name)
print(f" Tracker: Started task '{task_name}'")

busy_task(duration_secs=1) # Simulate work for about 1 second

emissions_data = tracker.stop_task()
print(f" Tracker: Stopped task '{task_name}'")

if emissions_data:
print(f" EmissionsData for {task_name}:")
print(f" Duration: {emissions_data.duration:.4f}s")
print(f" CPU Energy: {emissions_data.cpu_energy:.6f} kWh")
print(f" GPU Energy: {emissions_data.gpu_energy:.6f} kWh")
print(f" RAM Energy: {emissions_data.ram_energy:.6f} kWh")
print(
f" Total Energy Consumed: {emissions_data.energy_consumed:.6f} kWh"
)
print(f" Emissions: {emissions_data.emissions:.6f} kg CO2eq")

if emissions_data.energy_consumed == 0.0:
print("###########################################################")
print(
f"INFO: energy_consumed is 0.0 in round {current_round + 1}. Stopping loop."
)
print("###########################################################")
break
else:
print(f" WARNING: tracker.stop_task() returned None for {task_name}")

# Small pause between rounds, can be adjusted or removed
time.sleep(1)

else: # Executed if the loop completes without break
print(
f"Loop completed {max_rounds} rounds without encountering zero energy consumption."
)

except Exception as e:
print(f"An error occurred: {e}")
finally:
tracker.stop_task()
print("Script finished.")
94 changes: 94 additions & 0 deletions tests/test_emissions_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import requests
import responses

from codecarbon.core.units import Energy, Power
from codecarbon.emissions_tracker import (
EmissionsTracker,
OfflineEmissionsTracker,
Expand Down Expand Up @@ -407,6 +408,99 @@ def test_carbon_tracker_online_context_manager_TWO_GPU_PRIVATE_INFRA_CANADA(
self.assertIsInstance(tracker.final_emissions, float)
self.assertAlmostEqual(tracker.final_emissions, 6.262572537957655e-05, places=2)

@mock.patch(
"codecarbon.external.ram.RAM.measure_power_and_energy"
) # Corrected path for RAM
@mock.patch(
"codecarbon.external.hardware.CPU.measure_power_and_energy"
) # Path for CPU is likely correct
def test_task_energy_with_live_update_interference(
self,
mock_cpu_measure, # Method decorator (innermost)
mock_ram_measure, # Method decorator (outermost)
mock_setup_intel_cli, # Class decorator (innermost)
mock_log_values, # Class decorator
mocked_env_cloud_details, # Class decorator
mocked_get_gpu_details, # Class decorator
mocked_is_gpu_details_available, # Class decorator (outermost relevant one)
):
# --- Test Setup ---
# Configure mocks to return specific, non-zero energy values
cpu_energy_val_task = 0.0001
ram_energy_val_task = 0.00005
mock_cpu_measure.return_value = (
Power.from_watts(10),
Energy.from_energy(kWh=cpu_energy_val_task),
)
mock_ram_measure.return_value = (
Power.from_watts(5),
Energy.from_energy(kWh=ram_energy_val_task),
)

tracker = EmissionsTracker(
project_name="TestLiveUpdateInterference",
measure_power_secs=1,
api_call_interval=1, # Trigger live update on first opportunity
output_handlers=[], # Clear any default handlers like FileOutput
save_to_file=False, # Ensure no file is created by default
save_to_api=False,
# Config file is mocked by get_custom_mock_open in setUp
)

# --- Test Logic ---
tracker.start_task("my_test_task")
# Simulate some work or time passing if necessary, though energy is mocked.
# time.sleep(0.1) # Not strictly needed due to mocking

task_data = tracker.stop_task()
# In stop_task:
# 1. _measure_power_and_energy() is called MANUALLY.
# - mock_cpu_measure and mock_ram_measure are called.
# - _total_energies get cpu_energy_val_task and ram_energy_val_task added.
# - _measure_occurrence becomes 1.
# - Since api_call_interval is 1, live update path IS triggered if _measure_occurrence >= api_call_interval:
# - _prepare_emissions_data() called (gets totals including task energy).
# - _compute_emissions_delta() called. This updates _previous_emissions.
# 2. Back in stop_task, after _measure_power_and_energy():
# - _prepare_emissions_data() called again (gets same totals).
# - The NEW logic computes delta using _active_task_emissions_at_start.
# - The global _previous_emissions is then updated again using current totals by another _compute_emissions_delta call.

# --- Assertions ---
self.assertIsNotNone(task_data, "Task data should not be None")

self.assertGreater(task_data.cpu_energy, 0, "CPU energy should be non-zero")
self.assertAlmostEqual(
task_data.cpu_energy,
cpu_energy_val_task,
places=7,
msg="CPU energy does not match expected task energy",
)

self.assertGreater(task_data.ram_energy, 0, "RAM energy should be non-zero")
self.assertAlmostEqual(
task_data.ram_energy,
ram_energy_val_task,
places=7,
msg="RAM energy does not match expected task energy",
)

expected_total_energy = cpu_energy_val_task + ram_energy_val_task
self.assertGreater(
task_data.energy_consumed, 0, "Total energy consumed should be non-zero"
)
self.assertAlmostEqual(
task_data.energy_consumed,
expected_total_energy,
places=7,
msg="Total energy consumed does not match sum of components",
)

# Verify mocks were called as expected
# They are called once in _measure_power_and_energy inside stop_task
mock_cpu_measure.assert_called_once()
mock_ram_measure.assert_called_once()

@responses.activate
def test_carbon_tracker_offline_context_manager(
self,
Expand Down
Loading