
Add end-to-end test for st storage for yaml converter #85


Closed
wants to merge 8 commits into from
54 changes: 27 additions & 27 deletions src/andromede/input_converter/src/converter.py
@@ -41,7 +41,7 @@ def __init__(
Initialize processor
"""
self.logger = logger
self.period = period if period else 168
self.period: int = period if period else 168

if isinstance(study_input, Study):
self.study = study_input
@@ -55,6 +55,7 @@ def __init__(
self.output_path = (
Path(output_path) if output_path else self.study_path / Path("output.yaml")
)
self.areas: Iterable[Area] = self.study.get_areas().values()

def _check_dataframe_validity(self, df: DataFrame) -> bool:
"""
@@ -69,13 +70,11 @@ def _check_dataframe_validity(self, df: DataFrame) -> bool:

return True

def _convert_area_to_component_list(
self, areas: Iterable[Area], lib_id: str
) -> list[InputComponent]:
def _convert_area_to_component_list(self, lib_id: str) -> list[InputComponent]:
components = []
self.logger.info("Converting areas to component list...")

for area in areas:
for area in self.areas:
components.append(
InputComponent(
id=area.id,
@@ -99,12 +98,12 @@ def _convert_area_to_component_list(self, lib_id: str) -> list[InputComponent]:
return components
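
With the areas cached as `self.areas` in `__init__`, the per-technology converters no longer need the collection threaded through every call; they take only the library id. A minimal sketch of the new call pattern, assuming a converter instance named `converter` (hypothetical name, since the class definition is outside this hunk):

```python
# Hypothetical usage; `converter` stands in for the converter object whose
# __init__ now caches self.areas = self.study.get_areas().values().
components = converter._convert_area_to_component_list("antares-historic")
```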

def _convert_renewable_to_component_list(
self, areas: Iterable[Area], lib_id: str
self, lib_id: str
) -> tuple[list[InputComponent], list[InputPortConnections]]:
components = []
connections = []
self.logger.info("Converting renewables to component list...")
for area in areas:
for area in self.areas:
renewables = area.get_renewables()
for renewable in renewables.values():
series_path = (
@@ -154,14 +153,14 @@ def _convert_renewable_to_component_list(
return components, connections

def _convert_thermal_to_component_list(
self, areas: Iterable[Area], lib_id: str
self, lib_id: str
) -> tuple[list[InputComponent], list[InputPortConnections]]:
components = []
connections = []
self.logger.info("Converting thermals to component list...")
# Add thermal components for each area

for area in areas:
for area in self.areas:
thermals = area.get_thermals()
for thermal in thermals.values():
series_path = (
@@ -179,11 +178,15 @@ def _convert_thermal_to_component_list(
id=thermal.id,
model=f"{lib_id}.thermal",
parameters=[
tdp.process_p_min_cluster(),
tdp.process_nb_units_min(),
tdp.process_nb_units_max(),
tdp.process_nb_units_max_variation_forward(self.period),
tdp.process_nb_units_max_variation_backward(self.period),
tdp.generate_component_parameter("p_min_cluster"),
tdp.generate_component_parameter("nb_units_min"),
tdp.generate_component_parameter("nb_units_max"),
tdp.generate_component_parameter(
"nb_units_max_variation_forward", self.period
),
tdp.generate_component_parameter(
"nb_units_max_variation_backward", self.period
),
InputComponentParameter(
id="unit_count",
time_dependent=False,
@@ -259,13 +262,13 @@ def _convert_thermal_to_component_list(
return components, connections
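
The per-parameter `process_*` helpers formerly called here are collapsed into a single `generate_component_parameter` entry point on `ThermalDataPreprocessing` (see the second file below). A minimal sketch of the two call shapes, assuming a preprocessing instance `tdp` built as in the hunk above:

```python
# Plain series parameters need only the parameter id.
p_min = tdp.generate_component_parameter("p_min_cluster")

# Variation parameters also take the converter's period (168 hours by default).
fwd = tdp.generate_component_parameter("nb_units_max_variation_forward", 168)
```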

def _convert_st_storage_to_component_list(
self, areas: Iterable[Area], lib_id: str
self, lib_id: str
) -> tuple[list[InputComponent], list[InputPortConnections]]:
components = []
connections = []
self.logger.info("Converting short-term storages to component list...")
# Add short-term storage components for each area
for area in areas:
for area in self.areas:
storages = area.get_st_storages()
for storage in storages.values():
series_path = (
@@ -431,12 +434,12 @@ def _convert_link_to_component_list(
return components, connections

def _convert_wind_to_component_list(
self, areas: Iterable[Area], lib_id: str
self, lib_id: str
) -> tuple[list[InputComponent], list[InputPortConnections]]:
components = []
connections = []
self.logger.info("Converting wind to component list...")
for area in areas:
for area in self.areas:
series_path = (
self.study_path / "input" / "wind" / "series" / f"wind_{area.id}.txt"
)
@@ -468,12 +471,12 @@ def _convert_wind_to_component_list(
return components, connections

def _convert_solar_to_component_list(
self, areas: Iterable[Area], lib_id: str
self, lib_id: str
) -> tuple[list[InputComponent], list[InputPortConnections]]:
components = []
connections = []
self.logger.info("Converting solar to component list...")
for area in areas:
for area in self.areas:
series_path = (
self.study_path / "input" / "solar" / "series" / f"solar_{area.id}.txt"
)
@@ -506,12 +509,12 @@ def _convert_solar_to_component_list(
return components, connections

def _convert_load_to_component_list(
self, areas: Iterable[Area], lib_id: str
self, lib_id: str
) -> tuple[list[InputComponent], list[InputPortConnections]]:
components = []
connections = []
self.logger.info("Converting load to component list...")
for area in areas:
for area in self.areas:
series_path = (
self.study_path / "input" / "load" / "series" / f"load_{area.id}.txt"
)
@@ -544,10 +547,7 @@ def _convert_load_to_component_list(

def convert_study_to_input_study(self) -> InputSystem:
antares_historic_lib_id = "antares-historic"
areas = self.study.get_areas().values()
area_components = self._convert_area_to_component_list(
areas, antares_historic_lib_id
)
area_components = self._convert_area_to_component_list(antares_historic_lib_id)

list_components: list[InputComponent] = []
list_connections: list[InputPortConnections] = []
@@ -567,7 +567,7 @@ def convert_study_to_input_study(self) -> InputSystem:
]

for method in conversion_methods:
components, connections = method(areas, antares_historic_lib_id)
components, connections = method(antares_historic_lib_id)
list_components.extend(components)
list_connections.extend(connections)

176 changes: 73 additions & 103 deletions src/andromede/input_converter/src/data_preprocessing/thermal.py
@@ -1,4 +1,6 @@
from enum import Enum
from pathlib import Path
from typing import Callable

import numpy as np
import pandas as pd
@@ -8,11 +10,17 @@
from andromede.study.parsing import InputComponentParameter


class Direction(Enum):
FORWARD = "forward"
BACKWARD = "backward"


class ThermalDataPreprocessing:
DEFAULT_PERIOD: int = 168

def __init__(self, thermal: ThermalCluster, study_path: Path):
self.thermal = thermal
self.study_path = study_path

self.series_path = (
self.study_path
/ "input"
@@ -22,127 +30,89 @@ def __init__(self, thermal: ThermalCluster, study_path: Path):
/ self.thermal.id
)

def _write_dataframe_to_csv(self, dataframe: pd.DataFrame, filename: str) -> Path:
csv_path = self.series_path / filename
# This separator is chosen to comply with the antares_craft timeseries creation
dataframe.to_csv(csv_path, sep="\t", index=False, header=False)

return csv_path

def _compute_p_min_cluster(self) -> pd.DataFrame:
modulation_data = self.thermal.get_prepro_modulation_matrix().iloc[:, 3]
series_data = self.thermal.get_series_matrix()

unit_count = self.thermal.properties.unit_count
nominal_capacity = self.thermal.properties.nominal_capacity
modulation_data = modulation_data * nominal_capacity * unit_count

min_values = pd.concat([modulation_data, series_data], axis=1).min(axis=1)
return min_values.to_frame(
name="p_min_cluster"
) # Convert from series to dataframe

def process_p_min_cluster(self) -> InputComponentParameter:
p_min_cluster = self._compute_p_min_cluster()
csv_path = self._write_dataframe_to_csv(p_min_cluster, "p_min_cluster.txt")

return InputComponentParameter(
id="p_min_cluster",
time_dependent=True,
scenario_dependent=True,
value=str(csv_path).removesuffix(".txt"),
modulation_data: pd.Series = self.thermal.get_prepro_modulation_matrix().iloc[
:, 3
]
series_data: pd.DataFrame = self.thermal.get_series_matrix()
unit_count: int = self.thermal.properties.unit_count
nominal_capacity: float = self.thermal.properties.nominal_capacity
scaled_modulation: pd.Series = modulation_data * nominal_capacity * unit_count
# min(min_gen_modulation * unit_count * nominal_capacity, p_max_cluster)
min_values: pd.Series = pd.concat([scaled_modulation, series_data], axis=1).min(
axis=1
)
return min_values.to_frame(name="p_min_cluster")
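
As the inline comment states, the cluster minimum is the element-wise minimum of the scaled modulation series and the availability series. A small self-contained illustration of the pandas idiom, with made-up values (nominal capacity 100, two units):

```python
import pandas as pd

scaled_modulation = pd.Series([0.5, 0.5, 1.0]) * 100.0 * 2
series = pd.DataFrame([150.0, 80.0, 250.0])
# Row-wise minimum across both columns, as in _compute_p_min_cluster.
p_min = pd.concat([scaled_modulation, series], axis=1).min(axis=1)
# p_min -> 100.0, 80.0, 200.0
```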

def _compute_nb_units_min(self) -> pd.DataFrame:
p_min_cluster = load_ts_from_txt("p_min_cluster", self.series_path)
return pd.DataFrame(
np.ceil(p_min_cluster / self.thermal.properties.nominal_capacity)
p_min_cluster: pd.DataFrame = load_ts_from_txt(
"p_min_cluster", self.series_path
)

def process_nb_units_min(self) -> InputComponentParameter:
nb_units_min = self._compute_nb_units_min()
csv_path = self._write_dataframe_to_csv(nb_units_min, "nb_units_min.txt")

return InputComponentParameter(
id="nb_units_min",
time_dependent=True,
scenario_dependent=True,
value=str(csv_path).removesuffix(".txt"),
nominal_capacity: float = self.thermal.properties.nominal_capacity
return pd.DataFrame(
np.ceil(p_min_cluster / nominal_capacity),
)

def _compute_nb_units_max(self) -> pd.DataFrame:
series_data = self.thermal.get_series_matrix()

series_data: pd.DataFrame = self.thermal.get_series_matrix()
nominal_capacity: float = self.thermal.properties.nominal_capacity
return pd.DataFrame(
np.ceil(series_data / self.thermal.properties.nominal_capacity)
np.ceil(series_data / nominal_capacity),
)
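
`nb_units_max` converts the availability series into a unit count by ceiling division. Illustrative arithmetic with an assumed nominal capacity of 100 MW:

```python
import numpy as np
import pandas as pd

series = pd.DataFrame([250.0, 100.0, 80.0])
nb_units_max = pd.DataFrame(np.ceil(series / 100.0))
# -> 3.0, 1.0, 1.0: 250 MW of availability requires up to three 100 MW units.
```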

def process_nb_units_max(self) -> InputComponentParameter:
nb_units_max = self._compute_nb_units_max()
csv_path = self._write_dataframe_to_csv(nb_units_max, "nb_units_max.txt")

return InputComponentParameter(
id="nb_units_max",
time_dependent=True,
scenario_dependent=True,
value=str(csv_path).removesuffix(".txt"),
)

def _compute_nb_units_max_variation_forward(
self, period: int = 168
def _compute_nb_units_max_variation(
self, direction: Direction, period: int = DEFAULT_PERIOD
) -> pd.DataFrame:
nb_units_max_output = load_ts_from_txt("nb_units_max", self.series_path)
nb_units_max = load_ts_from_txt("nb_units_max", self.series_path)
previous_indices = []
for i in range(len(nb_units_max_output)):
previous_indices.append((i - 1) % period + (i // period) * period)
nb_units_max_output = nb_units_max_output.iloc[previous_indices].reset_index(
drop=True
) - nb_units_max_output.reset_index(drop=True)

return nb_units_max_output.applymap(lambda x: max(0, x)) # type: ignore

def process_nb_units_max_variation_forward(
self, period: int = 168
) -> InputComponentParameter:
nb_units_max_variation = self._compute_nb_units_max_variation_forward(
period=period
)
csv_path = self._write_dataframe_to_csv(
nb_units_max_variation, "nb_units_max_variation_forward.txt"
)

return InputComponentParameter(
id="nb_units_max_variation_forward",
time_dependent=True,
scenario_dependent=True,
value=str(csv_path).removesuffix(".txt"),
indices = np.arange(len(nb_units_max))
previous_indices = (indices - 1) % period + (indices // period) * period

variation = pd.DataFrame()
if direction.value == "backward":
variation = nb_units_max.reset_index(drop=True) - nb_units_max.iloc[
previous_indices
].reset_index(drop=True)
elif direction.value == "forward":
variation = nb_units_max.iloc[previous_indices].reset_index(
drop=True
) - nb_units_max.reset_index(drop=True)

# Use a vectorized operation instead of applymap
variation = variation.clip(lower=0)
return variation.rename(
columns={variation.columns[0]: f"nb_units_max_variation_{direction.value}"}
)
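
The vectorized `previous_indices` formula wraps within each block of `period` timesteps: index `i` maps to the previous timestep of its own block, and the first timestep of a block points to the last one of the same block. A quick numeric check with the default weekly period:

```python
import numpy as np

period = 168
indices = np.arange(2 * period)
previous_indices = (indices - 1) % period + (indices // period) * period
assert previous_indices[0] == 167    # hour 0 wraps to the end of week 1
assert previous_indices[1] == 0
assert previous_indices[168] == 335  # first hour of week 2 wraps to its last hour
```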

def _compute_nb_units_max_variation_backward(
self, period: int = 168
) -> pd.DataFrame:
nb_units_max_output = load_ts_from_txt("nb_units_max", self.series_path)
previous_indices = []
for i in range(len(nb_units_max_output)):
previous_indices.append((i - 1) % period + (i // period) * period)
nb_units_max_output = nb_units_max_output.reset_index(
drop=True
) - nb_units_max_output.iloc[previous_indices].reset_index(drop=True)

return nb_units_max_output.applymap(lambda x: max(0, x)) # type: ignore
def _build_csv_path(self, component_id: str, suffix: str = ".txt") -> Path:
return self.series_path / Path(f"{component_id}").with_suffix(suffix)

def process_nb_units_max_variation_backward(
self, period: int = 168
def generate_component_parameter(
self, parameter_id: str, period: int = DEFAULT_PERIOD
) -> InputComponentParameter:
nb_units_max_variation = self._compute_nb_units_max_variation_backward(
period=period
)
csv_path = self._write_dataframe_to_csv(
nb_units_max_variation, "nb_units_max_variation_backward.txt"
)
prepro_parameter_function: dict[str, Callable[[], pd.DataFrame]] = {
"p_min_cluster": self._compute_p_min_cluster,
"nb_units_min": self._compute_nb_units_min,
"nb_units_max": self._compute_nb_units_max,
"nb_units_max_variation_forward": lambda: self._compute_nb_units_max_variation(
Direction.FORWARD, period
),
"nb_units_max_variation_backward": lambda: self._compute_nb_units_max_variation(
Direction.BACKWARD, period
),
}

if parameter_id not in prepro_parameter_function:
raise ValueError(f"Unsupported parameter_id: {parameter_id}")

df = prepro_parameter_function[parameter_id]()
csv_path = self._build_csv_path(parameter_id)

# This separator is chosen to comply with the antares_craft timeseries creation
df.to_csv(csv_path, sep="\t", index=False, header=False)

return InputComponentParameter(
id="nb_units_max_variation_backward",
id=parameter_id,
time_dependent=True,
scenario_dependent=True,
value=str(csv_path).removesuffix(".txt"),
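
A usage sketch of the new dispatch table, assuming a `ThermalDataPreprocessing` instance `tdp`; the id `spinning_reserve` is just an illustrative unsupported value:

```python
# Known ids dispatch to the matching _compute_* helper, write the
# tab-separated series next to the cluster, and return the parameter.
param = tdp.generate_component_parameter("nb_units_max")

# Unknown ids now fail fast instead of surfacing later in the converter.
try:
    tdp.generate_component_parameter("spinning_reserve")
except ValueError as err:
    print(err)  # Unsupported parameter_id: spinning_reserve
```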