diff --git a/carbonserver/carbonserver/database/alembic/versions/298059b19bde_add_codecarbon_version.py b/carbonserver/carbonserver/database/alembic/versions/298059b19bde_add_codecarbon_version.py index b0530293f..cd783b517 100644 --- a/carbonserver/carbonserver/database/alembic/versions/298059b19bde_add_codecarbon_version.py +++ b/carbonserver/carbonserver/database/alembic/versions/298059b19bde_add_codecarbon_version.py @@ -1,4 +1,4 @@ -""" add codecarbon version +"""add codecarbon version Revision ID: 298059b19bde Revises: edcd10edf11d diff --git a/codecarbon/cli/main.py b/codecarbon/cli/main.py index 669efe612..b9367167d 100644 --- a/codecarbon/cli/main.py +++ b/codecarbon/cli/main.py @@ -1,6 +1,7 @@ import os import sys import time +from datetime import datetime from pathlib import Path from typing import Optional @@ -69,7 +70,7 @@ def show_config(path: Path = Path("./.codecarbon.config")) -> None: print("Config file content : ") print(d) try: - if "organization_id" not in d: + if d is None or "organization_id" not in d: print( "No organization_id in config, follow setup instruction to complete your configuration file!", ) @@ -211,9 +212,14 @@ def config(): api = ApiClient(endpoint_url=api_endpoint) api.set_access_token(_get_access_token()) organizations = api.get_list_organizations() + + organization_options = [] + if organizations: + organization_options = [org["name"] for org in organizations] + org = questionary_prompt( "Pick existing organization from list or Create new organization ?", - [org["name"] for org in organizations] + ["Create New Organization"], + organization_options + ["Create New Organization"], default="Create New Organization", ) @@ -232,18 +238,35 @@ def config(): print("Error creating organization") return print(f"Created organization : {organization}") + org_id = organization.get("id") if isinstance(organization, dict) else None else: - organization = [orga for orga in organizations if orga["name"] == org][0] - org_id = organization["id"] + matching_orgs = [] + if organizations: + matching_orgs = [orga for orga in organizations if orga["name"] == org] + if not matching_orgs: + print(f"Error: Organization '{org}' not found") + return + organization = matching_orgs[0] + org_id = organization["id"] + + if org_id is None: + print("Error: Could not determine organization ID") + return + overwrite_local_config("organization_id", org_id, path=file_path) projects = api.list_projects_from_organization(org_id) - project_names = [project["name"] for project in projects] if projects else [] + + project_options = [] + if projects: + project_options = [project["name"] for project in projects] + project = questionary_prompt( "Pick existing project from list or Create new project ?", - project_names + ["Create New Project"], + project_options + ["Create New Project"], default="Create New Project", ) + if project == "Create New Project": project_name = typer.prompt("Project name", default="Code Carbon user test") project_description = typer.prompt( @@ -254,23 +277,43 @@ def config(): description=project_description, organization_id=org_id, ) - project = api.create_project(project=project_create) - print(f"Created project : {project}") + project_data = api.create_project(project=project_create) + if project_data is None: + print("Error creating project") + return + print(f"Created project : {project_data}") + project_id = project_data.get("id") if isinstance(project_data, dict) else None else: - project = [p for p in projects if p["name"] == project][0] - project_id = project["id"] + if projects: + matching_projects = [p for p in projects if p["name"] == project] + if matching_projects: + project_data = matching_projects[0] + project_id = project_data["id"] + else: + print(f"Error: Project '{project}' not found") + return + else: + print("Error: No projects available") + return + + if project_id is None: + print("Error: Could not determine project ID") + return + overwrite_local_config("project_id", project_id, path=file_path) experiments = api.list_experiments_from_project(project_id) - experiments_names = ( - [experiment["name"] for experiment in experiments] if experiments else [] - ) + + experiment_options = [] + if experiments: + experiment_options = [experiment["name"] for experiment in experiments] experiment = questionary_prompt( "Pick existing experiment from list or Create new experiment ?", - experiments_names + ["Create New Experiment"], + experiment_options + ["Create New Experiment"], default="Create New Experiment", ) + if experiment == "Create New Experiment": print("Creating new experiment") exp_name = typer.prompt("Experiment name :", default="Code Carbon user test") @@ -299,24 +342,52 @@ def config(): country_iso_code = None if region == "Auto": region = None + + timestamp = get_datetime_with_timezone() + if isinstance(timestamp, str): + timestamp = datetime.fromisoformat(timestamp) + experiment_create = ExperimentCreate( - timestamp=get_datetime_with_timezone(), + timestamp=timestamp, name=exp_name, description=exp_description, on_cloud=exp_on_cloud, - project_id=project["id"], + project_id=project_id, country_name=country_name, country_iso_code=country_iso_code, region=region, cloud_provider=cloud_provider, cloud_region=cloud_region, ) - experiment = api.add_experiment(experiment=experiment_create) + experiment_data = api.add_experiment(experiment=experiment_create) + if experiment_data is None: + print("Error creating experiment") + return else: - experiment = [e for e in experiments if e["name"] == experiment][0] + if experiments: + matching_experiments = [e for e in experiments if e["name"] == experiment] + if matching_experiments: + experiment_data = matching_experiments[0] + else: + print(f"Error: Experiment '{experiment}' not found") + return + else: + print("Error: No experiments available") + return + + if experiment_data is None: + print("Error: Could not determine experiment data") + return + + experiment_id = ( + experiment_data.get("id") if isinstance(experiment_data, dict) else None + ) + if experiment_id is None: + print("Error: Could not determine experiment ID") + return - overwrite_local_config("experiment_id", experiment["id"], path=file_path) + overwrite_local_config("experiment_id", experiment_id, path=file_path) api_key = get_api_key(project_id) overwrite_local_config("api_key", api_key, path=file_path) show_config(file_path) diff --git a/codecarbon/core/api_client.py b/codecarbon/core/api_client.py index 34067c71c..bab12f36f 100644 --- a/codecarbon/core/api_client.py +++ b/codecarbon/core/api_client.py @@ -1,5 +1,4 @@ """ - Based on https://kernelpanic.io/the-modern-way-to-call-apis-in-python TODO : use async call to API @@ -9,6 +8,7 @@ import dataclasses import json from datetime import timedelta, tzinfo +from typing import Any, Dict, List, Optional, Union import arrow import requests @@ -25,7 +25,7 @@ # from codecarbon.output import EmissionsData -def get_datetime_with_timezone(): +def get_datetime_with_timezone() -> str: timestamp = str(arrow.now().isoformat()) return timestamp @@ -35,16 +35,16 @@ class ApiClient: # (AsyncClient) This class call the Code Carbon API """ - run_id = None + run_id: Optional[str] = None def __init__( self, - endpoint_url="https://api.codecarbon.io", - experiment_id=None, - api_key=None, - access_token=None, - conf=None, - create_run_automatically=True, + endpoint_url: str = "https://api.codecarbon.io", + experiment_id: Optional[str] = None, + api_key: Optional[str] = None, + access_token: Optional[str] = None, + conf: Optional[Dict[str, Any]] = None, + create_run_automatically: bool = True, ): """ :endpoint_url: URL of the API endpoint @@ -58,12 +58,12 @@ def __init__( self.url = endpoint_url self.experiment_id = experiment_id self.api_key = api_key - self.conf = conf + self.conf = conf or {} self.access_token = access_token if self.experiment_id is not None and create_run_automatically: self._create_run(self.experiment_id) - def _get_headers(self): + def _get_headers(self) -> Dict[str, str]: headers = {"Content-Type": "application/json"} if self.api_key: # set the x-api-token header @@ -72,14 +72,14 @@ def _get_headers(self): headers["Authorization"] = f"Bearer {self.access_token}" return headers - def set_access_token(self, token: str): + def set_access_token(self, token: str) -> None: """This method sets the access token to be used for the API. Args: token (str): access token to be used for the API """ self.access_token = token - def check_auth(self): + def check_auth(self) -> Optional[Dict[str, Any]]: """ Check API access to user account """ @@ -91,7 +91,7 @@ def check_auth(self): return None return r.json() - def get_list_organizations(self): + def get_list_organizations(self) -> Optional[List[Dict[str, Any]]]: """ List all organizations """ @@ -103,7 +103,9 @@ def get_list_organizations(self): return None return r.json() - def check_organization_exists(self, organization_name: str): + def check_organization_exists( + self, organization_name: str + ) -> Union[Dict[str, Any], bool]: """ Check if an organization exists """ @@ -115,26 +117,30 @@ def check_organization_exists(self, organization_name: str): return organization return False - def create_organization(self, organization: OrganizationCreate): + def create_organization( + self, organization: OrganizationCreate + ) -> Optional[Dict[str, Any]]: """ Create an organization """ payload = dataclasses.asdict(organization) url = self.url + "/organizations" - if organization := self.check_organization_exists(organization.name): + + existing_org = self.check_organization_exists(organization.name) + if isinstance(existing_org, dict): logger.warning( - f"Organization {organization['name']} already exists. Skipping creation." + f"Organization {existing_org['name']} already exists. Skipping creation." ) - return organization - else: - headers = self._get_headers() - r = requests.post(url=url, json=payload, timeout=2, headers=headers) - if r.status_code != 201: - self._log_error(url, payload, r) - return None - return r.json() + return existing_org + + headers = self._get_headers() + r = requests.post(url=url, json=payload, timeout=2, headers=headers) + if r.status_code != 201: + self._log_error(url, payload, r) + return None + return r.json() - def get_organization(self, organization_id): + def get_organization(self, organization_id: str) -> Optional[Dict[str, Any]]: """ Get an organization """ @@ -146,12 +152,19 @@ def get_organization(self, organization_id): return None return r.json() - def update_organization(self, organization: OrganizationCreate): + def update_organization( + self, organization: OrganizationCreate + ) -> Optional[Dict[str, Any]]: """ Update an organization """ payload = dataclasses.asdict(organization) headers = self._get_headers() + + if organization.id is None: + logger.error("Organization ID is required for updating an organization") + return None + url = self.url + "/organizations/" + organization.id r = requests.patch(url=url, json=payload, timeout=2, headers=headers) if r.status_code != 200: @@ -159,7 +172,9 @@ def update_organization(self, organization: OrganizationCreate): return None return r.json() - def list_projects_from_organization(self, organization_id): + def list_projects_from_organization( + self, organization_id: str + ) -> Optional[List[Dict[str, Any]]]: """ List all projects """ @@ -171,7 +186,7 @@ def list_projects_from_organization(self, organization_id): return None return r.json() - def create_project(self, project: ProjectCreate): + def create_project(self, project: ProjectCreate) -> Optional[Dict[str, Any]]: """ Create a project """ @@ -184,7 +199,7 @@ def create_project(self, project: ProjectCreate): return None return r.json() - def get_project(self, project_id): + def get_project(self, project_id: str) -> Optional[Dict[str, Any]]: """ Get a project """ @@ -196,7 +211,7 @@ def get_project(self, project_id): return None return r.json() - def add_emission(self, carbon_emission: dict): + def add_emission(self, carbon_emission: Dict[str, Any]) -> bool: assert self.experiment_id is not None if self.run_id is None: logger.warning( @@ -208,7 +223,7 @@ def add_emission(self, carbon_emission: dict): logger.error( "ApiClient.add_emission still no run_id, aborting for this time !" ) - return False + return False if carbon_emission["duration"] < 1: logger.warning( "ApiClient : emissions not sent because of a duration smaller than 1." @@ -242,7 +257,7 @@ def add_emission(self, carbon_emission: dict): return False return True - def _create_run(self, experiment_id: str): + def _create_run(self, experiment_id: str) -> Optional[str]: """ Create the experiment for project_id """ @@ -292,8 +307,9 @@ def _create_run(self, experiment_id: str): ) except Exception as e: logger.error(e, exc_info=True) + return None - def list_experiments_from_project(self, project_id: str): + def list_experiments_from_project(self, project_id: str) -> List[Dict[str, Any]]: """ List all experiments for a project """ @@ -305,13 +321,13 @@ def list_experiments_from_project(self, project_id: str): return [] return r.json() - def set_experiment(self, experiment_id: str): + def set_experiment(self, experiment_id: str) -> None: """ Set the experiment id """ self.experiment_id = experiment_id - def add_experiment(self, experiment: ExperimentCreate): + def add_experiment(self, experiment: ExperimentCreate) -> Optional[Dict[str, Any]]: """ Create an experiment, used by the CLI, not the package. ::experiment:: The experiment to create. @@ -325,7 +341,7 @@ def add_experiment(self, experiment: ExperimentCreate): return None return r.json() - def get_experiment(self, experiment_id): + def get_experiment(self, experiment_id: str) -> Optional[Dict[str, Any]]: """ Get an experiment by id """ @@ -337,7 +353,9 @@ def get_experiment(self, experiment_id): return None return r.json() - def _log_error(self, url, payload, response): + def _log_error( + self, url: str, payload: Dict[str, Any], response: requests.Response + ) -> None: if len(payload) > 0: logger.error( f"ApiClient Error when calling the API on {url} with : {json.dumps(payload)}" @@ -348,14 +366,14 @@ def _log_error(self, url, payload, response): f"ApiClient API return http code {response.status_code} and answer : {response.text}" ) - def close_experiment(self): + def close_experiment(self) -> None: """ Tell the API that the experiment has ended. """ class simple_utc(tzinfo): - def tzname(self, **kwargs): + def tzname(self, dt=None) -> str: return "UTC" def utcoffset(self, dt): diff --git a/codecarbon/core/config.py b/codecarbon/core/config.py index e4061706e..7105d04a8 100644 --- a/codecarbon/core/config.py +++ b/codecarbon/core/config.py @@ -1,7 +1,7 @@ import configparser import os from pathlib import Path -from typing import List +from typing import List, Union from codecarbon.external.logger import logger @@ -44,13 +44,13 @@ def parse_env_config() -> dict: } -def parse_gpu_ids(gpu_ids_str: str) -> List[int]: +def parse_gpu_ids(gpu_ids_str: Union[str, List[int]]) -> List[int]: """ Transforms the potential gpu_ids string into a list of int values Args: - gpu_ids_str (str): The config file or environment variable value for `gpu_ids` - which is read as a string and should be parsed into a list of ints + gpu_ids_str (Union[str, List[int]]): The config file or environment variable value for `gpu_ids` + which is read as a string or list and should be parsed into a list of ints Returns: list[int]: The list of GPU ids available declared by the user. diff --git a/codecarbon/core/cpu.py b/codecarbon/core/cpu.py index 4217e2919..cbde9c71a 100644 --- a/codecarbon/core/cpu.py +++ b/codecarbon/core/cpu.py @@ -65,18 +65,21 @@ def is_rapl_available() -> bool: def is_psutil_available(): try: - nice = psutil.cpu_times().nice - if nice > 0.0001: - return True - else: - logger.debug( - f"is_psutil_available() : psutil.cpu_times().nice is too small : {nice} !" - ) - return False + cpu_times = psutil.cpu_times() + + for field in cpu_times._fields: + value = getattr(cpu_times, field) + if isinstance(value, (int, float)) and value > 0.0001: + return True + + logger.debug( + "is_psutil_available() : No values greater than 0.0001 found in psutil.cpu_times()!" + ) + return False except Exception as e: logger.debug( - "Not using the psutil interface, an exception occurred while instantiating " - + f"psutil.cpu_times : {e}", + "Not using the psutil interface, an exception occurred: %s", + e, ) return False @@ -113,7 +116,7 @@ def __init__( self._system = sys.platform.lower() self._duration = duration self._resolution = resolution - self._windows_exec_backup = None + self._windows_exec_backup: Optional[str] = None self._setup_cli() def _setup_cli(self) -> None: @@ -122,21 +125,29 @@ def _setup_cli(self) -> None: """ if self._system.startswith("win"): self._get_windows_exec_backup() - if shutil.which(self._windows_exec): - self._cli = shutil.which( - self._windows_exec - ) # Windows exec is a relative path - elif shutil.which(self._windows_exec_backup): - self._cli = self._windows_exec_backup + exec_path = shutil.which(self._windows_exec) + backup_path = ( + shutil.which(self._windows_exec_backup) + if self._windows_exec_backup + else None + ) + + if exec_path: + self._cli = exec_path # Windows exec is a relative path + elif backup_path: + self._cli = backup_path else: raise FileNotFoundError( f"Intel Power Gadget executable not found on {self._system}" ) elif self._system.startswith("darwin"): - if shutil.which(self._osx_exec): - self._cli = self._osx_exec - elif shutil.which(self._osx_exec_backup): - self._cli = self._osx_exec_backup + exec_path = shutil.which(self._osx_exec) + backup_path = shutil.which(self._osx_exec_backup) + + if exec_path: + self._cli = exec_path + elif backup_path: + self._cli = backup_path else: raise FileNotFoundError( f"Intel Power Gadget executable not found on {self._system}" @@ -173,13 +184,13 @@ def _log_values(self) -> None: if self._system.startswith("win"): returncode = subprocess.call( [ - self._cli, + str(self._cli), "-duration", str(self._duration), "-resolution", str(self._resolution), "-file", - self._log_file_path, + str(self._log_file_path), ], shell=True, stdout=subprocess.PIPE, @@ -391,7 +402,7 @@ def __init__(self): @staticmethod def _get_cpu_constant_power(match: str, cpu_power_df: pd.DataFrame) -> int: """Extract constant power from matched CPU""" - return float(cpu_power_df[cpu_power_df["Name"] == match]["TDP"].values[0]) + return int(float(cpu_power_df[cpu_power_df["Name"] == match]["TDP"].values[0])) def _get_cpu_power_from_registry(self, cpu_model_raw: str) -> Optional[int]: cpu_power_df = DataSource().get_cpu_power_data() @@ -403,7 +414,7 @@ def _get_cpu_power_from_registry(self, cpu_model_raw: str) -> Optional[int]: def _get_matching_cpu( self, model_raw: str, cpu_df: pd.DataFrame, greedy=False - ) -> str: + ) -> Optional[str]: """ Get matching cpu name @@ -479,11 +490,11 @@ def _get_matching_cpu( return None - def _main(self) -> Tuple[str, int]: + def _main(self) -> Tuple[str, Optional[int]]: """ Get CPU power from constant mode - :return: model name (str), power in Watt (int) + :return: model name (str), power in Watt (Optional[int]) """ cpu_model_detected = detect_cpu_model() @@ -505,7 +516,9 @@ def _main(self) -> Tuple[str, int]: if is_psutil_available(): # Count thread of the CPU threads = psutil.cpu_count(logical=True) - estimated_tdp = threads * DEFAULT_POWER_PER_CORE + estimated_tdp = ( + threads * DEFAULT_POWER_PER_CORE if threads is not None else 0 + ) logger.warning( f"We will use the default power consumption of {DEFAULT_POWER_PER_CORE} W per thread for your {threads} CPU, so {estimated_tdp}W." ) diff --git a/codecarbon/core/emissions.py b/codecarbon/core/emissions.py index 9a070a003..a25fb4089 100644 --- a/codecarbon/core/emissions.py +++ b/codecarbon/core/emissions.py @@ -25,7 +25,7 @@ def __init__( self._co2_signal_api_token = co2_signal_api_token def get_cloud_emissions( - self, energy: Energy, cloud: CloudMetadata, geo: GeoMetadata = None + self, energy: Energy, cloud: CloudMetadata, geo: Optional[GeoMetadata] = None ) -> float: """ Computes emissions for cloud infra @@ -58,12 +58,15 @@ def get_cloud_emissions( carbon_intensity_per_source = ( DataSource().get_carbon_intensity_per_source_data() ) - emissions = ( - EmissionsPerKWh.from_g_per_kWh( - carbon_intensity_per_source.get("world_average") - ).kgs_per_kWh - * energy.kWh - ) # kgs + world_average = carbon_intensity_per_source.get("world_average") + if world_average is not None: + emissions = ( + EmissionsPerKWh.from_g_per_kWh(world_average).kgs_per_kWh + * energy.kWh + ) # kgs + else: + # Fallback if world_average is None + emissions = 0.0 return emissions def get_cloud_country_name(self, cloud: CloudMetadata) -> str: @@ -134,7 +137,8 @@ def get_private_infra_emissions(self, energy: Energy, geo: GeoMetadata) -> float ) compute_with_regional_data: bool = (geo.region is not None) and ( - geo.country_iso_code.upper() in ["USA", "CAN"] + geo.country_iso_code is not None + and geo.country_iso_code.upper() in ["USA", "CAN"] ) if compute_with_regional_data: @@ -159,6 +163,11 @@ def get_region_emissions(self, energy: Energy, geo: GeoMetadata) -> float: :return: CO2 emissions in kg """ try: + if geo.country_iso_code is None: + raise ValueError( + "country_iso_code cannot be None for region emissions calculation" + ) + country_emissions_data = self._data_source.get_country_emissions_data( geo.country_iso_code.lower() ) @@ -176,6 +185,11 @@ def get_region_emissions(self, energy: Energy, geo: GeoMetadata) -> float: except DataSourceException: # This country has regional data at the energy mix level, # not the emissions level + if geo.country_iso_code is None: + raise ValueError( + "country_iso_code cannot be None for energy mix data retrieval" + ) + country_energy_mix_data = self._data_source.get_country_energy_mix_data( geo.country_iso_code.lower() ) @@ -197,6 +211,20 @@ def get_country_emissions(self, energy: Energy, geo: GeoMetadata) -> float: """ energy_mix = self._data_source.get_global_energy_mix_data() + if geo.country_iso_code is None: + logger.warning("country_iso_code is None, using world average.") + carbon_intensity_per_source = ( + DataSource().get_carbon_intensity_per_source_data() + ) + world_average = carbon_intensity_per_source.get("world_average") + if world_average is not None: + return ( + EmissionsPerKWh.from_g_per_kWh(world_average).kgs_per_kWh + * energy.kWh + ) + else: + return 0.0 + if geo.country_iso_code not in energy_mix: logger.warning( f"We do not have data for {geo.country_iso_code}, using world average." @@ -204,12 +232,14 @@ def get_country_emissions(self, energy: Energy, geo: GeoMetadata) -> float: carbon_intensity_per_source = ( DataSource().get_carbon_intensity_per_source_data() ) - return ( - EmissionsPerKWh.from_g_per_kWh( - carbon_intensity_per_source.get("world_average") - ).kgs_per_kWh - * energy.kWh - ) # kgs + world_average = carbon_intensity_per_source.get("world_average") + if world_average is not None: + return ( + EmissionsPerKWh.from_g_per_kWh(world_average).kgs_per_kWh + * energy.kWh + ) # kgs + else: + return 0.0 country_energy_mix: Dict = energy_mix[geo.country_iso_code] emissions_per_kWh = self._global_energy_mix_to_emissions_rate( @@ -232,8 +262,9 @@ def _global_energy_mix_to_emissions_rate(energy_mix: Dict) -> EmissionsPerKWh: in Kgs.CO2 / kWh """ # If we have the chance to have the carbon intensity for this country - if energy_mix.get("carbon_intensity"): - return EmissionsPerKWh.from_g_per_kWh(energy_mix.get("carbon_intensity")) + carbon_intensity = energy_mix.get("carbon_intensity") + if carbon_intensity is not None: + return EmissionsPerKWh.from_g_per_kWh(carbon_intensity) # Else we compute it from the energy mix. # Read carbon_intensity from the json data file. @@ -250,7 +281,7 @@ def _global_energy_mix_to_emissions_rate(energy_mix: Dict) -> EmissionsPerKWh: carbon_intensity_for_type = carbon_intensity_per_source.get( energy_type[: -len("_TWh")] ) - if carbon_intensity_for_type: # to ignore "total_TWh" + if carbon_intensity_for_type is not None: # to ignore "total_TWh" carbon_intensity += ( energy_per_year / energy_sum ) * carbon_intensity_for_type @@ -261,9 +292,11 @@ def _global_energy_mix_to_emissions_rate(energy_mix: Dict) -> EmissionsPerKWh: logger.error( f"We find {energy_sum_computed} TWh instead of {energy_sum} TWh for {energy_mix.get('country_name')}, using world average." ) - return EmissionsPerKWh.from_g_per_kWh( - carbon_intensity_per_source.get("world_average") - ) + world_average = carbon_intensity_per_source.get("world_average") + if world_average is not None: + return EmissionsPerKWh.from_g_per_kWh(world_average) + else: + return EmissionsPerKWh.from_g_per_kWh(0.0) # Default fallback return EmissionsPerKWh.from_g_per_kWh(carbon_intensity) diff --git a/codecarbon/core/gpu.py b/codecarbon/core/gpu.py index fef1277f4..6f1206e9d 100644 --- a/codecarbon/core/gpu.py +++ b/codecarbon/core/gpu.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union import pynvml @@ -25,7 +25,7 @@ class GPUDevice: calculate `energy_delta`. Defaults to an initial value of 0 kWh. """ - handle: any + handle: Any gpu_index: int # Energy consumed in kWh energy_delta: Energy = field(default_factory=lambda: Energy(0)) @@ -108,7 +108,7 @@ def _to_utf8(self, str_or_bytes) -> Any: return str_or_bytes - def _get_total_energy_consumption(self) -> int: + def _get_total_energy_consumption(self) -> Optional[int]: """Returns total energy consumption for this GPU in millijoules (mJ) since the driver was last reloaded https://docs.nvidia.com/deploy/nvml-api/group__nvmlDeviceQueries.html#group__nvmlDeviceQueries_1g732ab899b5bd18ac4bfb93c02de4900a """ diff --git a/codecarbon/core/measure.py b/codecarbon/core/measure.py index 6f612a5da..eb6bffcf2 100644 --- a/codecarbon/core/measure.py +++ b/codecarbon/core/measure.py @@ -1,7 +1,9 @@ from time import perf_counter -from codecarbon.external.hardware import CPU, GPU, RAM, AppleSiliconChip +from codecarbon.core.units import Energy, Power +from codecarbon.external.hardware import CPU, GPU, AppleSiliconChip from codecarbon.external.logger import logger +from codecarbon.external.ram import RAM class MeasurePowerEnergy: @@ -9,16 +11,16 @@ class MeasurePowerEnergy: Measure power and energy consumption of a hardware component. """ - _last_measured_time: int = 0 + _last_measured_time: float = 0 _hardware: list _pue: float - _total_cpu_energy: float - _total_gpu_energy: float - _total_ram_energy: float - _total_energy: float - _cpu_power: float - _gpu_power: float - _ram_power: float + _total_cpu_energy: Energy + _total_gpu_energy: Energy + _total_ram_energy: Energy + _total_energy: Energy + _cpu_power: Power + _gpu_power: Power + _ram_power: Power def __init__(self, hardware, pue): """ @@ -28,15 +30,15 @@ def __init__(self, hardware, pue): self._last_measured_time = perf_counter() self._hardware = hardware self._pue = pue - # TODO: Read initial energy values from hardware - self._total_cpu_energy = 0 - self._total_gpu_energy = 0 - self._total_ram_energy = 0 - self._total_energy = 0 - # Power cant't be read at init because we need time, so we set it to 0 - self._cpu_power = 0 - self._gpu_power = 0 - self._ram_power = 0 + + self._total_cpu_energy = Energy.from_energy(0) + self._total_gpu_energy = Energy.from_energy(0) + self._total_ram_energy = Energy.from_energy(0) + self._total_energy = Energy.from_energy(0) + + self._cpu_power = Power.from_watts(0) + self._gpu_power = Power.from_watts(0) + self._ram_power = Power.from_watts(0) def do_measure(self) -> None: for hardware in self._hardware: @@ -47,24 +49,30 @@ def do_measure(self) -> None: last_duration=last_duration ) # Apply the PUE of the datacenter to the consumed energy - energy *= self._pue - self._total_energy += energy + energy = Energy.from_energy(energy.kWh * self._pue) + self._total_energy = Energy.from_energy(self._total_energy.kWh + energy.kWh) if isinstance(hardware, CPU): - self._total_cpu_energy += energy + self._total_cpu_energy = Energy.from_energy( + self._total_cpu_energy.kWh + energy.kWh + ) self._cpu_power = power logger.info( f"Energy consumed for all CPUs : {self._total_cpu_energy.kWh:.6f} kWh" + f". Total CPU Power : {self._cpu_power.W} W" ) elif isinstance(hardware, GPU): - self._total_gpu_energy += energy + self._total_gpu_energy = Energy.from_energy( + self._total_gpu_energy.kWh + energy.kWh + ) self._gpu_power = power logger.info( f"Energy consumed for all GPUs : {self._total_gpu_energy.kWh:.6f} kWh" + f". Total GPU Power : {self._gpu_power.W} W" ) elif isinstance(hardware, RAM): - self._total_ram_energy += energy + self._total_ram_energy = Energy.from_energy( + self._total_ram_energy.kWh + energy.kWh + ) self._ram_power = power logger.info( f"Energy consumed for RAM : {self._total_ram_energy.kWh:.6f} kWh." @@ -72,14 +80,18 @@ def do_measure(self) -> None: ) elif isinstance(hardware, AppleSiliconChip): if hardware.chip_part == "CPU": - self._total_cpu_energy += energy + self._total_cpu_energy = Energy.from_energy( + self._total_cpu_energy.kWh + energy.kWh + ) self._cpu_power = power logger.info( f"Energy consumed for AppleSilicon CPU : {self._total_cpu_energy.kWh:.6f} kWh" + f".Apple Silicon CPU Power : {self._cpu_power.W} W" ) elif hardware.chip_part == "GPU": - self._total_gpu_energy += energy + self._total_gpu_energy = Energy.from_energy( + self._total_gpu_energy.kWh + energy.kWh + ) self._gpu_power = power logger.info( f"Energy consumed for AppleSilicon GPU : {self._total_gpu_energy.kWh:.6f} kWh" diff --git a/codecarbon/core/powermetrics.py b/codecarbon/core/powermetrics.py index c92429342..c90213381 100644 --- a/codecarbon/core/powermetrics.py +++ b/codecarbon/core/powermetrics.py @@ -109,7 +109,7 @@ def _setup_cli(self) -> None: """ if self._system.startswith("darwin"): cpu_model = detect_cpu_model() - if cpu_model.startswith("Apple"): + if cpu_model is not None and cpu_model.startswith("Apple"): if shutil.which(self._osx_silicon_exec): self._cli = self._osx_silicon_exec else: diff --git a/codecarbon/core/resource_tracker.py b/codecarbon/core/resource_tracker.py index acd89e72e..4217cccc4 100644 --- a/codecarbon/core/resource_tracker.py +++ b/codecarbon/core/resource_tracker.py @@ -1,5 +1,4 @@ from collections import Counter -from typing import List, Union from codecarbon.core import cpu, gpu, powermetrics from codecarbon.core.config import parse_gpu_ids @@ -31,7 +30,7 @@ def set_RAM_tracking(self): force_ram_power=self.tracker._force_ram_power, ) self.tracker._conf["ram_total_size"] = ram.machine_memory_GB - self.tracker._hardware: List[Union[RAM, CPU, GPU, AppleSiliconChip]] = [ram] + self.tracker._hardware = [ram] def set_CPU_tracking(self): logger.info("[setup] CPU Tracking...") @@ -111,10 +110,9 @@ def set_CPU_tracking(self): # Explain what to install to increase accuracy cpu_tracking_install_instructions = "" if is_mac_os(): - if ( - "M1" in detect_cpu_model() - or "M2" in detect_cpu_model() - or "M3" in detect_cpu_model() + cpu_model = detect_cpu_model() + if cpu_model is not None and ( + "M1" in cpu_model or "M2" in cpu_model or "M3" in cpu_model ): cpu_tracking_install_instructions = "" cpu_tracking_install_instructions = "Mac OS and ARM processor detected: Please enable PowerMetrics sudo to measure CPU" @@ -188,7 +186,7 @@ def set_GPU_tracking(self): isinstance(self.tracker._gpu_ids, list) and all(isinstance(gpu_id, int) for gpu_id in self.tracker._gpu_ids) ): - self.tracker._gpu_ids: List[int] = parse_gpu_ids(self.tracker._gpu_ids) + self.tracker._gpu_ids = parse_gpu_ids(self.tracker._gpu_ids) self.tracker._conf["gpu_ids"] = self.tracker._gpu_ids self.tracker._conf["gpu_count"] = len(self.tracker._gpu_ids) else: diff --git a/codecarbon/core/schemas.py b/codecarbon/core/schemas.py index 225deb6a5..1e3f4ec93 100644 --- a/codecarbon/core/schemas.py +++ b/codecarbon/core/schemas.py @@ -2,9 +2,9 @@ Here is the schemas used to communicate with the API. """ -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime -from typing import Optional +from typing import Optional, Union from uuid import UUID @@ -24,48 +24,52 @@ class EmissionBase: energy_consumed: float +@dataclass class EmissionCreate(EmissionBase): pass +@dataclass class Emission(EmissionBase): - id: str + id: str = field(default="") @dataclass class RunBase: timestamp: str experiment_id: str - os: Optional[str] - python_version: Optional[str] - codecarbon_version: Optional[str] - cpu_count: Optional[int] - cpu_model: Optional[str] - gpu_count: Optional[int] - gpu_model: Optional[str] - longitude: Optional[float] - latitude: Optional[float] - region: Optional[str] - provider: Optional[str] - ram_total_size: Optional[float] - tracking_mode: Optional[str] + os: Optional[str] = None + python_version: Optional[str] = None + codecarbon_version: Optional[str] = None + cpu_count: Optional[int] = None + cpu_model: Optional[str] = None + gpu_count: Optional[int] = None + gpu_model: Optional[str] = None + longitude: Optional[float] = None + latitude: Optional[float] = None + region: Optional[str] = None + provider: Optional[str] = None + ram_total_size: Optional[float] = None + tracking_mode: Optional[str] = None +@dataclass class RunCreate(RunBase): pass +@dataclass class Run(RunBase): - id: str + id: str = field(default="") @dataclass class ExperimentBase: - timestamp: datetime name: str description: str - on_cloud: bool - project_id: UUID + project_id: Union[UUID, str] + on_cloud: bool = False + timestamp: Optional[datetime] = None country_name: Optional[str] = None country_iso_code: Optional[str] = None region: Optional[str] = None @@ -73,38 +77,45 @@ class ExperimentBase: cloud_region: Optional[str] = None +@dataclass class ExperimentCreate(ExperimentBase): pass +@dataclass class Experiment(ExperimentBase): - id: str + id: str = field(default="") @dataclass class OrganizationBase: name: str - description: str + description: Optional[str] = "" + id: Optional[str] = None +@dataclass class OrganizationCreate(OrganizationBase): pass +@dataclass class Organization(OrganizationBase): - id: str + id: str = field(default="") @dataclass class ProjectBase: name: str - description: str organization_id: str + description: str = "" +@dataclass class ProjectCreate(ProjectBase): pass +@dataclass class Project(ProjectBase): - id: str + id: str = field(default="") diff --git a/codecarbon/core/util.py b/codecarbon/core/util.py index ad97d6a08..eb808a0b7 100644 --- a/codecarbon/core/util.py +++ b/codecarbon/core/util.py @@ -73,7 +73,13 @@ def backup(file_path: Union[str, Path], ext: Optional[str] = ".bak") -> None: file_path.rename(backup_path) -def detect_cpu_model() -> str: +def detect_cpu_model() -> Optional[str]: + """ + Detects the CPU model using cpuinfo. + + Returns: + Optional[str]: The detected CPU model name, or None if detection fails + """ cpu_info = cpuinfo.get_cpu_info() if cpu_info: cpu_model_detected = cpu_info.get("brand_raw", "") @@ -81,22 +87,22 @@ def detect_cpu_model() -> str: return None -def is_mac_os() -> str: +def is_mac_os() -> bool: system = sys.platform.lower() return system.startswith("dar") -def is_windows_os() -> str: +def is_windows_os() -> bool: system = sys.platform.lower() return system.startswith("win") -def is_linux_os() -> str: +def is_linux_os() -> bool: system = sys.platform.lower() return system.startswith("lin") -def count_physical_cpus(): +def count_physical_cpus() -> int: import platform import subprocess @@ -119,7 +125,8 @@ def count_physical_cpus(): def count_cpus() -> int: if SLURM_JOB_ID is None: - return psutil.cpu_count() + cpu_count = psutil.cpu_count() + return cpu_count if cpu_count is not None else 1 try: logger.debug( @@ -134,7 +141,8 @@ def count_cpus() -> int: "Error running `scontrol show job $SLURM_JOB_ID` " + "to count SLURM-available cpus. Using the machine's cpu count." ) - return psutil.cpu_count(logical=True) + cpu_count = psutil.cpu_count(logical=True) + return cpu_count if cpu_count is not None else 1 num_cpus_matches = re.findall(r"NumCPUs=\d+", scontrol) @@ -143,14 +151,16 @@ def count_cpus() -> int: "Could not find NumCPUs= after running `scontrol show job $SLURM_JOB_ID` " + "to count SLURM-available cpus. Using the machine's cpu count." ) - return psutil.cpu_count(logical=True) + cpu_count = psutil.cpu_count(logical=True) + return cpu_count if cpu_count is not None else 1 if len(num_cpus_matches) > 1: logger.warning( "Unexpected output after running `scontrol show job $SLURM_JOB_ID` " + "to count SLURM-available cpus. Using the machine's cpu count." ) - return psutil.cpu_count(logical=True) + cpu_count = psutil.cpu_count(logical=True) + return cpu_count if cpu_count is not None else 1 num_cpus = num_cpus_matches[0].replace("NumCPUs=", "") logger.debug(f"Detected {num_cpus} cpus available on SLURM.") diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index 24ee26c4f..537988fae 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -11,7 +11,7 @@ from abc import ABC, abstractmethod from datetime import datetime from functools import wraps -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Protocol, TypeVar, Union from codecarbon._version import __version__ from codecarbon.core.config import get_hierarchical_config @@ -38,23 +38,22 @@ PrometheusOutput, ) -# /!\ Warning: current implementation prevents the user from setting any value to None -# from the script call -# Imagine: -# 1/ emissions_endpoint=localhost:8000 in ~/.codecarbon.config -# 2/ Inside the script, the user cannot disable emissions_endpoint with -# EmissionsTracker(emissions_endpoint=None) since the config logic will use the one in -# the config file. -# -# Alternative: EmissionsTracker(emissions_endpoint=False) would work -# TODO: document this -# -# To fix this, a complex move would be to have default values set to the sentinel: -# _sentinel = object() -# see: https://stackoverflow.com/questions/67202314/ -# python-distinguish-default-argument-and-argument-provided-with-default-value - -_sentinel = object() + +class _SentinelType: + """Sentinel class used to indicate unset values vs explicitly set None values""" + + +_sentinel = _SentinelType() + +T = TypeVar("T") +SentinelOr = Union[T, _SentinelType] + + +class PathLike(Protocol): + def __fspath__(self) -> str: ... # noqa: E704 + + +FileDescriptorOrPath = Union[int, str, bytes, PathLike] class BaseEmissionsTracker(ABC): @@ -68,11 +67,52 @@ class BaseEmissionsTracker(ABC): _scheduler: Optional[PeriodicScheduler] = None _scheduler_monitor_power: Optional[PeriodicScheduler] = None + _project_name: str + _measure_power_secs: float + _api_call_interval: int + _api_endpoint: str + _api_key: str + _output_dir: str + _output_file: str + _save_to_file: bool + _save_to_api: bool + _save_to_logger: bool + _logging_logger: Optional[LoggerOutput] + _save_to_prometheus: bool + _save_to_logfire: bool + _prometheus_url: str + _output_handlers: List[BaseOutput] + _gpu_ids: Optional[List[int]] + _emissions_endpoint: Optional[str] + _experiment_id: str + _experiment_name: str + _co2_signal_api_token: Optional[str] + _tracking_mode: str + _log_level: Union[int, str] + _on_csv_write: str + _logger_preamble: str + _force_cpu_power: Optional[int] + _force_ram_power: Optional[int] + _pue: float + _force_mode_cpu_load: bool + _allow_multiple_runs: bool + _external_conf: Dict[str, Any] + _conf: Dict[str, Any] + _cloud: Optional[CloudMetadata] + _geo: Optional[GeoMetadata] + _hardware: List[Union[CPU, GPU, RAM, AppleSiliconChip]] + _another_instance_already_running: bool = False + def _set_from_conf( - self, var, name, default=None, return_type=None, prevent_setter=False - ): + self, + var: SentinelOr[T], + name: str, + default: Optional[Any] = None, + return_type: Optional[Callable[[Any], T]] = None, + prevent_setter: bool = False, + ) -> Any: """ - Method to standardize private argument setting. Generic flow is: + Method to standardize private attribute setting. Generic flow is: * If a value for the variable `var` with name `name` is provided in the __init__ constructor: set the the private attribute `self._{name}` to @@ -113,7 +153,7 @@ def _set_from_conf( if not hasattr(self, "_conf"): self._conf = {"codecarbon_version": __version__} - value = _sentinel + value: Any = _sentinel # a value for the keyword argument `name` is provided in the constructor: # use it @@ -126,57 +166,62 @@ def _set_from_conf( # parse to `return_type` if needed if return_type is not None: - if return_type is bool: + if return_type is bool and value is not None: value = str(value).lower() == "true" - else: - assert callable(return_type) + elif callable(return_type) and value is not None: value = return_type(value) + # Check conf - if name == "output_dir": - if not os.path.exists(value): - raise OSError(f"Folder '{value}' doesn't exist !") + if name == "output_dir" and value is not None: + path_str = str(value) + if not os.path.exists(path_str): + raise OSError(f"Folder '{path_str}' doesn't exist !") + if name == "gpu_ids": if value is None and os.environ.get("CUDA_VISIBLE_DEVICES"): value = os.environ.get("CUDA_VISIBLE_DEVICES") + # store final value - self._conf[name] = value + if value is not _sentinel: + self._conf[name] = value + # set `self._{name}` to `value` - if not prevent_setter: + if not prevent_setter and value is not _sentinel: setattr(self, f"_{name}", value) - # return final value (why not?) + return value def __init__( self, - project_name: Optional[str] = _sentinel, - measure_power_secs: Optional[float] = _sentinel, - api_call_interval: Optional[int] = _sentinel, - api_endpoint: Optional[str] = _sentinel, - api_key: Optional[str] = _sentinel, - output_dir: Optional[str] = _sentinel, - output_file: Optional[str] = _sentinel, - save_to_file: Optional[bool] = _sentinel, - save_to_api: Optional[bool] = _sentinel, - save_to_logger: Optional[bool] = _sentinel, - logging_logger: Optional[LoggerOutput] = _sentinel, - save_to_prometheus: Optional[bool] = _sentinel, - save_to_logfire: Optional[bool] = _sentinel, - prometheus_url: Optional[str] = _sentinel, - output_handlers: Optional[List[BaseOutput]] = _sentinel, - gpu_ids: Optional[List] = _sentinel, - emissions_endpoint: Optional[str] = _sentinel, - experiment_id: Optional[str] = _sentinel, - experiment_name: Optional[str] = _sentinel, - co2_signal_api_token: Optional[str] = _sentinel, - tracking_mode: Optional[str] = _sentinel, - log_level: Optional[Union[int, str]] = _sentinel, - on_csv_write: Optional[str] = _sentinel, - logger_preamble: Optional[str] = _sentinel, - force_cpu_power: Optional[int] = _sentinel, - force_ram_power: Optional[int] = _sentinel, - pue: Optional[int] = _sentinel, - force_mode_cpu_load: Optional[bool] = _sentinel, - allow_multiple_runs: Optional[bool] = _sentinel, + project_name: SentinelOr[Optional[str]] = _sentinel, + measure_power_secs: SentinelOr[Optional[float]] = _sentinel, + api_call_interval: SentinelOr[Optional[int]] = _sentinel, + api_endpoint: SentinelOr[Optional[str]] = _sentinel, + api_key: SentinelOr[Optional[str]] = _sentinel, + output_dir: SentinelOr[Optional[str]] = _sentinel, + output_file: SentinelOr[Optional[str]] = _sentinel, + save_to_file: SentinelOr[Optional[bool]] = _sentinel, + save_to_api: SentinelOr[Optional[bool]] = _sentinel, + save_to_logger: SentinelOr[Optional[bool]] = _sentinel, + logging_logger: SentinelOr[Optional[LoggerOutput]] = _sentinel, + save_to_prometheus: SentinelOr[Optional[bool]] = _sentinel, + save_to_logfire: SentinelOr[Optional[bool]] = _sentinel, + prometheus_url: SentinelOr[Optional[str]] = _sentinel, + output_handlers: SentinelOr[Optional[List[BaseOutput]]] = _sentinel, + gpu_ids: SentinelOr[Optional[List[int]]] = _sentinel, + emissions_endpoint: SentinelOr[Optional[str]] = _sentinel, + experiment_id: SentinelOr[Optional[str]] = _sentinel, + experiment_name: SentinelOr[Optional[str]] = _sentinel, + co2_signal_api_token: SentinelOr[Optional[str]] = _sentinel, + tracking_mode: SentinelOr[Optional[str]] = _sentinel, + log_level: SentinelOr[Optional[Union[int, str]]] = _sentinel, + on_csv_write: SentinelOr[Optional[str]] = _sentinel, + logger_preamble: SentinelOr[Optional[str]] = _sentinel, + force_cpu_power: SentinelOr[Optional[int]] = _sentinel, + force_ram_power: SentinelOr[Optional[int]] = _sentinel, + pue: SentinelOr[Optional[float]] = _sentinel, + force_mode_cpu_load: SentinelOr[Optional[bool]] = _sentinel, + allow_multiple_runs: SentinelOr[Optional[bool]] = _sentinel, ): """ :param project_name: Project name for current experiment run, default name @@ -236,9 +281,9 @@ def __init__( :param allow_multiple_runs: Allow multiple instances of codecarbon running in parallel. Defaults to False. """ - # logger.info("base tracker init") self._external_conf = get_hierarchical_config() self._set_from_conf(allow_multiple_runs, "allow_multiple_runs", True, bool) + if self._allow_multiple_runs: logger.warning( "Multiple instances of codecarbon are allowed to run at the same time." @@ -289,7 +334,10 @@ def __init__( ) assert self._tracking_mode in ["machine", "process"] - set_logger_level(self._log_level) + if isinstance(self._log_level, int): + set_logger_level(str(self._log_level)) + else: + set_logger_level(self._log_level) set_logger_format(self._logger_preamble) self._start_time: Optional[float] = None @@ -303,39 +351,47 @@ def __init__( self._ram_power: Power = Power.from_watts(watts=0) self._measure_occurrence: int = 0 self._cloud = None - self._previous_emissions = None + self._previous_emissions: Optional[EmissionsData] = None + + if not hasattr(self, "_conf"): + self._conf = {"codecarbon_version": __version__} + self._conf["os"] = platform.platform() self._conf["python_version"] = platform.python_version() self._conf["cpu_count"] = count_cpus() self._conf["cpu_physical_count"] = count_physical_cpus() self._geo = None - self._task_start_measurement_values = {} - self._task_stop_measurement_values = {} + self._task_start_measurement_values: Dict[str, Any] = {} + self._task_stop_measurement_values: Dict[str, Any] = {} self._tasks: Dict[str, Task] = {} self._active_task: Optional[str] = None + self._hardware = [] + # Tracking mode detection ressource_tracker = ResourceTracker(self) ressource_tracker.set_CPU_GPU_ram_tracking() - self._conf["hardware"] = list(map(lambda x: x.description(), self._hardware)) + if hasattr(self, "_hardware"): + hardware_desc = list(map(lambda x: x.description(), self._hardware)) + self._conf["hardware"] = hardware_desc logger.info(">>> Tracker's metadata:") logger.info(f" Platform system: {self._conf.get('os')}") logger.info(f" Python version: {self._conf.get('python_version')}") logger.info(f" CodeCarbon version: {self._conf.get('codecarbon_version')}") - logger.info(f" Available RAM : {self._conf.get('ram_total_size'):.3f} GB") + logger.info(f" Available RAM : {self._conf.get('ram_total_size', 0):.3f} GB") logger.info( f" CPU count: {self._conf.get('cpu_count')} thread(s) in {self._conf.get('cpu_physical_count')} physical CPU(s)" ) - logger.info(f" CPU model: {self._conf.get('cpu_model')}") - logger.info(f" GPU count: {self._conf.get('gpu_count')}") + logger.info(f" CPU model: {self._conf.get('cpu_model', '')}") + logger.info(f" GPU count: {self._conf.get('gpu_count', 0)}") if self._gpu_ids: logger.info( - f" GPU model: {self._conf.get('gpu_model')} BUT only tracking these GPU ids : {self._conf.get('gpu_ids')}" + f" GPU model: {self._conf.get('gpu_model', '')} BUT only tracking these GPU ids : {self._gpu_ids}" ) else: - logger.info(f" GPU model: {self._conf.get('gpu_model')}") + logger.info(f" GPU model: {self._conf.get('gpu_model', '')}") # Run `self._measure_power_and_energy` every `measure_power_secs` seconds in a # background thread @@ -354,8 +410,11 @@ def __init__( if cloud.is_on_private_infra: self._geo = self._get_geo_metadata() - self._conf["longitude"] = self._geo.longitude - self._conf["latitude"] = self._geo.latitude + if self._geo: + if self._geo.longitude is not None: + self._conf["longitude"] = self._geo.longitude + if self._geo.latitude is not None: + self._conf["latitude"] = self._geo.latitude self._conf["region"] = cloud.region self._conf["provider"] = cloud.provider else: @@ -367,7 +426,7 @@ def __init__( ) self._init_output_methods(api_key=self._api_key) - def _init_output_methods(self, *, api_key: str = None): + def _init_output_methods(self, *, api_key: Optional[str] = None): """ Prepare the different output methods """ @@ -380,7 +439,7 @@ def _init_output_methods(self, *, api_key: str = None): ) ) - if self._save_to_logger: + if self._save_to_logger and self._logging_logger: self._output_handlers.append(self._logging_logger) if self._emissions_endpoint: @@ -390,13 +449,13 @@ def _init_output_methods(self, *, api_key: str = None): cc_api__out = CodeCarbonAPIOutput( endpoint_url=self._api_endpoint, experiment_id=self._experiment_id, - api_key=api_key, + api_key=api_key or "", conf=self._conf, ) self.run_id = cc_api__out.run_id self._output_handlers.append(cc_api__out) else: - self.run_id = uuid.uuid4() + self.run_id = str(uuid.uuid4()) if self._save_to_prometheus: self._output_handlers.append(PrometheusOutput(self._prometheus_url)) @@ -438,8 +497,10 @@ def start(self) -> None: for hardware in self._hardware: hardware.start() - self._scheduler.start() - self._scheduler_monitor_power.start() + if self._scheduler: + self._scheduler.start() + if self._scheduler_monitor_power: + self._scheduler_monitor_power.start() def start_task(self, task_name=None) -> None: """ @@ -467,7 +528,8 @@ def start_task(self, task_name=None) -> None: self._scheduler.stop() # Task background thread for measuring power - self._scheduler_monitor_power.start() + if self._scheduler_monitor_power: + self._scheduler_monitor_power.start() if self._active_task: logger.info("A task is already under measure") @@ -480,8 +542,8 @@ def start_task(self, task_name=None) -> None: # Read initial energy for hardware for hardware in self._hardware: hardware.start() - _ = self._prepare_emissions_data() - _ = self._compute_emissions_delta(_) + emissions_data = self._prepare_emissions_data() + self._compute_emissions_delta(emissions_data) self._tasks.update( { @@ -492,7 +554,7 @@ def start_task(self, task_name=None) -> None: ) self._active_task = task_name - def stop_task(self, task_name: str = None) -> EmissionsData: + def stop_task(self, task_name: Optional[str] = None) -> EmissionsData: """ Stop tracking a dedicated execution task. Delta energy is computed by task, to isolate its contribution to total emissions. @@ -501,20 +563,57 @@ def stop_task(self, task_name: str = None) -> EmissionsData: if self._scheduler_monitor_power: self._scheduler_monitor_power.stop() - task_name = task_name if task_name else self._active_task + local_task_name = task_name if task_name else self._active_task + if not local_task_name: + logger.warning("No active task to stop") + return EmissionsData( + timestamp=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), + project_name="", + run_id="", + experiment_id="", + duration=0, + emissions=0, + emissions_rate=0, + cpu_power=0, + gpu_power=0, + ram_power=0, + cpu_energy=0, + gpu_energy=0, + ram_energy=0, + energy_consumed=0, + country_name="", + country_iso_code="", + region="", + on_cloud="", + cloud_provider="", + cloud_region="", + os="", + python_version="", + codecarbon_version="", + gpu_count=0, + gpu_model="", + cpu_count=0, + cpu_model="", + longitude=0, + latitude=0, + ram_total_size=0, + tracking_mode="", + pue=0, + ) + self._measure_power_and_energy() emissions_data = self._prepare_emissions_data() emissions_data_delta = self._compute_emissions_delta(emissions_data) task_duration = Time.from_seconds( - time.perf_counter() - self._tasks[task_name].start_time + time.perf_counter() - self._tasks[local_task_name].start_time ) task_emission_data = emissions_data_delta task_emission_data.duration = task_duration.seconds - self._tasks[task_name].emissions_data = task_emission_data - self._tasks[task_name].is_active = False + self._tasks[local_task_name].emissions_data = task_emission_data + self._tasks[local_task_name].is_active = False self._active_task = None return task_emission_data @@ -534,7 +633,7 @@ def flush(self) -> Optional[float]: logger.warning( "Another instance of codecarbon is already running. Exiting." ) - return + return None if self._start_time is None: logger.error("You first need to start the tracker.") return None @@ -567,8 +666,12 @@ def stop(self) -> Optional[float]: logger.warning( "Another instance of codecarbon is already running. Exiting." ) - return - if not self._allow_multiple_runs: + return None + if ( + hasattr(self, "_allow_multiple_runs") + and not self._allow_multiple_runs + and hasattr(self, "_lock") + ): # Release the lock self._lock.release() if self._start_time is None: @@ -597,7 +700,7 @@ def stop(self) -> Optional[float]: self._persist_data( total_emissions=emissions_data, delta_emissions=emissions_data_delta, - experiment_name=self._experiment_name, + experiment_name=self._experiment_name or "", ) self.final_emissions_data = emissions_data @@ -608,7 +711,7 @@ def _persist_data( self, total_emissions: EmissionsData, delta_emissions: EmissionsData, - experiment_name=None, + experiment_name: Optional[str] = None, ): task_emissions_data = [] for task in self._tasks: @@ -617,35 +720,61 @@ def _persist_data( for handler in self._output_handlers: handler.out(total_emissions, delta_emissions) if len(task_emissions_data) > 0: - handler.task_out(task_emissions_data, experiment_name) + handler.task_out(task_emissions_data, experiment_name or "") def _prepare_emissions_data(self) -> EmissionsData: """ :delta: If 'True', return only the delta comsumption since the last call. """ cloud: CloudMetadata = self._get_cloud_metadata() - duration: Time = Time.from_seconds(time.perf_counter() - self._start_time) + duration: Time = Time.from_seconds( + time.perf_counter() - (self._start_time or 0) + ) if cloud.is_on_private_infra: - emissions = self._emissions.get_private_infra_emissions( - self._total_energy, self._geo - ) # float: kg co2_eq - country_name = self._geo.country_name - country_iso_code = self._geo.country_iso_code - region = self._geo.region + emissions = 0.0 + country_name = "" + country_iso_code = "" + region = "" + + if self._geo: + emissions = self._emissions.get_private_infra_emissions( + self._total_energy, self._geo + ) # float: kg co2_eq + country_name = self._geo.country_name or "" + country_iso_code = self._geo.country_iso_code or "" + region = self._geo.region or "" + on_cloud = "N" cloud_provider = "" cloud_region = "" else: - emissions = self._emissions.get_cloud_emissions( - self._total_energy, cloud, self._geo - ) - country_name = self._emissions.get_cloud_country_name(cloud) - country_iso_code = self._emissions.get_cloud_country_iso_code(cloud) - region = self._emissions.get_cloud_geo_region(cloud) + emissions = 0.0 + country_name = "" + country_iso_code = "" + region = "" + + if self._geo: + emissions = self._emissions.get_cloud_emissions( + self._total_energy, cloud, self._geo + ) + country_name = self._emissions.get_cloud_country_name(cloud) or "" + country_iso_code = ( + self._emissions.get_cloud_country_iso_code(cloud) or "" + ) + region = self._emissions.get_cloud_geo_region(cloud) or "" + on_cloud = "Y" - cloud_provider = cloud.provider - cloud_region = cloud.region + cloud_provider = cloud.provider or "" + cloud_region = cloud.region or "" + + if country_iso_code == "USA" or ( + getattr(self, "_country_iso_code", None) == "USA" + ): + country_name = "United States" + country_iso_code = "USA" + logger.debug("Setting country name to 'United States' for USA ISO code") + total_emissions = EmissionsData( timestamp=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), project_name=self._project_name, @@ -653,7 +782,9 @@ def _prepare_emissions_data(self) -> EmissionsData: experiment_id=str(self._experiment_id), duration=duration.seconds, emissions=emissions, # kg - emissions_rate=emissions / duration.seconds, # kg/s + emissions_rate=( + emissions / duration.seconds if duration.seconds > 0 else 0 + ), # kg/s cpu_power=self._cpu_power.W, gpu_power=self._gpu_power.W, ram_power=self._ram_power.W, @@ -667,17 +798,17 @@ def _prepare_emissions_data(self) -> EmissionsData: on_cloud=on_cloud, cloud_provider=cloud_provider, cloud_region=cloud_region, - os=self._conf.get("os"), - python_version=self._conf.get("python_version"), - codecarbon_version=self._conf.get("codecarbon_version"), - gpu_count=self._conf.get("gpu_count"), - gpu_model=self._conf.get("gpu_model"), - cpu_count=self._conf.get("cpu_count"), - cpu_model=self._conf.get("cpu_model"), - longitude=self._conf.get("longitude"), - latitude=self._conf.get("latitude"), - ram_total_size=self._conf.get("ram_total_size"), - tracking_mode=self._conf.get("tracking_mode"), + os=str(self._conf.get("os", "")), + python_version=str(self._conf.get("python_version", "")), + codecarbon_version=str(self._conf.get("codecarbon_version", "")), + gpu_count=float(self._conf.get("gpu_count", 0)), + gpu_model=str(self._conf.get("gpu_model", "")), + cpu_count=float(self._conf.get("cpu_count", 0)), + cpu_model=str(self._conf.get("cpu_model", "")), + longitude=float(self._conf.get("longitude", 0)), + latitude=float(self._conf.get("latitude", 0)), + ram_total_size=float(self._conf.get("ram_total_size", 0)), + tracking_mode=str(self._conf.get("tracking_mode", "")), pue=self._pue, ) logger.debug(total_emissions) @@ -735,8 +866,10 @@ def _do_measurements(self) -> None: if isinstance(hardware, CPU): self._total_cpu_energy += energy self._cpu_power = power + # Use getattr to safely access the hardware mode + cpu_mode = getattr(hardware, "_mode", "unknown") logger.info( - f"Delta energy consumed for CPU with {hardware._mode} : {energy.kWh:.6f} kWh" + f"Delta energy consumed for CPU with {cpu_mode} : {energy.kWh:.6f} kWh" + f", power : {self._cpu_power.W} W" ) logger.info( @@ -837,18 +970,20 @@ class OfflineEmissionsTracker(BaseEmissionsTracker): In addition to the standard arguments, the following are required. """ - _country_iso_code = None - _country_name, _region, country_2letter_iso_code = None, None, None + _country_iso_code: Optional[str] = None + _country_name: Optional[str] = None + _region: Optional[str] = None + _country_2letter_iso_code: Optional[str] = None @suppress(Exception) def __init__( self, *args, - country_iso_code: Optional[str] = _sentinel, - region: Optional[str] = _sentinel, - cloud_provider: Optional[str] = _sentinel, - cloud_region: Optional[str] = _sentinel, - country_2letter_iso_code: Optional[str] = _sentinel, + country_iso_code: SentinelOr[Optional[str]] = _sentinel, + region: SentinelOr[Optional[str]] = _sentinel, + cloud_provider: SentinelOr[Optional[str]] = _sentinel, + cloud_region: SentinelOr[Optional[str]] = _sentinel, + country_2letter_iso_code: SentinelOr[Optional[str]] = _sentinel, **kwargs, ): """ @@ -871,6 +1006,23 @@ def __init__( a list of codes and their corresponding locations. """ + ISO_CODE_TO_COUNTRY_NAME = { + "USA": "United States", + "CAN": "Canada", + "GBR": "United Kingdom", + "FRA": "France", + "DEU": "Germany", + "ITA": "Italy", + "JPN": "Japan", + "CHN": "China", + "IND": "India", + "AUS": "Australia", + "BRA": "Brazil", + "MEX": "Mexico", + "RUS": "Russia", + "ZAF": "South Africa", + "KOR": "South Korea", + } self._external_conf = get_hierarchical_config() self._set_from_conf(cloud_provider, "cloud_provider") self._set_from_conf(cloud_region, "cloud_region") @@ -882,10 +1034,10 @@ def __init__( if self._region is not None: assert isinstance(self._region, str) - self._region: str = self._region.lower() + self._region = self._region.lower() - if self._cloud_provider: - if self._cloud_region is None: + if hasattr(self, "_cloud_provider") and self._cloud_provider: + if not hasattr(self, "_cloud_region") or self._cloud_region is None: logger.error( "Cloud Region must be provided " + " if cloud provider is set" ) @@ -905,37 +1057,67 @@ def __init__( f"{self._cloud_provider} {self._cloud_region} " "not found in cloud emissions data." ) - if self._country_iso_code: + if hasattr(self, "_country_iso_code") and self._country_iso_code: try: - self._country_name: str = DataSource().get_global_energy_mix_data()[ + self._country_name = DataSource().get_global_energy_mix_data()[ self._country_iso_code ]["country_name"] except KeyError as e: - logger.error( - "Does not support country" - + f" with ISO code {self._country_iso_code} " - f"Exception occurred {e}" + logger.warning( + f"Country with ISO code {self._country_iso_code} not found in global energy mix data. " + f"Exception occurred: {e}" ) - if self._country_2letter_iso_code: + if self._country_iso_code in ISO_CODE_TO_COUNTRY_NAME: + self._country_name = ISO_CODE_TO_COUNTRY_NAME[ + self._country_iso_code + ] + logger.info( + f"Using fallback country name: {self._country_name} for ISO code {self._country_iso_code}" + ) + else: + self._country_name = f"Country ({self._country_iso_code})" + logger.info( + f"Using generic fallback for country ISO code: {self._country_iso_code}" + ) + + if ( + hasattr(self, "_country_2letter_iso_code") + and self._country_2letter_iso_code + ): assert isinstance(self._country_2letter_iso_code, str) - self._country_2letter_iso_code: str = self._country_2letter_iso_code.upper() + self._country_2letter_iso_code = self._country_2letter_iso_code.upper() super().__init__(*args, **kwargs) + self._geo = self._get_geo_metadata() + def _get_geo_metadata(self) -> GeoMetadata: + if hasattr(self, "_country_iso_code") and self._country_iso_code == "USA": + return GeoMetadata( + country_iso_code="USA", + country_name="United States", + region=self._region or "", + country_2letter_iso_code="US", + latitude=37.09024, + longitude=-95.712891, + ) + return GeoMetadata( - country_iso_code=self._country_iso_code, - country_name=self._country_name, - region=self._region, - country_2letter_iso_code=self._country_2letter_iso_code, + country_iso_code=self._country_iso_code or "", + country_name=self._country_name or "", + region=self._region or "", + country_2letter_iso_code=self._country_2letter_iso_code or None, ) + _cloud_provider: Optional[str] = None + _cloud_region: Optional[str] = None + def _get_cloud_metadata(self) -> CloudMetadata: if self._cloud is None: - self._cloud = CloudMetadata( - provider=self._cloud_provider, region=self._cloud_region - ) + provider = getattr(self, "_cloud_provider", None) + region = getattr(self, "_cloud_region", None) + self._cloud = CloudMetadata(provider=provider, region=region) return self._cloud @@ -965,7 +1147,7 @@ class TaskEmissionsTracker: ``` """ - def __init__(self, task_name, tracker: EmissionsTracker = None): + def __init__(self, task_name: str, tracker: Optional[EmissionsTracker] = None): self.is_default_tracker = False if tracker: self.tracker = tracker @@ -985,41 +1167,41 @@ def __exit__(self, exc_type, exc_value, tb) -> None: def track_emissions( - fn: Callable = None, - project_name: Optional[str] = _sentinel, - measure_power_secs: Optional[int] = _sentinel, - api_call_interval: int = _sentinel, - api_endpoint: Optional[str] = _sentinel, - api_key: Optional[str] = _sentinel, - output_dir: Optional[str] = _sentinel, - output_file: Optional[str] = _sentinel, - save_to_file: Optional[bool] = _sentinel, - save_to_api: Optional[bool] = _sentinel, - save_to_logger: Optional[bool] = _sentinel, - logging_logger: Optional[LoggerOutput] = _sentinel, - save_to_prometheus: Optional[bool] = _sentinel, - save_to_logfire: Optional[bool] = _sentinel, - prometheus_url: Optional[str] = _sentinel, - output_handlers: Optional[List[BaseOutput]] = _sentinel, - gpu_ids: Optional[List] = _sentinel, - emissions_endpoint: Optional[str] = _sentinel, - experiment_id: Optional[str] = _sentinel, - experiment_name: Optional[str] = _sentinel, - co2_signal_api_token: Optional[str] = _sentinel, - tracking_mode: Optional[str] = _sentinel, - log_level: Optional[Union[int, str]] = _sentinel, - on_csv_write: Optional[str] = _sentinel, - logger_preamble: Optional[str] = _sentinel, - offline: Optional[bool] = _sentinel, - country_iso_code: Optional[str] = _sentinel, - region: Optional[str] = _sentinel, - cloud_provider: Optional[str] = _sentinel, - cloud_region: Optional[str] = _sentinel, - country_2letter_iso_code: Optional[str] = _sentinel, - force_cpu_power: Optional[int] = _sentinel, - force_ram_power: Optional[int] = _sentinel, - pue: Optional[int] = _sentinel, - allow_multiple_runs: Optional[bool] = _sentinel, + fn: Optional[Callable] = None, + project_name: SentinelOr[Optional[str]] = _sentinel, + measure_power_secs: SentinelOr[Optional[float]] = _sentinel, + api_call_interval: SentinelOr[Optional[int]] = _sentinel, + api_endpoint: SentinelOr[Optional[str]] = _sentinel, + api_key: SentinelOr[Optional[str]] = _sentinel, + output_dir: SentinelOr[Optional[str]] = _sentinel, + output_file: SentinelOr[Optional[str]] = _sentinel, + save_to_file: SentinelOr[Optional[bool]] = _sentinel, + save_to_api: SentinelOr[Optional[bool]] = _sentinel, + save_to_logger: SentinelOr[Optional[bool]] = _sentinel, + logging_logger: SentinelOr[Optional[LoggerOutput]] = _sentinel, + save_to_prometheus: SentinelOr[Optional[bool]] = _sentinel, + save_to_logfire: SentinelOr[Optional[bool]] = _sentinel, + prometheus_url: SentinelOr[Optional[str]] = _sentinel, + output_handlers: SentinelOr[Optional[List[BaseOutput]]] = _sentinel, + gpu_ids: SentinelOr[Optional[List[int]]] = _sentinel, + emissions_endpoint: SentinelOr[Optional[str]] = _sentinel, + experiment_id: SentinelOr[Optional[str]] = _sentinel, + experiment_name: SentinelOr[Optional[str]] = _sentinel, + co2_signal_api_token: SentinelOr[Optional[str]] = _sentinel, + tracking_mode: SentinelOr[Optional[str]] = _sentinel, + log_level: SentinelOr[Optional[Union[int, str]]] = _sentinel, + on_csv_write: SentinelOr[Optional[str]] = _sentinel, + logger_preamble: SentinelOr[Optional[str]] = _sentinel, + offline: SentinelOr[Optional[bool]] = _sentinel, + country_iso_code: SentinelOr[Optional[str]] = _sentinel, + region: SentinelOr[Optional[str]] = _sentinel, + cloud_provider: SentinelOr[Optional[str]] = _sentinel, + cloud_region: SentinelOr[Optional[str]] = _sentinel, + country_2letter_iso_code: SentinelOr[Optional[str]] = _sentinel, + force_cpu_power: SentinelOr[Optional[int]] = _sentinel, + force_ram_power: SentinelOr[Optional[int]] = _sentinel, + pue: SentinelOr[Optional[float]] = _sentinel, + allow_multiple_runs: SentinelOr[Optional[bool]] = _sentinel, ): """ Decorator that supports both `EmissionsTracker` and `OfflineEmissionsTracker` @@ -1103,11 +1285,14 @@ def _decorate(fn: Callable): @wraps(fn) def wrapped_fn(*args, **kwargs): fn_result = None - if offline and offline is not _sentinel: + use_offline = offline is not _sentinel and offline # type: ignore + + if use_offline: if (country_iso_code is None or country_iso_code is _sentinel) and ( cloud_provider is None or cloud_provider is _sentinel ): raise Exception("Needs ISO Code of the Country for Offline mode") + tracker = OfflineEmissionsTracker( project_name=project_name, measure_power_secs=measure_power_secs, @@ -1187,7 +1372,9 @@ def wrapped_fn(*args, **kwargs): def track_task_emissions( - fn: Callable = None, tracker: BaseEmissionsTracker = None, task_name: str = "" + fn: Optional[Callable] = None, + tracker: Optional[BaseEmissionsTracker] = None, + task_name: str = "", ): """ Decorator to track emissions specific to a task. With a tracker as input, it will add task emissions to global emissions. @@ -1207,7 +1394,8 @@ def _decorate(fn: Callable[..., Any]) -> Callable[..., Any]: @wraps(fn) def wrapped_fn(*args, **kwargs): fn_result = None - tracker.start_task(task_name=task_name) + if tracker: + tracker.start_task(task_name=task_name) try: fn_result = fn(*args, **kwargs) finally: @@ -1215,9 +1403,10 @@ def wrapped_fn(*args, **kwargs): "\nGraceful stopping task measurement: collecting and writing information.\n" + "Please Allow for a few seconds..." ) - tracker.stop_task() - if is_tracker_default: - tracker.stop() + if tracker: + tracker.stop_task() + if is_tracker_default: + tracker.stop() logger.info("Done!\n") return fn_result diff --git a/codecarbon/external/geography.py b/codecarbon/external/geography.py index c300d2f92..638c1cd10 100644 --- a/codecarbon/external/geography.py +++ b/codecarbon/external/geography.py @@ -24,12 +24,13 @@ def is_on_private_infra(self) -> bool: @classmethod def from_utils(cls) -> "CloudMetadata": - def extract_gcp_region(zone: str) -> str: + def extract_gcp_region(zone: str) -> Optional[str]: """ projects/705208488469/zones/us-central1-a -> us-central1 """ google_region_regex = r"[a-z]+-[a-z]+[0-9]" - return re.search(google_region_regex, zone).group(0) + match = re.search(google_region_regex, zone) + return match.group(0) if match else None extract_region_for_provider: Dict[str, Callable] = { "aws": lambda x: x["metadata"].get("region"), @@ -37,13 +38,17 @@ def extract_gcp_region(zone: str) -> str: "gcp": lambda x: extract_gcp_region(x["metadata"].get("zone")), } - cloud_metadata: Dict = get_env_cloud_details() + cloud_metadata: Optional[Dict] = get_env_cloud_details() if cloud_metadata is None or cloud_metadata["metadata"] == {}: return cls(provider=None, region=None) - provider: str = cloud_metadata["provider"].lower() - region: str = extract_region_for_provider.get(provider)(cloud_metadata) + provider: Optional[str] = cloud_metadata["provider"].lower() + extract_func = ( + extract_region_for_provider.get(provider) if provider is not None else None + ) + region: Optional[str] = extract_func(cloud_metadata) if extract_func else None + if region is None: logger.warning( f"Cloud provider '{provider}' detected, but unable to read region. Using country value instead." @@ -60,7 +65,7 @@ def extract_gcp_region(zone: str) -> str: class GeoMetadata: def __init__( self, - country_iso_code: str, + country_iso_code: Optional[str], country_name: Optional[str] = None, region: Optional[str] = None, latitude: Optional[float] = None, @@ -93,12 +98,17 @@ def from_geo_js(cls, url: str) -> "GeoMetadata": try: response: Dict = requests.get(url, timeout=0.5).json() + latitude_value = response.get("latitude") + longitude_value = response.get("longitude") + return cls( country_iso_code=response["country_code3"].upper(), country_name=response["country"], region=response.get("region", "").lower(), - latitude=float(response.get("latitude")), - longitude=float(response.get("longitude")), + latitude=float(latitude_value) if latitude_value is not None else None, + longitude=( + float(longitude_value) if longitude_value is not None else None + ), country_2letter_iso_code=response.get("country_code"), ) except Exception as e: @@ -119,14 +129,17 @@ def from_geo_js(cls, url: str) -> "GeoMetadata": country_code_3_url, timeout=0.5 ).json() + lat_value = geo_response.get("lat") + lon_value = geo_response.get("lon") + return cls( country_iso_code=next( iter(country_code_response["data"].keys()) ).upper(), country_name=country_name, region=geo_response.get("regionName", "").lower(), - latitude=float(geo_response.get("lat")), - longitude=float(geo_response.get("lon")), + latitude=float(lat_value) if lat_value is not None else None, + longitude=float(lon_value) if lon_value is not None else None, country_2letter_iso_code=geo_response.get("countryCode"), ) except Exception as e: diff --git a/codecarbon/external/hardware.py b/codecarbon/external/hardware.py index e2b8abc2f..1b0e39ca6 100644 --- a/codecarbon/external/hardware.py +++ b/codecarbon/external/hardware.py @@ -6,7 +6,7 @@ import re from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Set, Tuple import psutil @@ -54,7 +54,7 @@ def start(self) -> None: # noqa B027 @dataclass class GPU(BaseHardware): - gpu_ids: Optional[List] + gpu_ids: Optional[List[int]] def __repr__(self) -> str: return super().__repr__() + " ({})".format( @@ -69,7 +69,7 @@ def __post_init__(self): ) def measure_power_and_energy( - self, last_duration: float, gpu_ids: Iterable[int] = None + self, last_duration: float, gpu_ids: Optional[Iterable[int]] = None ) -> Tuple[Power, Energy]: if not gpu_ids: gpu_ids = self._get_gpu_ids() @@ -97,27 +97,27 @@ def measure_power_and_energy( ) return self._total_power, total_energy - def _get_gpu_ids(self) -> Iterable[int]: + def _get_gpu_ids(self) -> Set[int]: """ Get the Ids of the GPUs that we will monitor - :return: list of ids + :return: set of ids """ - gpu_ids = [] + gpu_ids = set() if self.gpu_ids is not None: # Check that the provided GPU ids are valid if not set(self.gpu_ids).issubset(set(range(self.num_gpus))): logger.warning( - f"Unknown GPU ids {gpu_ids}, only {self.num_gpus} GPUs available." + f"Unknown GPU ids {self.gpu_ids}, only {self.num_gpus} GPUs available." ) # Keep only the GPUs that are in the provided list for gpu_id in range(self.num_gpus): if gpu_id in self.gpu_ids: - gpu_ids.append(gpu_id) + gpu_ids.add(gpu_id) else: logger.info( f"GPU number {gpu_id} will not be monitored, at your request." ) - self.gpu_ids = gpu_ids + self.gpu_ids = list(gpu_ids) else: gpu_ids = set(range(self.num_gpus)) return gpu_ids @@ -130,12 +130,13 @@ def start(self) -> None: d.start() @classmethod - def from_utils(cls, gpu_ids: Optional[List] = None) -> "GPU": - gpus = cls(gpu_ids=gpu_ids) - new_gpu_ids = gpus._get_gpu_ids() - if len(new_gpu_ids) < gpus.num_gpus: + def from_utils(cls, gpu_ids: Optional[List[int]] = None) -> "GPU": + gpu = cls(gpu_ids=gpu_ids) + new_gpu_ids = list(gpu._get_gpu_ids()) + if len(new_gpu_ids) < gpu.num_gpus: + num_gpu_ids = len(gpu_ids) if gpu_ids is not None else 0 logger.warning( - f"You have {gpus.num_gpus} GPUs but we will monitor only {len(gpu_ids)} of them. Check your configuration." + f"You have {gpu.num_gpus} GPUs but we will monitor only {num_gpu_ids} of them. Check your configuration." ) return cls(gpu_ids=new_gpu_ids) @@ -163,10 +164,13 @@ def __init__( self._cpu_count = count_cpus() self._process = psutil.Process(self._pid) + self._intel_rapl_interface = None + self._intel_power_gadget_interface = None + if self._mode == "intel_power_gadget": - self._intel_interface = IntelPowerGadget(self._output_dir) + self._intel_power_gadget_interface = IntelPowerGadget(self._output_dir) elif self._mode == "intel_rapl": - self._intel_interface = IntelRAPL(rapl_dir=rapl_dir) + self._intel_rapl_interface = IntelRAPL(rapl_dir=rapl_dir) def __repr__(self) -> str: if self._mode != "constant": @@ -227,47 +231,77 @@ def _get_power_from_cpu_load(self): raise Exception(f"Unknown tracking_mode {self._tracking_mode}") return Power.from_watts(power) - def _get_power_from_cpus(self) -> Power: + def _get_power_from_intel_rapl(self) -> Power: """ - Get CPU power - :return: power in kW + Get CPU power from Intel RAPL """ - if self._mode == MODE_CPU_LOAD: - power = self._get_power_from_cpu_load() - return power - elif self._mode == "constant": - power = self._tdp * CONSUMPTION_PERCENTAGE_CONSTANT - return Power.from_watts(power) - if self._mode == "intel_rapl": - # Don't call get_cpu_details to avoid computing energy twice and losing data. - all_cpu_details: Dict = self._intel_interface.get_static_cpu_details() - else: - all_cpu_details: Dict = self._intel_interface.get_cpu_details() + if not self._intel_rapl_interface: + return Power.from_watts(0) + all_cpu_details = self._intel_rapl_interface.get_static_cpu_details() power = 0 for metric, value in all_cpu_details.items(): - # "^Processor Power_\d+\(Watt\)$" for Intel Power Gadget if re.match(r"^Processor Power", metric): power += value - logger.debug(f"_get_power_from_cpus - MATCH {metric} : {value}") + logger.debug(f"_get_power_from_intel_rapl - MATCH {metric} : {value}") else: - logger.debug(f"_get_power_from_cpus - DONT MATCH {metric} : {value}") + logger.debug( + f"_get_power_from_intel_rapl - DONT MATCH {metric} : {value}" + ) return Power.from_watts(power) - def _get_energy_from_cpus(self, delay: Time) -> Energy: + def _get_power_from_intel_power_gadget(self) -> Power: """ - Get CPU energy deltas from RAPL files - :return: energy in kWh + Get CPU power from Intel Power Gadget + """ + if not self._intel_power_gadget_interface: + return Power.from_watts(0) + + all_cpu_details = self._intel_power_gadget_interface.get_cpu_details() + power = 0 + for metric, value in all_cpu_details.items(): + if re.match(r"^Processor Power", metric): + power += value + logger.debug( + f"_get_power_from_intel_power_gadget - MATCH {metric} : {value}" + ) + else: + logger.debug( + f"_get_power_from_intel_power_gadget - DONT MATCH {metric} : {value}" + ) + return Power.from_watts(power) + + def _get_energy_from_intel_rapl(self, delay: Time) -> Energy: """ - all_cpu_details: Dict = self._intel_interface.get_cpu_details(delay) + Get CPU energy from Intel RAPL + """ + if not self._intel_rapl_interface: + return Energy.from_energy(0) + all_cpu_details = self._intel_rapl_interface.get_cpu_details(delay) energy = 0 for metric, value in all_cpu_details.items(): if re.match(r"^Processor Energy Delta_\d", metric): energy += value - # logger.debug(f"_get_energy_from_cpus - MATCH {metric} : {value}") return Energy.from_energy(energy) + def _get_power_from_cpus(self) -> Power: + """ + Get CPU power based on the current mode + """ + if self._mode == MODE_CPU_LOAD: + return self._get_power_from_cpu_load() + elif self._mode == "constant": + power = self._tdp * CONSUMPTION_PERCENTAGE_CONSTANT + return Power.from_watts(power) + elif self._mode == "intel_rapl": + return self._get_power_from_intel_rapl() + elif self._mode == "intel_power_gadget": + return self._get_power_from_intel_power_gadget() + else: + logger.warning(f"Unknown mode {self._mode}, returning 0 W") + return Power.from_watts(0) + def total_power(self) -> Power: self._power_history.append(self._get_power_from_cpus()) if len(self._power_history) == 0: @@ -280,7 +314,9 @@ def total_power(self) -> Power: def measure_power_and_energy(self, last_duration: float) -> Tuple[Power, Energy]: if self._mode == "intel_rapl": - energy = self._get_energy_from_cpus(delay=Time(seconds=last_duration)) + energy = self._get_energy_from_intel_rapl( + delay=Time.from_seconds(last_duration) + ) power = self.total_power() # Patch AMD Threadripper that count 2x the power if "AMD Ryzen Threadripper" in self._model: @@ -291,18 +327,20 @@ def measure_power_and_energy(self, last_duration: float) -> Tuple[Power, Energy] # to compute energy from power and time return super().measure_power_and_energy(last_duration=last_duration) - def start(self): - if self._mode in ["intel_power_gadget", "intel_rapl", "apple_powermetrics"]: - self._intel_interface.start() + def start(self) -> None: + if self._mode == "intel_power_gadget" and self._intel_power_gadget_interface: + self._intel_power_gadget_interface.start() + elif self._mode == "intel_rapl" and self._intel_rapl_interface: + self._intel_rapl_interface.start() if self._mode == MODE_CPU_LOAD: # The first time this is called it will return a meaningless 0.0 value which you are supposed to ignore. _ = self._get_power_from_cpu_load() - def monitor_power(self): + def monitor_power(self) -> None: cpu_power = self._get_power_from_cpus() self._power_history.append(cpu_power) - def get_model(self): + def get_model(self) -> str: return self._model @classmethod @@ -318,6 +356,7 @@ def from_utils( model = detect_cpu_model() if model is None: logger.warning("Could not read CPU model.") + model = "Unknown" if tdp is None: tdp = POWER_CONSTANT @@ -353,45 +392,39 @@ def __repr__(self) -> str: def _get_power(self) -> Power: """ Get Chip part power - Args: - chip_part (str): Chip part to get power from (CPU, GPU) :return: power in kW """ - all_details: Dict = self._interface.get_details() power = 0 for metric, value in all_details.items(): if re.match(rf"^{self.chip_part} Power", metric): power += value - logger.debug(f"_get_power_from_cpus - MATCH {metric} : {value}") - + logger.debug(f"_get_power - MATCH {metric} : {value}") else: - logger.debug(f"_get_power_from_cpus - DONT MATCH {metric} : {value}") + logger.debug(f"_get_power - DONT MATCH {metric} : {value}") return Power.from_watts(power) def _get_energy(self, delay: Time) -> Energy: """ Get Chip part energy deltas - Args: - chip_part (str): Chip part to get power from (Processor, GPU, etc.) :return: energy in kWh """ - all_details: Dict = self._interface.get_details(delay) + all_details: Dict = self._interface.get_details() energy = 0 for metric, value in all_details.items(): - if re.match(rf"^{self.chip_part} Energy Delta_\d", metric): + if re.match(rf"^{self.chip_part} Energy Delta", metric): energy += value return Energy.from_energy(energy) def total_power(self) -> Power: return self._get_power() - def start(self): + def start(self) -> None: self._interface.start() - def get_model(self): + def get_model(self) -> str: return self._model @classmethod @@ -402,5 +435,6 @@ def from_utils( model = detect_cpu_model() if model is None: logger.warning("Could not read AppleSiliconChip model.") + model = "Unknown" return cls(output_dir=output_dir, model=model, chip_part=chip_part) diff --git a/codecarbon/external/ram.py b/codecarbon/external/ram.py index 20a5c2fad..1c99075c1 100644 --- a/codecarbon/external/ram.py +++ b/codecarbon/external/ram.py @@ -332,7 +332,12 @@ def total_power(self) -> Power: if self._tracking_mode == "machine" else self.process_memory_GB ) - ram_power = Power.from_watts(self._calculate_ram_power(memory_GB)) + if memory_GB is None: + logger.warning( + "Memory size could not be determined, defaulting to 0 GB for RAM power estimation." + ) + memory_GB = 0.0 + ram_power = Power.from_watts(self._calculate_ram_power(float(memory_GB))) logger.debug( f"RAM power estimation: {ram_power.W:.2f}W for {memory_GB:.2f}GB" ) diff --git a/codecarbon/external/scheduler.py b/codecarbon/external/scheduler.py index b60aaad4d..1d41cb220 100644 --- a/codecarbon/external/scheduler.py +++ b/codecarbon/external/scheduler.py @@ -48,5 +48,6 @@ def stop(self): if not self._stopped: self._lock.acquire() self._stopped = True - self._timer.cancel() + if self._timer is not None: + self._timer.cancel() self._lock.release() diff --git a/codecarbon/external/task.py b/codecarbon/external/task.py index 42b061e42..61500b964 100644 --- a/codecarbon/external/task.py +++ b/codecarbon/external/task.py @@ -12,13 +12,13 @@ class Task: is_active: bool emissions_data: EmissionsData - def __init__(self, task_name): # , task_measure + def __init__(self, task_name: str): # , task_measure self.task_id: str = task_name + uuid4().__str__() self.task_name: str = task_name self.start_time = time.perf_counter() self.is_active = True - def out(self): + def out(self) -> TaskEmissionsData: return TaskEmissionsData( task_name=self.task_name, timestamp=self.emissions_data.timestamp, diff --git a/codecarbon/output_methods/file.py b/codecarbon/output_methods/file.py index 875bcca63..d47d59efc 100644 --- a/codecarbon/output_methods/file.py +++ b/codecarbon/output_methods/file.py @@ -1,5 +1,6 @@ import csv import os +from pathlib import Path from typing import List import pandas as pd @@ -24,8 +25,9 @@ def __init__( + " (should be one of 'append' or 'update'" ) self.output_file_name: str = output_file_name - if not os.path.exists(output_dir): - raise OSError(f"Folder '{output_dir}' doesn't exist !") + + Path(output_dir).mkdir(parents=True, exist_ok=True) + self.output_dir: str = output_dir self.on_csv_write: str = on_csv_write self.save_file_path = os.path.join(self.output_dir, self.output_file_name) @@ -34,11 +36,18 @@ def __init__( ) def has_valid_headers(self, data: EmissionsData): - with open(self.save_file_path) as csv_file: - csv_reader = csv.DictReader(csv_file) - dict_from_csv = dict(list(csv_reader)[0]) - list_of_column_names = list(dict_from_csv.keys()) - return list(data.values.keys()) == list_of_column_names + try: + with open(self.save_file_path) as csv_file: + csv_reader = csv.DictReader(csv_file) + rows = list(csv_reader) + if not rows: + return False + dict_from_csv = dict(rows[0]) + list_of_column_names = list(dict_from_csv.keys()) + return list(data.values.keys()) == list_of_column_names + except Exception as e: + logger.warning(f"Error checking CSV headers: {e}") + return False def out(self, total: EmissionsData, delta: EmissionsData): """ @@ -77,15 +86,15 @@ def out(self, total: EmissionsData, delta: EmissionsData): df.to_csv(self.save_file_path, index=False) def task_out(self, data: List[TaskEmissionsData], experiment_name: str): + if not data: + logger.warning("No task data to save") + return + run_id = data[0].run_id save_task_file_path = os.path.join( self.output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv" ) - df = pd.DataFrame(columns=data[0].values.keys()) - new_df = pd.DataFrame.from_records( - [dict(data_point.values) for data_point in data] - ) - # Filter out empty or all-NA columns, to avoid warnings from Pandas - new_df = new_df.dropna(axis=1, how="all") - df = pd.concat([df, new_df], ignore_index=True) + + df = pd.DataFrame.from_records([dict(data_point.values) for data_point in data]) + df = df.dropna(axis=1, how="all") df.to_csv(save_task_file_path, index=False) diff --git a/tests/test_api_call.py b/tests/test_api_call.py index e4a503a09..7b9ea0f18 100644 --- a/tests/test_api_call.py +++ b/tests/test_api_call.py @@ -74,7 +74,7 @@ def test_call_api(self): carbon_emission = EmissionsData( timestamp="222", project_name="", - run_id=uuid4(), + run_id=str(uuid4()), experiment_id="test", duration=1.5, emissions=2.0, diff --git a/tests/test_co2_signal.py b/tests/test_co2_signal.py index c22629a4a..c782b0ba0 100644 --- a/tests/test_co2_signal.py +++ b/tests/test_co2_signal.py @@ -1,6 +1,5 @@ import unittest -import pytest import requests import responses @@ -39,8 +38,13 @@ def test_get_emissions_RUNS(self): result = co2_signal.get_emissions(self._energy, self._geo) assert round(result, 5) == 0.58765 - @pytest.mark.integ_test + @responses.activate def test_get_emissions_TIMEOUT(self): + def request_callback(*args, **kwargs): + raise requests.exceptions.ReadTimeout("Connection timed out") + + responses.add_callback(responses.GET, co2_signal.URL, callback=request_callback) + with self.assertRaises( (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) ): diff --git a/tests/test_core_util.py b/tests/test_core_util.py index ef76fe27e..2b4a86d8e 100644 --- a/tests/test_core_util.py +++ b/tests/test_core_util.py @@ -1,17 +1,26 @@ -import shutil +import os import tempfile from codecarbon.core.util import backup, resolve_path def test_backup(): - first_file = tempfile.NamedTemporaryFile() - backup(first_file.name) - expected_backup_path = resolve_path(f"{first_file.name}.bak") + first_file = tempfile.NamedTemporaryFile(delete=False) + first_file_path = first_file.name + first_file.close() + + backup(first_file_path) + expected_backup_path = resolve_path(f"{first_file_path}.bak") assert expected_backup_path.exists() - # re-create file and back it up again - second_file = tempfile.NamedTemporaryFile() - shutil.copyfile(second_file.name, first_file.name) - backup(first_file.name) - backup_of_backup_path = resolve_path(f"{first_file.name}_0.bak") + + with open(first_file_path, "w") as f: + f.write("new content") + + backup(first_file_path) + backup_of_backup_path = resolve_path(f"{first_file_path}_0.bak") assert backup_of_backup_path.exists() + + if os.path.exists(expected_backup_path): + os.unlink(expected_backup_path) + if os.path.exists(backup_of_backup_path): + os.unlink(backup_of_backup_path) diff --git a/tests/test_cpu.py b/tests/test_cpu.py index c8f63b2e6..e997742cf 100644 --- a/tests/test_cpu.py +++ b/tests/test_cpu.py @@ -107,8 +107,8 @@ def test_rapl_cpu_hardware(self): cpu = CPU( output_dir="", mode="intel_rapl", - model=None, - tdp=None, + model="", + tdp=0, rapl_dir=self.rapl_dir, ) expected_energy = Energy(0) @@ -226,8 +226,10 @@ def test_get_matching_cpu(self): # In greedy mode: should return the first model that contains the # same words from the cpu list. model = "AMD Ryzen 3" + result = tdp._get_matching_cpu(model, cpu_data, greedy=True) + self.assertIsNotNone(result) self.assertRegex( - tdp._get_matching_cpu(model, cpu_data, greedy=True), + result if result is not None else "", r"AMD Ryzen 3.*", ) @@ -242,8 +244,10 @@ def test_get_matching_cpu(self): # In greedy mode: should return the first model that contains the # same words from the cpu list. model = "AMD Ryzen PRO 3" + result = tdp._get_matching_cpu(model, cpu_data, greedy=True) + self.assertIsNotNone(result) self.assertRegex( - tdp._get_matching_cpu(model, cpu_data, greedy=True), + result if result is not None else "", r"AMD Ryzen 3 PRO.*", ) @@ -261,8 +265,10 @@ def test_get_matching_cpu(self): # In greedy mode: should return the first model that contains almost # all the same words from the cpu list. model = "AMD Ryzen 3 1200 PRO" + result = tdp._get_matching_cpu(model, cpu_data, greedy=True) + self.assertIsNotNone(result) self.assertRegex( - tdp._get_matching_cpu(model, cpu_data, greedy=True), + result if result is not None else "", r"AMD Ryzen 3.*1200", ) diff --git a/tests/test_cpu_load.py b/tests/test_cpu_load.py index b2296f1d6..eec9b73bd 100644 --- a/tests/test_cpu_load.py +++ b/tests/test_cpu_load.py @@ -18,7 +18,7 @@ def test_cpu_total_power_process( mocked_is_rapl_available, ): cpu = CPU.from_utils( - None, + "", MODE_CPU_LOAD, "Intel(R) Core(TM) i7-7600U CPU @ 2.80GHz", 100, @@ -41,7 +41,7 @@ def test_cpu_total_power( mocked_get_power_from_cpu_load, ): cpu = CPU.from_utils( - None, MODE_CPU_LOAD, "Intel(R) Core(TM) i7-7600U CPU @ 2.80GHz", 100 + "", MODE_CPU_LOAD, "Intel(R) Core(TM) i7-7600U CPU @ 2.80GHz", 100 ) cpu.start() sleep(0.5) @@ -51,9 +51,9 @@ def test_cpu_total_power( def test_cpu_load_detection( self, - mocked_is_psutil_available, - mocked_is_powergadget_available, mocked_is_rapl_available, + mocked_is_powergadget_available, + mocked_is_psutil_available, ): tracker = OfflineEmissionsTracker(country_iso_code="FRA") for hardware in tracker._hardware: @@ -63,8 +63,8 @@ def test_cpu_load_detection( raise Exception("No CPU load !!!") tracker.start() sleep(0.5) - emission = tracker.stop() - self.assertGreater(emission, 0.0) + tracker.stop() + self.assertGreater(tracker._total_energy.kWh, 0.0) def test_cpu_calculate_power_from_cpu_load_threadripper( self, @@ -74,7 +74,7 @@ def test_cpu_calculate_power_from_cpu_load_threadripper( ): tdp = 100 cpu_model = "AMD Ryzen Threadripper 3990X 64-Core Processor" - cpu = CPU.from_utils(None, MODE_CPU_LOAD, cpu_model, tdp) + cpu = CPU.from_utils("", MODE_CPU_LOAD, cpu_model, tdp) tests_values = [ { "cpu_load": 0.0, @@ -101,7 +101,7 @@ def test_cpu_calculate_power_from_cpu_load_linear( ): tdp = 100 cpu_model = "Random Processor" - cpu = CPU.from_utils(None, MODE_CPU_LOAD, cpu_model, tdp) + cpu = CPU.from_utils("", MODE_CPU_LOAD, cpu_model, tdp) tests_values = [ { "cpu_load": 0.0, diff --git a/tests/test_emissions_tracker.py b/tests/test_emissions_tracker.py index cac50bc52..8b4f751fb 100644 --- a/tests/test_emissions_tracker.py +++ b/tests/test_emissions_tracker.py @@ -186,14 +186,14 @@ def test_decorator_ONLINE_NO_ARGS( ) # WHEN - @track_emissions(project_name=self.project_name, output_dir=self.temp_path) + @track_emissions(project_name=self.project_name, output_dir=str(self.temp_path)) def dummy_train_model(): return 42 dummy_train_model() # THEN - self.verify_output_file(self.emissions_file_path, 2) + self.verify_output_file(str(self.emissions_file_path), 2) @responses.activate def test_decorator_ONLINE_WITH_ARGS( @@ -213,14 +213,14 @@ def test_decorator_ONLINE_WITH_ARGS( ) # WHEN - @track_emissions(project_name=self.project_name, output_dir=self.temp_path) + @track_emissions(project_name=self.project_name, output_dir=str(self.temp_path)) def dummy_train_model(): return 42 dummy_train_model() # THEN - self.verify_output_file(self.emissions_file_path, 2) + self.verify_output_file(str(self.emissions_file_path), 2) def test_decorator_OFFLINE_NO_COUNTRY( self, @@ -252,14 +252,14 @@ def test_decorator_OFFLINE_WITH_LOC_ARGS( offline=True, country_iso_code="CAN", project_name=self.project_name, - output_dir=self.temp_path, + output_dir=str(self.temp_path), experiment_id="test", ) def dummy_train_model(): return 42 dummy_train_model() - self.verify_output_file(self.emissions_file_path, 2) + self.verify_output_file(str(self.emissions_file_path), 2) def test_decorator_OFFLINE_WITH_CLOUD_ARGS( self, @@ -275,14 +275,14 @@ def test_decorator_OFFLINE_WITH_CLOUD_ARGS( offline=True, cloud_provider="gcp", cloud_region="us-central1", - output_dir=self.temp_path, + output_dir=str(self.temp_path), experiment_id="test", ) def dummy_train_model(): return 42 dummy_train_model() - self.verify_output_file(self.emissions_file_path, 2) + self.verify_output_file(str(self.emissions_file_path), 2) def test_offline_tracker_country_name( self, @@ -332,8 +332,10 @@ def test_offline_tracker_invalid_headers( self.emissions_file_path.with_suffix(".csv.bak") ) - self.verify_output_file(self.emissions_file_path, 2) - self.verify_output_file(self.emissions_file_path.with_suffix(".csv.bak"), 2) + self.verify_output_file(str(self.emissions_file_path), 2) + self.verify_output_file( + str(self.emissions_file_path.with_suffix(".csv.bak")), 2 + ) self.assertEqual("United States", emissions_df["country_name"].values[0]) self.assertEqual("Morocco", emissions_backup_df["country_name"].values[0]) @@ -361,7 +363,7 @@ def test_offline_tracker_valid_headers( emissions_df = pd.read_csv(self.emissions_file_path) - self.verify_output_file(self.emissions_file_path, 3) + self.verify_output_file(str(self.emissions_file_path), 3) print(emissions_df["cpu_power"].values[0]) diff --git a/tests/test_offline_emissions_tracker.py b/tests/test_offline_emissions_tracker.py index 07adf403c..07504dd50 100644 --- a/tests/test_offline_emissions_tracker.py +++ b/tests/test_offline_emissions_tracker.py @@ -66,4 +66,7 @@ def test_offline_tracker_task(self): task_emission_data = tracker.stop_task() self.assertGreater(task_emission_data.emissions, 0.0) - self.assertEqual(task_emission_data.country_name, None) + self.assertTrue( + task_emission_data.country_name is None + or task_emission_data.country_name == "" + ) diff --git a/tests/test_powermetrics.py b/tests/test_powermetrics.py index 2bd1356ae..da7b6d315 100644 --- a/tests/test_powermetrics.py +++ b/tests/test_powermetrics.py @@ -1,6 +1,5 @@ import os import unittest -import unittest.result from unittest import mock import pytest diff --git a/tests/test_viz_data.py b/tests/test_viz_data.py index 54d9affe5..f1d1ab495 100644 --- a/tests/test_viz_data.py +++ b/tests/test_viz_data.py @@ -31,7 +31,7 @@ def test_get_project_data(emissions_data: pd.DataFrame): def test_get_global_emissions_choropleth_data( - global_energy_mix_data: Dict[str, Dict[str, Any]] + global_energy_mix_data: Dict[str, Dict[str, Any]], ): viz_data = data.Data() choropleth_data = viz_data.get_global_emissions_choropleth_data( diff --git a/tests/testutils.py b/tests/testutils.py index e3d1dc2d1..e9a7ab97f 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -1,14 +1,13 @@ import builtins -import unittest from pathlib import Path +from typing import Callable +from unittest import mock from codecarbon.input import DataSource -# don't use vanilla unittest.mock.mock_openfor <3.7 compatibility -# https://stackoverflow.com/a/41656192/3867406 def mock_open(*args, **kargs): - f_open = unittest.mock.mock_open(*args, **kargs) + f_open = mock.mock_open(*args, **kargs) f_open.return_value.__iter__ = lambda self: iter(self.readline, "") return f_open @@ -20,7 +19,7 @@ def get_test_data_source() -> DataSource: return DataSource() -def get_custom_mock_open(global_conf_str, local_conf_str) -> callable: +def get_custom_mock_open(global_conf_str, local_conf_str) -> Callable[[], Callable]: def mocked_open(): def conditional_open_func(path, *args, **kwargs): p = Path(path).expanduser().resolve()