From f5b3a9b39204d94bc2c198d4f1098b37ec2dc81b Mon Sep 17 00:00:00 2001 From: lkstrp Date: Thu, 9 Jan 2025 08:59:42 +0100 Subject: [PATCH 1/5] fix: extra functionality not called --- data/custom_extra_functionality.py | 739 ++++++++++++++++++- rules/solve_myopic.smk | 3 - scripts/pypsa-de/additional_functionality.py | 736 ------------------ 3 files changed, 732 insertions(+), 746 deletions(-) delete mode 100644 scripts/pypsa-de/additional_functionality.py diff --git a/data/custom_extra_functionality.py b/data/custom_extra_functionality.py index e7a9df0fc..3912fa05b 100644 --- a/data/custom_extra_functionality.py +++ b/data/custom_extra_functionality.py @@ -1,11 +1,736 @@ -# -*- coding: utf-8 -*- -# SPDX-FileCopyrightText: : 2023- The PyPSA-Eur Authors -# -# SPDX-License-Identifier: MIT +import sys +import logging +import pandas as pd +from xarray import DataArray -def custom_extra_functionality(n, snapshots, snakemake): +from scripts.prepare_sector_network import determine_emission_sectors + +logger = logging.getLogger(__name__) + + +def add_capacity_limits(n, investment_year, limits_capacity, sense="maximum"): + for c in n.iterate_components(limits_capacity): + logger.info(f"Adding {sense} constraints for {c.list_name}") + + attr = "e" if c.name == "Store" else "p" + units = "MWh or tCO2" if c.name == "Store" else "MW" + + for carrier in limits_capacity[c.name]: + for ct in limits_capacity[c.name][carrier]: + if investment_year not in limits_capacity[c.name][carrier][ct].keys(): + continue + + limit = 1e3 * limits_capacity[c.name][carrier][ct][investment_year] + + logger.info( + f"Adding constraint on {c.name} {carrier} capacity in {ct} to be {sense} {limit} {units}" + ) + + valid_components = ( + (c.df.index.str[:2] == ct) + & (c.df.carrier.str[: len(carrier)] == carrier) + & ~c.df.carrier.str.contains("thermal") + ) # exclude solar thermal + + existing_index = c.df.index[ + valid_components & ~c.df[attr + "_nom_extendable"] + ] + extendable_index = c.df.index[ + valid_components & c.df[attr + "_nom_extendable"] + ] + + existing_capacity = c.df.loc[existing_index, attr + "_nom"].sum() + + logger.info( + f"Existing {c.name} {carrier} capacity in {ct}: {existing_capacity} {units}" + ) + + nom = n.model[c.name + "-" + attr + "_nom"].loc[extendable_index] + + lhs = nom.sum() + + cname = f"capacity_{sense}-{ct}-{c.name}-{carrier.replace(' ','-')}" + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + rhs = limit - existing_capacity + + if sense == "maximum": + if rhs <= 0: + logger.warning( + f"Existing capacity in {ct} for carrier {carrier} already exceeds the limit of {limit} MW. Limiting capacity expansion for this investment period to 0." + ) + rhs = 0 + + n.model.add_constraints( + lhs <= rhs, + name=f"GlobalConstraint-{cname}", + ) + n.add( + "GlobalConstraint", + cname, + constant=rhs, + sense="<=", + type="", + carrier_attribute="", + ) + + elif sense == "minimum": + n.model.add_constraints( + lhs >= rhs, + name=f"GlobalConstraint-{cname}", + ) + n.add( + "GlobalConstraint", + cname, + constant=rhs, + sense=">=", + type="", + carrier_attribute="", + ) + else: + logger.error("sense {sense} not recognised") + sys.exit() + + +def add_power_limits(n, investment_year, limits_power_max): + """ + " Restricts the maximum inflow/outflow of electricity from/to a country. + """ + for ct in limits_power_max: + if investment_year not in limits_power_max[ct].keys(): + continue + + limit = 1e3 * limits_power_max[ct][investment_year] / 10 + + logger.info( + f"Adding constraint on electricity import/export from/to {ct} to be < {limit} MW" + ) + incoming_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] != ct) + & (n.lines.bus1.str[:2] == ct) + ] + outgoing_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] == ct) + & (n.lines.bus1.str[:2] != ct) + ] + + incoming_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] != ct) + & (n.links.bus1.str[:2] == ct) + ] + outgoing_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] == ct) + & (n.links.bus1.str[:2] != ct) + ] + + # iterate over snapshots - otherwise exporting of postnetwork fails since + # the constraints are time dependent + for t in n.snapshots: + incoming_line_p = n.model["Line-s"].loc[t, incoming_line] + outgoing_line_p = n.model["Line-s"].loc[t, outgoing_line] + incoming_link_p = n.model["Link-p"].loc[t, incoming_link] + outgoing_link_p = n.model["Link-p"].loc[t, outgoing_link] + + lhs = ( + incoming_link_p.sum() + - outgoing_link_p.sum() + + incoming_line_p.sum() + - outgoing_line_p.sum() + ) / 10 + # divide by 10 to avoid numerical issues + + cname_upper = f"Power-import-limit-{ct}-{t}" + cname_lower = f"Power-export-limit-{ct}-{t}" + + n.model.add_constraints(lhs <= limit, name=cname_upper) + n.model.add_constraints(lhs >= -limit, name=cname_lower) + + # not adding to network as the shadow prices are not needed + + +def h2_import_limits(n, investment_year, limits_volume_max): + for ct in limits_volume_max["h2_import"]: + limit = limits_volume_max["h2_import"][ct][investment_year] * 1e6 + + logger.info(f"limiting H2 imports in {ct} to {limit/1e6} TWh/a") + pipeline_carrier = [ + "H2 pipeline", + "H2 pipeline (Kernnetz)", + "H2 pipeline retrofitted", + ] + incoming = n.links.index[ + (n.links.carrier.isin(pipeline_carrier)) + & (n.links.bus0.str[:2] != ct) + & (n.links.bus1.str[:2] == ct) + ] + outgoing = n.links.index[ + (n.links.carrier.isin(pipeline_carrier)) + & (n.links.bus0.str[:2] == ct) + & (n.links.bus1.str[:2] != ct) + ] + + incoming_p = ( + n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators + ).sum() + outgoing_p = ( + n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators + ).sum() + + lhs = incoming_p - outgoing_p + + cname = f"H2_import_limit-{ct}" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def h2_production_limits(n, investment_year, limits_volume_min, limits_volume_max): + for ct in limits_volume_max["electrolysis"]: + if ct not in limits_volume_min["electrolysis"]: + logger.warning( + f"no lower limit for H2 electrolysis in {ct} assuming 0 TWh/a" + ) + limit_lower = 0 + else: + limit_lower = limits_volume_min["electrolysis"][ct][investment_year] * 1e6 + + limit_upper = limits_volume_max["electrolysis"][ct][investment_year] * 1e6 + + logger.info( + f"limiting H2 electrolysis in DE between {limit_lower/1e6} and {limit_upper/1e6} TWh/a" + ) + + production = n.links[ + (n.links.carrier == "H2 Electrolysis") & (n.links.bus0.str.contains(ct)) + ].index + efficiency = n.links.loc[production, "efficiency"] + + lhs = ( + n.model["Link-p"].loc[:, production] + * n.snapshot_weightings.generators + * efficiency + ).sum() + + cname_upper = f"H2_production_limit_upper-{ct}" + cname_lower = f"H2_production_limit_lower-{ct}" + + n.model.add_constraints( + lhs <= limit_upper, name=f"GlobalConstraint-{cname_upper}" + ) + + n.model.add_constraints( + lhs >= limit_lower, name=f"GlobalConstraint-{cname_lower}" + ) + + if cname_upper not in n.global_constraints.index: + n.add( + "GlobalConstraint", + cname_upper, + constant=limit_upper, + sense="<=", + type="", + carrier_attribute="", + ) + if cname_lower not in n.global_constraints.index: + n.add( + "GlobalConstraint", + cname_lower, + constant=limit_lower, + sense=">=", + type="", + carrier_attribute="", + ) + + +def electricity_import_limits(n, investment_year, limits_volume_max): + for ct in limits_volume_max["electricity_import"]: + limit = limits_volume_max["electricity_import"][ct][investment_year] * 1e6 + + logger.info(f"limiting electricity imports in {ct} to {limit/1e6} TWh/a") + + incoming_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] != ct) + & (n.lines.bus1.str[:2] == ct) + ] + outgoing_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] == ct) + & (n.lines.bus1.str[:2] != ct) + ] + + incoming_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] != ct) + & (n.links.bus1.str[:2] == ct) + ] + outgoing_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] == ct) + & (n.links.bus1.str[:2] != ct) + ] + + incoming_line_p = ( + n.model["Line-s"].loc[:, incoming_line] * n.snapshot_weightings.generators + ).sum() + outgoing_line_p = ( + n.model["Line-s"].loc[:, outgoing_line] * n.snapshot_weightings.generators + ).sum() + + incoming_link_p = ( + n.model["Link-p"].loc[:, incoming_link] * n.snapshot_weightings.generators + ).sum() + outgoing_link_p = ( + n.model["Link-p"].loc[:, outgoing_link] * n.snapshot_weightings.generators + ).sum() + + lhs = (incoming_link_p - outgoing_link_p) + (incoming_line_p - outgoing_line_p) + + cname = f"Electricity_import_limit-{ct}" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def add_co2limit_country(n, limit_countries, snakemake, debug=False): """ - Add custom extra functionality constraints. + Add a set of emissions limit constraints for specified countries. + + The countries and emissions limits are specified in the config file entry 'co2_budget_national'. + + Parameters + ---------- + n : pypsa.Network + limit_countries : dict + snakemake: snakemake object """ - pass + logger.info(f"Adding CO2 budget limit for each country as per unit of 1990 levels") + + nhours = n.snapshot_weightings.generators.sum() + nyears = nhours / 8760 + + sectors = determine_emission_sectors(n.config["sector"]) + + # convert MtCO2 to tCO2 + co2_totals = 1e6 * pd.read_csv(snakemake.input.co2_totals_name, index_col=0) + + co2_total_totals = co2_totals[sectors].sum(axis=1) * nyears + + for ct in limit_countries: + limit = co2_total_totals[ct] * limit_countries[ct] + logger.info( + f"Limiting emissions in country {ct} to {limit_countries[ct]:.1%} of " + f"1990 levels, i.e. {limit:,.2f} tCO2/a", + ) + + lhs = [] + + for port in [col[3:] for col in n.links if col.startswith("bus")]: + links = n.links.index[ + (n.links.index.str[:2] == ct) + & (n.links[f"bus{port}"] == "co2 atmosphere") + & ( + n.links.carrier != "kerosene for aviation" + ) # first exclude aviation to multiply it with a domestic factor later + ] + + logger.info( + f"For {ct} adding following link carriers to port {port} CO2 constraint: {n.links.loc[links,'carrier'].unique()}" + ) + + if port == "0": + efficiency = -1.0 + elif port == "1": + efficiency = n.links.loc[links, f"efficiency"] + else: + efficiency = n.links.loc[links, f"efficiency{port}"] + + lhs.append( + ( + n.model["Link-p"].loc[:, links] + * efficiency + * n.snapshot_weightings.generators + ).sum() + ) + + # Aviation demand + energy_totals = pd.read_csv(snakemake.input.energy_totals, index_col=[0, 1]) + domestic_aviation = energy_totals.loc[ + ("DE", snakemake.params.energy_year), "total domestic aviation" + ] + international_aviation = energy_totals.loc[ + ("DE", snakemake.params.energy_year), "total international aviation" + ] + domestic_factor = domestic_aviation / ( + domestic_aviation + international_aviation + ) + aviation_links = n.links[ + (n.links.index.str[:2] == ct) & (n.links.carrier == "kerosene for aviation") + ] + lhs.append + ( + n.model["Link-p"].loc[:, aviation_links.index] + * aviation_links.efficiency2 + * n.snapshot_weightings.generators + ).sum() * domestic_factor + logger.info( + f"Adding domestic aviation emissions for {ct} with a factor of {domestic_factor}" + ) + + # Adding Efuel imports and exports to constraint + incoming_oil = n.links.index[n.links.index == "EU renewable oil -> DE oil"] + outgoing_oil = n.links.index[n.links.index == "DE renewable oil -> EU oil"] + + if not debug: + lhs.append( + ( + -1 + * n.model["Link-p"].loc[:, incoming_oil] + * 0.2571 + * n.snapshot_weightings.generators + ).sum() + ) + lhs.append( + ( + n.model["Link-p"].loc[:, outgoing_oil] + * 0.2571 + * n.snapshot_weightings.generators + ).sum() + ) + + incoming_methanol = n.links.index[n.links.index == "EU methanol -> DE methanol"] + outgoing_methanol = n.links.index[n.links.index == "DE methanol -> EU methanol"] + + lhs.append( + ( + -1 + * n.model["Link-p"].loc[:, incoming_methanol] + / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] + * n.snapshot_weightings.generators + ).sum() + ) + + lhs.append( + ( + n.model["Link-p"].loc[:, outgoing_methanol] + / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] + * n.snapshot_weightings.generators + ).sum() + ) + + # Methane + incoming_CH4 = n.links.index[n.links.index == "EU renewable gas -> DE gas"] + outgoing_CH4 = n.links.index[n.links.index == "DE renewable gas -> EU gas"] + + lhs.append( + ( + -1 + * n.model["Link-p"].loc[:, incoming_CH4] + * 0.198 + * n.snapshot_weightings.generators + ).sum() + ) + + lhs.append( + ( + n.model["Link-p"].loc[:, outgoing_CH4] + * 0.198 + * n.snapshot_weightings.generators + ).sum() + ) + + lhs = sum(lhs) + + cname = f"co2_limit-{ct}" + + n.model.add_constraints( + lhs <= limit, + name=f"GlobalConstraint-{cname}", + ) + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def force_boiler_profiles_existing_per_load(n): + """ + This scales the boiler dispatch to the load profile with a factor common to + all boilers at load. + """ + + logger.info("Forcing boiler profiles for existing ones") + + decentral_boilers = n.links.index[ + n.links.carrier.str.contains("boiler") + & ~n.links.carrier.str.contains("urban central") + & ~n.links.p_nom_extendable + ] + + if decentral_boilers.empty: + return + + boiler_loads = n.links.loc[decentral_boilers, "bus1"] + boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] + decentral_boilers = boiler_loads.index + boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( + n.loads_t.p_set[boiler_loads].max(), axis=1 + ) + boiler_profiles_pu.columns = decentral_boilers + boiler_profiles = DataArray( + boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) + ) + + boiler_load_index = pd.Index(boiler_loads.unique()) + boiler_load_index.name = "Load" + + # per load scaling factor + n.model.add_variables(coords=[boiler_load_index], name="Load-profile_factor") + + # clumsy indicator matrix to map boilers to loads + df = pd.DataFrame(index=boiler_load_index, columns=decentral_boilers, data=0.0) + for k, v in boiler_loads.items(): + df.loc[v, k] = 1.0 + + lhs = n.model["Link-p"].loc[:, decentral_boilers] - ( + boiler_profiles * DataArray(df) * n.model["Load-profile_factor"] + ).sum("Load") + + n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile") + + # hack so that PyPSA doesn't complain there is nowhere to store the variable + n.loads["profile_factor_opt"] = 0.0 + + +def force_boiler_profiles_existing_per_boiler(n): + """ + This scales each boiler dispatch to be proportional to the load profile. + """ + + logger.info( + "Forcing each existing boiler dispatch to be proportional to the load profile" + ) + + decentral_boilers = n.links.index[ + n.links.carrier.str.contains("boiler") + & ~n.links.carrier.str.contains("urban central") + & ~n.links.p_nom_extendable + ] + + if decentral_boilers.empty: + return + + boiler_loads = n.links.loc[decentral_boilers, "bus1"] + boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] + decentral_boilers = boiler_loads.index + boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( + n.loads_t.p_set[boiler_loads].max(), axis=1 + ) + boiler_profiles_pu.columns = decentral_boilers + boiler_profiles = DataArray( + boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) + ) + + # will be per unit + n.model.add_variables(coords=[decentral_boilers], name="Link-fixed_profile_scaling") + + lhs = ( + (1, n.model["Link-p"].loc[:, decentral_boilers]), + ( + -boiler_profiles, + n.model["Link-fixed_profile_scaling"], + ), + ) + + n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile_scaling") + + # hack so that PyPSA doesn't complain there is nowhere to store the variable + n.links["fixed_profile_scaling_opt"] = 0.0 + + +def add_h2_derivate_limit(n, investment_year, limits_volume_max): + for ct in limits_volume_max["h2_derivate_import"]: + limit = limits_volume_max["h2_derivate_import"][ct][investment_year] * 1e6 + + logger.info(f"limiting H2 derivate imports in {ct} to {limit/1e6} TWh/a") + + incoming = n.links.loc[ + [ + "EU renewable oil -> DE oil", + "EU methanol -> DE methanol", + "EU renewable gas -> DE gas", + ] + ].index + outgoing = n.links.loc[ + [ + "DE renewable oil -> EU oil", + "DE methanol -> EU methanol", + "DE renewable gas -> EU gas", + ] + ].index + + incoming_p = ( + n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators + ).sum() + outgoing_p = ( + n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators + ).sum() + + lhs = incoming_p - outgoing_p + + cname = f"H2_derivate_import_limit-{ct}" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def adapt_nuclear_output(n): + logger.info( + f"limiting german electricity generation from nuclear to 2020 value of 61 TWh" + ) + limit = 61e6 + + nuclear_de_index = n.links.index[ + (n.links.carrier == "nuclear") & (n.links.index.str[:2] == "DE") + ] + + nuclear_gen = ( + n.model["Link-p"].loc[:, nuclear_de_index] + * n.links.loc[nuclear_de_index, "efficiency"] + * n.snapshot_weightings.generators + ).sum() + + lhs = nuclear_gen + + cname = f"Nuclear_generation_limit-DE" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def custom_extra_functionality(n, snapshots, snakemake): + + logger.info("Adding Ariadne-specific functionality") + + investment_year = int(snakemake.wildcards.planning_horizons[-4:]) + constraints = snakemake.params.solving["constraints"] + + add_capacity_limits( + n, investment_year, constraints["limits_capacity_min"], "minimum" + ) + + add_capacity_limits( + n, investment_year, constraints["limits_capacity_max"], "maximum" + ) + + add_power_limits(n, investment_year, constraints["limits_power_max"]) + + if int(snakemake.wildcards.clusters) != 1: + h2_import_limits(n, investment_year, constraints["limits_volume_max"]) + + electricity_import_limits(n, investment_year, constraints["limits_volume_max"]) + + if investment_year >= 2025: + h2_production_limits( + n, + investment_year, + constraints["limits_volume_min"], + constraints["limits_volume_max"], + ) + + if not snakemake.config["run"]["debug_h2deriv_limit"]: + add_h2_derivate_limit(n, investment_year, constraints["limits_volume_max"]) + + # force_boiler_profiles_existing_per_load(n) + force_boiler_profiles_existing_per_boiler(n) + + if isinstance(constraints["co2_budget_national"], dict): + limit_countries = constraints["co2_budget_national"][investment_year] + add_co2limit_country( + n, + limit_countries, + snakemake, + debug=snakemake.config["run"]["debug_co2_limit"], + ) + else: + logger.warning("No national CO2 budget specified!") + + if investment_year == 2020: + adapt_nuclear_output(n) diff --git a/rules/solve_myopic.smk b/rules/solve_myopic.smk index 52d3b3524..8a07d0954 100644 --- a/rules/solve_myopic.smk +++ b/rules/solve_myopic.smk @@ -114,9 +114,6 @@ rule solve_sector_network_myopic: "sector", "co2_sequestration_potential", default=200 ), custom_extra_functionality=input_custom_extra_functionality, - # custom_extra_functionality=os.path.join( - # os.path.dirname(workflow.snakefile), "scripts/pypsa-de/additional_functionality.py" - # ), energy_year=config_provider("energy", "energy_totals_year"), input: network=RESULTS diff --git a/scripts/pypsa-de/additional_functionality.py b/scripts/pypsa-de/additional_functionality.py deleted file mode 100644 index 38d8e017d..000000000 --- a/scripts/pypsa-de/additional_functionality.py +++ /dev/null @@ -1,736 +0,0 @@ -# -*- coding: utf-8 -*- - -import logging - -import pandas as pd -from xarray import DataArray - -from scripts.prepare_sector_network import determine_emission_sectors - -logger = logging.getLogger(__name__) - - -def add_capacity_limits(n, investment_year, limits_capacity, sense="maximum"): - for c in n.iterate_components(limits_capacity): - logger.info(f"Adding {sense} constraints for {c.list_name}") - - attr = "e" if c.name == "Store" else "p" - units = "MWh or tCO2" if c.name == "Store" else "MW" - - for carrier in limits_capacity[c.name]: - for ct in limits_capacity[c.name][carrier]: - if investment_year not in limits_capacity[c.name][carrier][ct].keys(): - continue - - limit = 1e3 * limits_capacity[c.name][carrier][ct][investment_year] - - logger.info( - f"Adding constraint on {c.name} {carrier} capacity in {ct} to be {sense} {limit} {units}" - ) - - valid_components = ( - (c.df.index.str[:2] == ct) - & (c.df.carrier.str[: len(carrier)] == carrier) - & ~c.df.carrier.str.contains("thermal") - ) # exclude solar thermal - - existing_index = c.df.index[ - valid_components & ~c.df[attr + "_nom_extendable"] - ] - extendable_index = c.df.index[ - valid_components & c.df[attr + "_nom_extendable"] - ] - - existing_capacity = c.df.loc[existing_index, attr + "_nom"].sum() - - logger.info( - f"Existing {c.name} {carrier} capacity in {ct}: {existing_capacity} {units}" - ) - - nom = n.model[c.name + "-" + attr + "_nom"].loc[extendable_index] - - lhs = nom.sum() - - cname = f"capacity_{sense}-{ct}-{c.name}-{carrier.replace(' ','-')}" - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - rhs = limit - existing_capacity - - if sense == "maximum": - if rhs <= 0: - logger.warning( - f"Existing capacity in {ct} for carrier {carrier} already exceeds the limit of {limit} MW. Limiting capacity expansion for this investment period to 0." - ) - rhs = 0 - - n.model.add_constraints( - lhs <= rhs, - name=f"GlobalConstraint-{cname}", - ) - n.add( - "GlobalConstraint", - cname, - constant=rhs, - sense="<=", - type="", - carrier_attribute="", - ) - - elif sense == "minimum": - n.model.add_constraints( - lhs >= rhs, - name=f"GlobalConstraint-{cname}", - ) - n.add( - "GlobalConstraint", - cname, - constant=rhs, - sense=">=", - type="", - carrier_attribute="", - ) - else: - logger.error("sense {sense} not recognised") - sys.exit() - - -def add_power_limits(n, investment_year, limits_power_max): - """ - " Restricts the maximum inflow/outflow of electricity from/to a country. - """ - for ct in limits_power_max: - if investment_year not in limits_power_max[ct].keys(): - continue - - limit = 1e3 * limits_power_max[ct][investment_year] / 10 - - logger.info( - f"Adding constraint on electricity import/export from/to {ct} to be < {limit} MW" - ) - incoming_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] != ct) - & (n.lines.bus1.str[:2] == ct) - ] - outgoing_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] == ct) - & (n.lines.bus1.str[:2] != ct) - ] - - incoming_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] != ct) - & (n.links.bus1.str[:2] == ct) - ] - outgoing_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] == ct) - & (n.links.bus1.str[:2] != ct) - ] - - # iterate over snapshots - otherwise exporting of postnetwork fails since - # the constraints are time dependent - for t in n.snapshots: - incoming_line_p = n.model["Line-s"].loc[t, incoming_line] - outgoing_line_p = n.model["Line-s"].loc[t, outgoing_line] - incoming_link_p = n.model["Link-p"].loc[t, incoming_link] - outgoing_link_p = n.model["Link-p"].loc[t, outgoing_link] - - lhs = ( - incoming_link_p.sum() - - outgoing_link_p.sum() - + incoming_line_p.sum() - - outgoing_line_p.sum() - ) / 10 - # divide by 10 to avoid numerical issues - - cname_upper = f"Power-import-limit-{ct}-{t}" - cname_lower = f"Power-export-limit-{ct}-{t}" - - n.model.add_constraints(lhs <= limit, name=cname_upper) - n.model.add_constraints(lhs >= -limit, name=cname_lower) - - # not adding to network as the shadow prices are not needed - - -def h2_import_limits(n, investment_year, limits_volume_max): - for ct in limits_volume_max["h2_import"]: - limit = limits_volume_max["h2_import"][ct][investment_year] * 1e6 - - logger.info(f"limiting H2 imports in {ct} to {limit/1e6} TWh/a") - pipeline_carrier = [ - "H2 pipeline", - "H2 pipeline (Kernnetz)", - "H2 pipeline retrofitted", - ] - incoming = n.links.index[ - (n.links.carrier.isin(pipeline_carrier)) - & (n.links.bus0.str[:2] != ct) - & (n.links.bus1.str[:2] == ct) - ] - outgoing = n.links.index[ - (n.links.carrier.isin(pipeline_carrier)) - & (n.links.bus0.str[:2] == ct) - & (n.links.bus1.str[:2] != ct) - ] - - incoming_p = ( - n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators - ).sum() - outgoing_p = ( - n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators - ).sum() - - lhs = incoming_p - outgoing_p - - cname = f"H2_import_limit-{ct}" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def h2_production_limits(n, investment_year, limits_volume_min, limits_volume_max): - for ct in limits_volume_max["electrolysis"]: - if ct not in limits_volume_min["electrolysis"]: - logger.warning( - f"no lower limit for H2 electrolysis in {ct} assuming 0 TWh/a" - ) - limit_lower = 0 - else: - limit_lower = limits_volume_min["electrolysis"][ct][investment_year] * 1e6 - - limit_upper = limits_volume_max["electrolysis"][ct][investment_year] * 1e6 - - logger.info( - f"limiting H2 electrolysis in DE between {limit_lower/1e6} and {limit_upper/1e6} TWh/a" - ) - - production = n.links[ - (n.links.carrier == "H2 Electrolysis") & (n.links.bus0.str.contains(ct)) - ].index - efficiency = n.links.loc[production, "efficiency"] - - lhs = ( - n.model["Link-p"].loc[:, production] - * n.snapshot_weightings.generators - * efficiency - ).sum() - - cname_upper = f"H2_production_limit_upper-{ct}" - cname_lower = f"H2_production_limit_lower-{ct}" - - n.model.add_constraints( - lhs <= limit_upper, name=f"GlobalConstraint-{cname_upper}" - ) - - n.model.add_constraints( - lhs >= limit_lower, name=f"GlobalConstraint-{cname_lower}" - ) - - if cname_upper not in n.global_constraints.index: - n.add( - "GlobalConstraint", - cname_upper, - constant=limit_upper, - sense="<=", - type="", - carrier_attribute="", - ) - if cname_lower not in n.global_constraints.index: - n.add( - "GlobalConstraint", - cname_lower, - constant=limit_lower, - sense=">=", - type="", - carrier_attribute="", - ) - - -def electricity_import_limits(n, investment_year, limits_volume_max): - for ct in limits_volume_max["electricity_import"]: - limit = limits_volume_max["electricity_import"][ct][investment_year] * 1e6 - - logger.info(f"limiting electricity imports in {ct} to {limit/1e6} TWh/a") - - incoming_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] != ct) - & (n.lines.bus1.str[:2] == ct) - ] - outgoing_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] == ct) - & (n.lines.bus1.str[:2] != ct) - ] - - incoming_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] != ct) - & (n.links.bus1.str[:2] == ct) - ] - outgoing_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] == ct) - & (n.links.bus1.str[:2] != ct) - ] - - incoming_line_p = ( - n.model["Line-s"].loc[:, incoming_line] * n.snapshot_weightings.generators - ).sum() - outgoing_line_p = ( - n.model["Line-s"].loc[:, outgoing_line] * n.snapshot_weightings.generators - ).sum() - - incoming_link_p = ( - n.model["Link-p"].loc[:, incoming_link] * n.snapshot_weightings.generators - ).sum() - outgoing_link_p = ( - n.model["Link-p"].loc[:, outgoing_link] * n.snapshot_weightings.generators - ).sum() - - lhs = (incoming_link_p - outgoing_link_p) + (incoming_line_p - outgoing_line_p) - - cname = f"Electricity_import_limit-{ct}" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def add_co2limit_country(n, limit_countries, snakemake, debug=False): - """ - Add a set of emissions limit constraints for specified countries. - - The countries and emissions limits are specified in the config file entry 'co2_budget_national'. - - Parameters - ---------- - n : pypsa.Network - limit_countries : dict - snakemake: snakemake object - """ - logger.info(f"Adding CO2 budget limit for each country as per unit of 1990 levels") - - nhours = n.snapshot_weightings.generators.sum() - nyears = nhours / 8760 - - sectors = determine_emission_sectors(n.config["sector"]) - - # convert MtCO2 to tCO2 - co2_totals = 1e6 * pd.read_csv(snakemake.input.co2_totals_name, index_col=0) - - co2_total_totals = co2_totals[sectors].sum(axis=1) * nyears - - for ct in limit_countries: - limit = co2_total_totals[ct] * limit_countries[ct] - logger.info( - f"Limiting emissions in country {ct} to {limit_countries[ct]:.1%} of " - f"1990 levels, i.e. {limit:,.2f} tCO2/a", - ) - - lhs = [] - - for port in [col[3:] for col in n.links if col.startswith("bus")]: - links = n.links.index[ - (n.links.index.str[:2] == ct) - & (n.links[f"bus{port}"] == "co2 atmosphere") - & ( - n.links.carrier != "kerosene for aviation" - ) # first exclude aviation to multiply it with a domestic factor later - ] - - logger.info( - f"For {ct} adding following link carriers to port {port} CO2 constraint: {n.links.loc[links,'carrier'].unique()}" - ) - - if port == "0": - efficiency = -1.0 - elif port == "1": - efficiency = n.links.loc[links, f"efficiency"] - else: - efficiency = n.links.loc[links, f"efficiency{port}"] - - lhs.append( - ( - n.model["Link-p"].loc[:, links] - * efficiency - * n.snapshot_weightings.generators - ).sum() - ) - - # Aviation demand - energy_totals = pd.read_csv(snakemake.input.energy_totals, index_col=[0, 1]) - domestic_aviation = energy_totals.loc[ - ("DE", snakemake.params.energy_year), "total domestic aviation" - ] - international_aviation = energy_totals.loc[ - ("DE", snakemake.params.energy_year), "total international aviation" - ] - domestic_factor = domestic_aviation / ( - domestic_aviation + international_aviation - ) - aviation_links = n.links[ - (n.links.index.str[:2] == ct) & (n.links.carrier == "kerosene for aviation") - ] - lhs.append - ( - n.model["Link-p"].loc[:, aviation_links.index] - * aviation_links.efficiency2 - * n.snapshot_weightings.generators - ).sum() * domestic_factor - logger.info( - f"Adding domestic aviation emissions for {ct} with a factor of {domestic_factor}" - ) - - # Adding Efuel imports and exports to constraint - incoming_oil = n.links.index[n.links.index == "EU renewable oil -> DE oil"] - outgoing_oil = n.links.index[n.links.index == "DE renewable oil -> EU oil"] - - if not debug: - lhs.append( - ( - -1 - * n.model["Link-p"].loc[:, incoming_oil] - * 0.2571 - * n.snapshot_weightings.generators - ).sum() - ) - lhs.append( - ( - n.model["Link-p"].loc[:, outgoing_oil] - * 0.2571 - * n.snapshot_weightings.generators - ).sum() - ) - - incoming_methanol = n.links.index[n.links.index == "EU methanol -> DE methanol"] - outgoing_methanol = n.links.index[n.links.index == "DE methanol -> EU methanol"] - - lhs.append( - ( - -1 - * n.model["Link-p"].loc[:, incoming_methanol] - / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] - * n.snapshot_weightings.generators - ).sum() - ) - - lhs.append( - ( - n.model["Link-p"].loc[:, outgoing_methanol] - / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] - * n.snapshot_weightings.generators - ).sum() - ) - - # Methane - incoming_CH4 = n.links.index[n.links.index == "EU renewable gas -> DE gas"] - outgoing_CH4 = n.links.index[n.links.index == "DE renewable gas -> EU gas"] - - lhs.append( - ( - -1 - * n.model["Link-p"].loc[:, incoming_CH4] - * 0.198 - * n.snapshot_weightings.generators - ).sum() - ) - - lhs.append( - ( - n.model["Link-p"].loc[:, outgoing_CH4] - * 0.198 - * n.snapshot_weightings.generators - ).sum() - ) - - lhs = sum(lhs) - - cname = f"co2_limit-{ct}" - - n.model.add_constraints( - lhs <= limit, - name=f"GlobalConstraint-{cname}", - ) - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def force_boiler_profiles_existing_per_load(n): - """ - This scales the boiler dispatch to the load profile with a factor common to - all boilers at load. - """ - - logger.info("Forcing boiler profiles for existing ones") - - decentral_boilers = n.links.index[ - n.links.carrier.str.contains("boiler") - & ~n.links.carrier.str.contains("urban central") - & ~n.links.p_nom_extendable - ] - - if decentral_boilers.empty: - return - - boiler_loads = n.links.loc[decentral_boilers, "bus1"] - boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] - decentral_boilers = boiler_loads.index - boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( - n.loads_t.p_set[boiler_loads].max(), axis=1 - ) - boiler_profiles_pu.columns = decentral_boilers - boiler_profiles = DataArray( - boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) - ) - - boiler_load_index = pd.Index(boiler_loads.unique()) - boiler_load_index.name = "Load" - - # per load scaling factor - n.model.add_variables(coords=[boiler_load_index], name="Load-profile_factor") - - # clumsy indicator matrix to map boilers to loads - df = pd.DataFrame(index=boiler_load_index, columns=decentral_boilers, data=0.0) - for k, v in boiler_loads.items(): - df.loc[v, k] = 1.0 - - lhs = n.model["Link-p"].loc[:, decentral_boilers] - ( - boiler_profiles * DataArray(df) * n.model["Load-profile_factor"] - ).sum("Load") - - n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile") - - # hack so that PyPSA doesn't complain there is nowhere to store the variable - n.loads["profile_factor_opt"] = 0.0 - - -def force_boiler_profiles_existing_per_boiler(n): - """ - This scales each boiler dispatch to be proportional to the load profile. - """ - - logger.info( - "Forcing each existing boiler dispatch to be proportional to the load profile" - ) - - decentral_boilers = n.links.index[ - n.links.carrier.str.contains("boiler") - & ~n.links.carrier.str.contains("urban central") - & ~n.links.p_nom_extendable - ] - - if decentral_boilers.empty: - return - - boiler_loads = n.links.loc[decentral_boilers, "bus1"] - boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] - decentral_boilers = boiler_loads.index - boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( - n.loads_t.p_set[boiler_loads].max(), axis=1 - ) - boiler_profiles_pu.columns = decentral_boilers - boiler_profiles = DataArray( - boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) - ) - - # will be per unit - n.model.add_variables(coords=[decentral_boilers], name="Link-fixed_profile_scaling") - - lhs = ( - (1, n.model["Link-p"].loc[:, decentral_boilers]), - ( - -boiler_profiles, - n.model["Link-fixed_profile_scaling"], - ), - ) - - n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile_scaling") - - # hack so that PyPSA doesn't complain there is nowhere to store the variable - n.links["fixed_profile_scaling_opt"] = 0.0 - - -def add_h2_derivate_limit(n, investment_year, limits_volume_max): - for ct in limits_volume_max["h2_derivate_import"]: - limit = limits_volume_max["h2_derivate_import"][ct][investment_year] * 1e6 - - logger.info(f"limiting H2 derivate imports in {ct} to {limit/1e6} TWh/a") - - incoming = n.links.loc[ - [ - "EU renewable oil -> DE oil", - "EU methanol -> DE methanol", - "EU renewable gas -> DE gas", - ] - ].index - outgoing = n.links.loc[ - [ - "DE renewable oil -> EU oil", - "DE methanol -> EU methanol", - "DE renewable gas -> EU gas", - ] - ].index - - incoming_p = ( - n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators - ).sum() - outgoing_p = ( - n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators - ).sum() - - lhs = incoming_p - outgoing_p - - cname = f"H2_derivate_import_limit-{ct}" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def adapt_nuclear_output(n): - logger.info( - f"limiting german electricity generation from nuclear to 2020 value of 61 TWh" - ) - limit = 61e6 - - nuclear_de_index = n.links.index[ - (n.links.carrier == "nuclear") & (n.links.index.str[:2] == "DE") - ] - - nuclear_gen = ( - n.model["Link-p"].loc[:, nuclear_de_index] - * n.links.loc[nuclear_de_index, "efficiency"] - * n.snapshot_weightings.generators - ).sum() - - lhs = nuclear_gen - - cname = f"Nuclear_generation_limit-DE" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def additional_functionality(n, snapshots, snakemake): - logger.info("Adding Ariadne-specific functionality") - - investment_year = int(snakemake.wildcards.planning_horizons[-4:]) - constraints = snakemake.params.solving["constraints"] - - add_capacity_limits( - n, investment_year, constraints["limits_capacity_min"], "minimum" - ) - - add_capacity_limits( - n, investment_year, constraints["limits_capacity_max"], "maximum" - ) - - add_power_limits(n, investment_year, constraints["limits_power_max"]) - - if int(snakemake.wildcards.clusters) != 1: - h2_import_limits(n, investment_year, constraints["limits_volume_max"]) - - electricity_import_limits(n, investment_year, constraints["limits_volume_max"]) - - if investment_year >= 2025: - h2_production_limits( - n, - investment_year, - constraints["limits_volume_min"], - constraints["limits_volume_max"], - ) - - if not snakemake.config["run"]["debug_h2deriv_limit"]: - add_h2_derivate_limit(n, investment_year, constraints["limits_volume_max"]) - - # force_boiler_profiles_existing_per_load(n) - force_boiler_profiles_existing_per_boiler(n) - - if isinstance(constraints["co2_budget_national"], dict): - limit_countries = constraints["co2_budget_national"][investment_year] - add_co2limit_country( - n, - limit_countries, - snakemake, - debug=snakemake.config["run"]["debug_co2_limit"], - ) - else: - logger.warning("No national CO2 budget specified!") - - if investment_year == 2020: - adapt_nuclear_output(n) From a3ddba056e3d19072fb5acf0e7ffcdd13a9769ab Mon Sep 17 00:00:00 2001 From: lkstrp Date: Thu, 23 Jan 2025 12:28:17 +0100 Subject: [PATCH 2/5] fix --- config/config.default.yaml | 2 +- data/custom_extra_functionality.py | 739 +---------------------------- 2 files changed, 8 insertions(+), 733 deletions(-) diff --git a/config/config.default.yaml b/config/config.default.yaml index c2ec2ce4f..1fe8d262d 100644 --- a/config/config.default.yaml +++ b/config/config.default.yaml @@ -909,7 +909,7 @@ solving: skip_iterations: true rolling_horizon: false seed: 123 - custom_extra_functionality: "../data/custom_extra_functionality.py" + custom_extra_functionality: "../scripts/pypsa-de/additional_functionality.py" # io_api: "direct" # Increases performance but only supported for the highs and gurobi solvers # options that go into the optimize function track_iterations: false diff --git a/data/custom_extra_functionality.py b/data/custom_extra_functionality.py index 3912fa05b..e7a9df0fc 100644 --- a/data/custom_extra_functionality.py +++ b/data/custom_extra_functionality.py @@ -1,736 +1,11 @@ -import sys -import logging +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: : 2023- The PyPSA-Eur Authors +# +# SPDX-License-Identifier: MIT -import pandas as pd -from xarray import DataArray -from scripts.prepare_sector_network import determine_emission_sectors - -logger = logging.getLogger(__name__) - - -def add_capacity_limits(n, investment_year, limits_capacity, sense="maximum"): - for c in n.iterate_components(limits_capacity): - logger.info(f"Adding {sense} constraints for {c.list_name}") - - attr = "e" if c.name == "Store" else "p" - units = "MWh or tCO2" if c.name == "Store" else "MW" - - for carrier in limits_capacity[c.name]: - for ct in limits_capacity[c.name][carrier]: - if investment_year not in limits_capacity[c.name][carrier][ct].keys(): - continue - - limit = 1e3 * limits_capacity[c.name][carrier][ct][investment_year] - - logger.info( - f"Adding constraint on {c.name} {carrier} capacity in {ct} to be {sense} {limit} {units}" - ) - - valid_components = ( - (c.df.index.str[:2] == ct) - & (c.df.carrier.str[: len(carrier)] == carrier) - & ~c.df.carrier.str.contains("thermal") - ) # exclude solar thermal - - existing_index = c.df.index[ - valid_components & ~c.df[attr + "_nom_extendable"] - ] - extendable_index = c.df.index[ - valid_components & c.df[attr + "_nom_extendable"] - ] - - existing_capacity = c.df.loc[existing_index, attr + "_nom"].sum() - - logger.info( - f"Existing {c.name} {carrier} capacity in {ct}: {existing_capacity} {units}" - ) - - nom = n.model[c.name + "-" + attr + "_nom"].loc[extendable_index] - - lhs = nom.sum() - - cname = f"capacity_{sense}-{ct}-{c.name}-{carrier.replace(' ','-')}" - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - rhs = limit - existing_capacity - - if sense == "maximum": - if rhs <= 0: - logger.warning( - f"Existing capacity in {ct} for carrier {carrier} already exceeds the limit of {limit} MW. Limiting capacity expansion for this investment period to 0." - ) - rhs = 0 - - n.model.add_constraints( - lhs <= rhs, - name=f"GlobalConstraint-{cname}", - ) - n.add( - "GlobalConstraint", - cname, - constant=rhs, - sense="<=", - type="", - carrier_attribute="", - ) - - elif sense == "minimum": - n.model.add_constraints( - lhs >= rhs, - name=f"GlobalConstraint-{cname}", - ) - n.add( - "GlobalConstraint", - cname, - constant=rhs, - sense=">=", - type="", - carrier_attribute="", - ) - else: - logger.error("sense {sense} not recognised") - sys.exit() - - -def add_power_limits(n, investment_year, limits_power_max): - """ - " Restricts the maximum inflow/outflow of electricity from/to a country. - """ - for ct in limits_power_max: - if investment_year not in limits_power_max[ct].keys(): - continue - - limit = 1e3 * limits_power_max[ct][investment_year] / 10 - - logger.info( - f"Adding constraint on electricity import/export from/to {ct} to be < {limit} MW" - ) - incoming_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] != ct) - & (n.lines.bus1.str[:2] == ct) - ] - outgoing_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] == ct) - & (n.lines.bus1.str[:2] != ct) - ] - - incoming_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] != ct) - & (n.links.bus1.str[:2] == ct) - ] - outgoing_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] == ct) - & (n.links.bus1.str[:2] != ct) - ] - - # iterate over snapshots - otherwise exporting of postnetwork fails since - # the constraints are time dependent - for t in n.snapshots: - incoming_line_p = n.model["Line-s"].loc[t, incoming_line] - outgoing_line_p = n.model["Line-s"].loc[t, outgoing_line] - incoming_link_p = n.model["Link-p"].loc[t, incoming_link] - outgoing_link_p = n.model["Link-p"].loc[t, outgoing_link] - - lhs = ( - incoming_link_p.sum() - - outgoing_link_p.sum() - + incoming_line_p.sum() - - outgoing_line_p.sum() - ) / 10 - # divide by 10 to avoid numerical issues - - cname_upper = f"Power-import-limit-{ct}-{t}" - cname_lower = f"Power-export-limit-{ct}-{t}" - - n.model.add_constraints(lhs <= limit, name=cname_upper) - n.model.add_constraints(lhs >= -limit, name=cname_lower) - - # not adding to network as the shadow prices are not needed - - -def h2_import_limits(n, investment_year, limits_volume_max): - for ct in limits_volume_max["h2_import"]: - limit = limits_volume_max["h2_import"][ct][investment_year] * 1e6 - - logger.info(f"limiting H2 imports in {ct} to {limit/1e6} TWh/a") - pipeline_carrier = [ - "H2 pipeline", - "H2 pipeline (Kernnetz)", - "H2 pipeline retrofitted", - ] - incoming = n.links.index[ - (n.links.carrier.isin(pipeline_carrier)) - & (n.links.bus0.str[:2] != ct) - & (n.links.bus1.str[:2] == ct) - ] - outgoing = n.links.index[ - (n.links.carrier.isin(pipeline_carrier)) - & (n.links.bus0.str[:2] == ct) - & (n.links.bus1.str[:2] != ct) - ] - - incoming_p = ( - n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators - ).sum() - outgoing_p = ( - n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators - ).sum() - - lhs = incoming_p - outgoing_p - - cname = f"H2_import_limit-{ct}" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def h2_production_limits(n, investment_year, limits_volume_min, limits_volume_max): - for ct in limits_volume_max["electrolysis"]: - if ct not in limits_volume_min["electrolysis"]: - logger.warning( - f"no lower limit for H2 electrolysis in {ct} assuming 0 TWh/a" - ) - limit_lower = 0 - else: - limit_lower = limits_volume_min["electrolysis"][ct][investment_year] * 1e6 - - limit_upper = limits_volume_max["electrolysis"][ct][investment_year] * 1e6 - - logger.info( - f"limiting H2 electrolysis in DE between {limit_lower/1e6} and {limit_upper/1e6} TWh/a" - ) - - production = n.links[ - (n.links.carrier == "H2 Electrolysis") & (n.links.bus0.str.contains(ct)) - ].index - efficiency = n.links.loc[production, "efficiency"] - - lhs = ( - n.model["Link-p"].loc[:, production] - * n.snapshot_weightings.generators - * efficiency - ).sum() - - cname_upper = f"H2_production_limit_upper-{ct}" - cname_lower = f"H2_production_limit_lower-{ct}" - - n.model.add_constraints( - lhs <= limit_upper, name=f"GlobalConstraint-{cname_upper}" - ) - - n.model.add_constraints( - lhs >= limit_lower, name=f"GlobalConstraint-{cname_lower}" - ) - - if cname_upper not in n.global_constraints.index: - n.add( - "GlobalConstraint", - cname_upper, - constant=limit_upper, - sense="<=", - type="", - carrier_attribute="", - ) - if cname_lower not in n.global_constraints.index: - n.add( - "GlobalConstraint", - cname_lower, - constant=limit_lower, - sense=">=", - type="", - carrier_attribute="", - ) - - -def electricity_import_limits(n, investment_year, limits_volume_max): - for ct in limits_volume_max["electricity_import"]: - limit = limits_volume_max["electricity_import"][ct][investment_year] * 1e6 - - logger.info(f"limiting electricity imports in {ct} to {limit/1e6} TWh/a") - - incoming_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] != ct) - & (n.lines.bus1.str[:2] == ct) - ] - outgoing_line = n.lines.index[ - (n.lines.carrier == "AC") - & (n.lines.bus0.str[:2] == ct) - & (n.lines.bus1.str[:2] != ct) - ] - - incoming_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] != ct) - & (n.links.bus1.str[:2] == ct) - ] - outgoing_link = n.links.index[ - (n.links.carrier == "DC") - & (n.links.bus0.str[:2] == ct) - & (n.links.bus1.str[:2] != ct) - ] - - incoming_line_p = ( - n.model["Line-s"].loc[:, incoming_line] * n.snapshot_weightings.generators - ).sum() - outgoing_line_p = ( - n.model["Line-s"].loc[:, outgoing_line] * n.snapshot_weightings.generators - ).sum() - - incoming_link_p = ( - n.model["Link-p"].loc[:, incoming_link] * n.snapshot_weightings.generators - ).sum() - outgoing_link_p = ( - n.model["Link-p"].loc[:, outgoing_link] * n.snapshot_weightings.generators - ).sum() - - lhs = (incoming_link_p - outgoing_link_p) + (incoming_line_p - outgoing_line_p) - - cname = f"Electricity_import_limit-{ct}" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def add_co2limit_country(n, limit_countries, snakemake, debug=False): - """ - Add a set of emissions limit constraints for specified countries. - - The countries and emissions limits are specified in the config file entry 'co2_budget_national'. - - Parameters - ---------- - n : pypsa.Network - limit_countries : dict - snakemake: snakemake object - """ - logger.info(f"Adding CO2 budget limit for each country as per unit of 1990 levels") - - nhours = n.snapshot_weightings.generators.sum() - nyears = nhours / 8760 - - sectors = determine_emission_sectors(n.config["sector"]) - - # convert MtCO2 to tCO2 - co2_totals = 1e6 * pd.read_csv(snakemake.input.co2_totals_name, index_col=0) - - co2_total_totals = co2_totals[sectors].sum(axis=1) * nyears - - for ct in limit_countries: - limit = co2_total_totals[ct] * limit_countries[ct] - logger.info( - f"Limiting emissions in country {ct} to {limit_countries[ct]:.1%} of " - f"1990 levels, i.e. {limit:,.2f} tCO2/a", - ) - - lhs = [] - - for port in [col[3:] for col in n.links if col.startswith("bus")]: - links = n.links.index[ - (n.links.index.str[:2] == ct) - & (n.links[f"bus{port}"] == "co2 atmosphere") - & ( - n.links.carrier != "kerosene for aviation" - ) # first exclude aviation to multiply it with a domestic factor later - ] - - logger.info( - f"For {ct} adding following link carriers to port {port} CO2 constraint: {n.links.loc[links,'carrier'].unique()}" - ) - - if port == "0": - efficiency = -1.0 - elif port == "1": - efficiency = n.links.loc[links, f"efficiency"] - else: - efficiency = n.links.loc[links, f"efficiency{port}"] - - lhs.append( - ( - n.model["Link-p"].loc[:, links] - * efficiency - * n.snapshot_weightings.generators - ).sum() - ) - - # Aviation demand - energy_totals = pd.read_csv(snakemake.input.energy_totals, index_col=[0, 1]) - domestic_aviation = energy_totals.loc[ - ("DE", snakemake.params.energy_year), "total domestic aviation" - ] - international_aviation = energy_totals.loc[ - ("DE", snakemake.params.energy_year), "total international aviation" - ] - domestic_factor = domestic_aviation / ( - domestic_aviation + international_aviation - ) - aviation_links = n.links[ - (n.links.index.str[:2] == ct) & (n.links.carrier == "kerosene for aviation") - ] - lhs.append - ( - n.model["Link-p"].loc[:, aviation_links.index] - * aviation_links.efficiency2 - * n.snapshot_weightings.generators - ).sum() * domestic_factor - logger.info( - f"Adding domestic aviation emissions for {ct} with a factor of {domestic_factor}" - ) - - # Adding Efuel imports and exports to constraint - incoming_oil = n.links.index[n.links.index == "EU renewable oil -> DE oil"] - outgoing_oil = n.links.index[n.links.index == "DE renewable oil -> EU oil"] - - if not debug: - lhs.append( - ( - -1 - * n.model["Link-p"].loc[:, incoming_oil] - * 0.2571 - * n.snapshot_weightings.generators - ).sum() - ) - lhs.append( - ( - n.model["Link-p"].loc[:, outgoing_oil] - * 0.2571 - * n.snapshot_weightings.generators - ).sum() - ) - - incoming_methanol = n.links.index[n.links.index == "EU methanol -> DE methanol"] - outgoing_methanol = n.links.index[n.links.index == "DE methanol -> EU methanol"] - - lhs.append( - ( - -1 - * n.model["Link-p"].loc[:, incoming_methanol] - / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] - * n.snapshot_weightings.generators - ).sum() - ) - - lhs.append( - ( - n.model["Link-p"].loc[:, outgoing_methanol] - / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] - * n.snapshot_weightings.generators - ).sum() - ) - - # Methane - incoming_CH4 = n.links.index[n.links.index == "EU renewable gas -> DE gas"] - outgoing_CH4 = n.links.index[n.links.index == "DE renewable gas -> EU gas"] - - lhs.append( - ( - -1 - * n.model["Link-p"].loc[:, incoming_CH4] - * 0.198 - * n.snapshot_weightings.generators - ).sum() - ) - - lhs.append( - ( - n.model["Link-p"].loc[:, outgoing_CH4] - * 0.198 - * n.snapshot_weightings.generators - ).sum() - ) - - lhs = sum(lhs) - - cname = f"co2_limit-{ct}" - - n.model.add_constraints( - lhs <= limit, - name=f"GlobalConstraint-{cname}", - ) - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def force_boiler_profiles_existing_per_load(n): - """ - This scales the boiler dispatch to the load profile with a factor common to - all boilers at load. - """ - - logger.info("Forcing boiler profiles for existing ones") - - decentral_boilers = n.links.index[ - n.links.carrier.str.contains("boiler") - & ~n.links.carrier.str.contains("urban central") - & ~n.links.p_nom_extendable - ] - - if decentral_boilers.empty: - return - - boiler_loads = n.links.loc[decentral_boilers, "bus1"] - boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] - decentral_boilers = boiler_loads.index - boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( - n.loads_t.p_set[boiler_loads].max(), axis=1 - ) - boiler_profiles_pu.columns = decentral_boilers - boiler_profiles = DataArray( - boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) - ) - - boiler_load_index = pd.Index(boiler_loads.unique()) - boiler_load_index.name = "Load" - - # per load scaling factor - n.model.add_variables(coords=[boiler_load_index], name="Load-profile_factor") - - # clumsy indicator matrix to map boilers to loads - df = pd.DataFrame(index=boiler_load_index, columns=decentral_boilers, data=0.0) - for k, v in boiler_loads.items(): - df.loc[v, k] = 1.0 - - lhs = n.model["Link-p"].loc[:, decentral_boilers] - ( - boiler_profiles * DataArray(df) * n.model["Load-profile_factor"] - ).sum("Load") - - n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile") - - # hack so that PyPSA doesn't complain there is nowhere to store the variable - n.loads["profile_factor_opt"] = 0.0 - - -def force_boiler_profiles_existing_per_boiler(n): +def custom_extra_functionality(n, snapshots, snakemake): """ - This scales each boiler dispatch to be proportional to the load profile. + Add custom extra functionality constraints. """ - - logger.info( - "Forcing each existing boiler dispatch to be proportional to the load profile" - ) - - decentral_boilers = n.links.index[ - n.links.carrier.str.contains("boiler") - & ~n.links.carrier.str.contains("urban central") - & ~n.links.p_nom_extendable - ] - - if decentral_boilers.empty: - return - - boiler_loads = n.links.loc[decentral_boilers, "bus1"] - boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] - decentral_boilers = boiler_loads.index - boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( - n.loads_t.p_set[boiler_loads].max(), axis=1 - ) - boiler_profiles_pu.columns = decentral_boilers - boiler_profiles = DataArray( - boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) - ) - - # will be per unit - n.model.add_variables(coords=[decentral_boilers], name="Link-fixed_profile_scaling") - - lhs = ( - (1, n.model["Link-p"].loc[:, decentral_boilers]), - ( - -boiler_profiles, - n.model["Link-fixed_profile_scaling"], - ), - ) - - n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile_scaling") - - # hack so that PyPSA doesn't complain there is nowhere to store the variable - n.links["fixed_profile_scaling_opt"] = 0.0 - - -def add_h2_derivate_limit(n, investment_year, limits_volume_max): - for ct in limits_volume_max["h2_derivate_import"]: - limit = limits_volume_max["h2_derivate_import"][ct][investment_year] * 1e6 - - logger.info(f"limiting H2 derivate imports in {ct} to {limit/1e6} TWh/a") - - incoming = n.links.loc[ - [ - "EU renewable oil -> DE oil", - "EU methanol -> DE methanol", - "EU renewable gas -> DE gas", - ] - ].index - outgoing = n.links.loc[ - [ - "DE renewable oil -> EU oil", - "DE methanol -> EU methanol", - "DE renewable gas -> EU gas", - ] - ].index - - incoming_p = ( - n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators - ).sum() - outgoing_p = ( - n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators - ).sum() - - lhs = incoming_p - outgoing_p - - cname = f"H2_derivate_import_limit-{ct}" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def adapt_nuclear_output(n): - logger.info( - f"limiting german electricity generation from nuclear to 2020 value of 61 TWh" - ) - limit = 61e6 - - nuclear_de_index = n.links.index[ - (n.links.carrier == "nuclear") & (n.links.index.str[:2] == "DE") - ] - - nuclear_gen = ( - n.model["Link-p"].loc[:, nuclear_de_index] - * n.links.loc[nuclear_de_index, "efficiency"] - * n.snapshot_weightings.generators - ).sum() - - lhs = nuclear_gen - - cname = f"Nuclear_generation_limit-DE" - - n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") - - if cname in n.global_constraints.index: - logger.warning( - f"Global constraint {cname} already exists. Dropping and adding it again." - ) - n.global_constraints.drop(cname, inplace=True) - - n.add( - "GlobalConstraint", - cname, - constant=limit, - sense="<=", - type="", - carrier_attribute="", - ) - - -def custom_extra_functionality(n, snapshots, snakemake): - - logger.info("Adding Ariadne-specific functionality") - - investment_year = int(snakemake.wildcards.planning_horizons[-4:]) - constraints = snakemake.params.solving["constraints"] - - add_capacity_limits( - n, investment_year, constraints["limits_capacity_min"], "minimum" - ) - - add_capacity_limits( - n, investment_year, constraints["limits_capacity_max"], "maximum" - ) - - add_power_limits(n, investment_year, constraints["limits_power_max"]) - - if int(snakemake.wildcards.clusters) != 1: - h2_import_limits(n, investment_year, constraints["limits_volume_max"]) - - electricity_import_limits(n, investment_year, constraints["limits_volume_max"]) - - if investment_year >= 2025: - h2_production_limits( - n, - investment_year, - constraints["limits_volume_min"], - constraints["limits_volume_max"], - ) - - if not snakemake.config["run"]["debug_h2deriv_limit"]: - add_h2_derivate_limit(n, investment_year, constraints["limits_volume_max"]) - - # force_boiler_profiles_existing_per_load(n) - force_boiler_profiles_existing_per_boiler(n) - - if isinstance(constraints["co2_budget_national"], dict): - limit_countries = constraints["co2_budget_national"][investment_year] - add_co2limit_country( - n, - limit_countries, - snakemake, - debug=snakemake.config["run"]["debug_co2_limit"], - ) - else: - logger.warning("No national CO2 budget specified!") - - if investment_year == 2020: - adapt_nuclear_output(n) + pass From 8c869ff4f1ea9944a6f5ae8eef156b93a4291917 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 23 Jan 2025 11:37:33 +0000 Subject: [PATCH 3/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a2da05ca1..de06f833f 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Before running any analysis with scenarios, the rule `build_scenarios` must be e snakemake -call build_scenarios -f or in case of using the public database - + snakemake -call build_scenarios --configfile=config/config.public.yaml -f Note that the hierarchy of scenario files is the following: `scenarios.automated.yaml` > (any `explicitly specified --configfiles`) > `config.yaml `> `config.default.yaml `Changes in the file `scenarios.manual.yaml `are only taken into account if the rule `build_scenarios` is executed. From ae1f91f1a41cd675e2c0f989917fece7ce760414 Mon Sep 17 00:00:00 2001 From: lkstrp Date: Thu, 23 Jan 2025 12:38:31 +0100 Subject: [PATCH 4/5] fix --- scripts/pypsa-de/additional_functionality.py | 735 +++++++++++++++++++ 1 file changed, 735 insertions(+) create mode 100644 scripts/pypsa-de/additional_functionality.py diff --git a/scripts/pypsa-de/additional_functionality.py b/scripts/pypsa-de/additional_functionality.py new file mode 100644 index 000000000..9a53656d5 --- /dev/null +++ b/scripts/pypsa-de/additional_functionality.py @@ -0,0 +1,735 @@ +import sys +import logging + +import pandas as pd +from xarray import DataArray + +from scripts.prepare_sector_network import determine_emission_sectors + +logger = logging.getLogger(__name__) + + +def add_capacity_limits(n, investment_year, limits_capacity, sense="maximum"): + for c in n.iterate_components(limits_capacity): + logger.info(f"Adding {sense} constraints for {c.list_name}") + + attr = "e" if c.name == "Store" else "p" + units = "MWh or tCO2" if c.name == "Store" else "MW" + + for carrier in limits_capacity[c.name]: + for ct in limits_capacity[c.name][carrier]: + if investment_year not in limits_capacity[c.name][carrier][ct].keys(): + continue + + limit = 1e3 * limits_capacity[c.name][carrier][ct][investment_year] + + logger.info( + f"Adding constraint on {c.name} {carrier} capacity in {ct} to be {sense} {limit} {units}" + ) + + valid_components = ( + (c.df.index.str[:2] == ct) + & (c.df.carrier.str[: len(carrier)] == carrier) + & ~c.df.carrier.str.contains("thermal") + ) # exclude solar thermal + + existing_index = c.df.index[ + valid_components & ~c.df[attr + "_nom_extendable"] + ] + extendable_index = c.df.index[ + valid_components & c.df[attr + "_nom_extendable"] + ] + + existing_capacity = c.df.loc[existing_index, attr + "_nom"].sum() + + logger.info( + f"Existing {c.name} {carrier} capacity in {ct}: {existing_capacity} {units}" + ) + + nom = n.model[c.name + "-" + attr + "_nom"].loc[extendable_index] + + lhs = nom.sum() + + cname = f"capacity_{sense}-{ct}-{c.name}-{carrier.replace(' ', '-')}" + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + rhs = limit - existing_capacity + + if sense == "maximum": + if rhs <= 0: + logger.warning( + f"Existing capacity in {ct} for carrier {carrier} already exceeds the limit of {limit} MW. Limiting capacity expansion for this investment period to 0." + ) + rhs = 0 + + n.model.add_constraints( + lhs <= rhs, + name=f"GlobalConstraint-{cname}", + ) + n.add( + "GlobalConstraint", + cname, + constant=rhs, + sense="<=", + type="", + carrier_attribute="", + ) + + elif sense == "minimum": + n.model.add_constraints( + lhs >= rhs, + name=f"GlobalConstraint-{cname}", + ) + n.add( + "GlobalConstraint", + cname, + constant=rhs, + sense=">=", + type="", + carrier_attribute="", + ) + else: + logger.error("sense {sense} not recognised") + sys.exit() + + +def add_power_limits(n, investment_year, limits_power_max): + """ + " Restricts the maximum inflow/outflow of electricity from/to a country. + """ + for ct in limits_power_max: + if investment_year not in limits_power_max[ct].keys(): + continue + + limit = 1e3 * limits_power_max[ct][investment_year] / 10 + + logger.info( + f"Adding constraint on electricity import/export from/to {ct} to be < {limit} MW" + ) + incoming_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] != ct) + & (n.lines.bus1.str[:2] == ct) + ] + outgoing_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] == ct) + & (n.lines.bus1.str[:2] != ct) + ] + + incoming_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] != ct) + & (n.links.bus1.str[:2] == ct) + ] + outgoing_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] == ct) + & (n.links.bus1.str[:2] != ct) + ] + + # iterate over snapshots - otherwise exporting of postnetwork fails since + # the constraints are time dependent + for t in n.snapshots: + incoming_line_p = n.model["Line-s"].loc[t, incoming_line] + outgoing_line_p = n.model["Line-s"].loc[t, outgoing_line] + incoming_link_p = n.model["Link-p"].loc[t, incoming_link] + outgoing_link_p = n.model["Link-p"].loc[t, outgoing_link] + + lhs = ( + incoming_link_p.sum() + - outgoing_link_p.sum() + + incoming_line_p.sum() + - outgoing_line_p.sum() + ) / 10 + # divide by 10 to avoid numerical issues + + cname_upper = f"Power-import-limit-{ct}-{t}" + cname_lower = f"Power-export-limit-{ct}-{t}" + + n.model.add_constraints(lhs <= limit, name=cname_upper) + n.model.add_constraints(lhs >= -limit, name=cname_lower) + + # not adding to network as the shadow prices are not needed + + +def h2_import_limits(n, investment_year, limits_volume_max): + for ct in limits_volume_max["h2_import"]: + limit = limits_volume_max["h2_import"][ct][investment_year] * 1e6 + + logger.info(f"limiting H2 imports in {ct} to {limit / 1e6} TWh/a") + pipeline_carrier = [ + "H2 pipeline", + "H2 pipeline (Kernnetz)", + "H2 pipeline retrofitted", + ] + incoming = n.links.index[ + (n.links.carrier.isin(pipeline_carrier)) + & (n.links.bus0.str[:2] != ct) + & (n.links.bus1.str[:2] == ct) + ] + outgoing = n.links.index[ + (n.links.carrier.isin(pipeline_carrier)) + & (n.links.bus0.str[:2] == ct) + & (n.links.bus1.str[:2] != ct) + ] + + incoming_p = ( + n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators + ).sum() + outgoing_p = ( + n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators + ).sum() + + lhs = incoming_p - outgoing_p + + cname = f"H2_import_limit-{ct}" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def h2_production_limits(n, investment_year, limits_volume_min, limits_volume_max): + for ct in limits_volume_max["electrolysis"]: + if ct not in limits_volume_min["electrolysis"]: + logger.warning( + f"no lower limit for H2 electrolysis in {ct} assuming 0 TWh/a" + ) + limit_lower = 0 + else: + limit_lower = limits_volume_min["electrolysis"][ct][investment_year] * 1e6 + + limit_upper = limits_volume_max["electrolysis"][ct][investment_year] * 1e6 + + logger.info( + f"limiting H2 electrolysis in DE between {limit_lower / 1e6} and {limit_upper / 1e6} TWh/a" + ) + + production = n.links[ + (n.links.carrier == "H2 Electrolysis") & (n.links.bus0.str.contains(ct)) + ].index + efficiency = n.links.loc[production, "efficiency"] + + lhs = ( + n.model["Link-p"].loc[:, production] + * n.snapshot_weightings.generators + * efficiency + ).sum() + + cname_upper = f"H2_production_limit_upper-{ct}" + cname_lower = f"H2_production_limit_lower-{ct}" + + n.model.add_constraints( + lhs <= limit_upper, name=f"GlobalConstraint-{cname_upper}" + ) + + n.model.add_constraints( + lhs >= limit_lower, name=f"GlobalConstraint-{cname_lower}" + ) + + if cname_upper not in n.global_constraints.index: + n.add( + "GlobalConstraint", + cname_upper, + constant=limit_upper, + sense="<=", + type="", + carrier_attribute="", + ) + if cname_lower not in n.global_constraints.index: + n.add( + "GlobalConstraint", + cname_lower, + constant=limit_lower, + sense=">=", + type="", + carrier_attribute="", + ) + + +def electricity_import_limits(n, investment_year, limits_volume_max): + for ct in limits_volume_max["electricity_import"]: + limit = limits_volume_max["electricity_import"][ct][investment_year] * 1e6 + + logger.info(f"limiting electricity imports in {ct} to {limit / 1e6} TWh/a") + + incoming_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] != ct) + & (n.lines.bus1.str[:2] == ct) + ] + outgoing_line = n.lines.index[ + (n.lines.carrier == "AC") + & (n.lines.bus0.str[:2] == ct) + & (n.lines.bus1.str[:2] != ct) + ] + + incoming_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] != ct) + & (n.links.bus1.str[:2] == ct) + ] + outgoing_link = n.links.index[ + (n.links.carrier == "DC") + & (n.links.bus0.str[:2] == ct) + & (n.links.bus1.str[:2] != ct) + ] + + incoming_line_p = ( + n.model["Line-s"].loc[:, incoming_line] * n.snapshot_weightings.generators + ).sum() + outgoing_line_p = ( + n.model["Line-s"].loc[:, outgoing_line] * n.snapshot_weightings.generators + ).sum() + + incoming_link_p = ( + n.model["Link-p"].loc[:, incoming_link] * n.snapshot_weightings.generators + ).sum() + outgoing_link_p = ( + n.model["Link-p"].loc[:, outgoing_link] * n.snapshot_weightings.generators + ).sum() + + lhs = (incoming_link_p - outgoing_link_p) + (incoming_line_p - outgoing_line_p) + + cname = f"Electricity_import_limit-{ct}" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def add_co2limit_country(n, limit_countries, snakemake, debug=False): + """ + Add a set of emissions limit constraints for specified countries. + + The countries and emissions limits are specified in the config file entry 'co2_budget_national'. + + Parameters + ---------- + n : pypsa.Network + limit_countries : dict + snakemake: snakemake object + """ + logger.info(f"Adding CO2 budget limit for each country as per unit of 1990 levels") + + nhours = n.snapshot_weightings.generators.sum() + nyears = nhours / 8760 + + sectors = determine_emission_sectors(n.config["sector"]) + + # convert MtCO2 to tCO2 + co2_totals = 1e6 * pd.read_csv(snakemake.input.co2_totals_name, index_col=0) + + co2_total_totals = co2_totals[sectors].sum(axis=1) * nyears + + for ct in limit_countries: + limit = co2_total_totals[ct] * limit_countries[ct] + logger.info( + f"Limiting emissions in country {ct} to {limit_countries[ct]:.1%} of " + f"1990 levels, i.e. {limit:,.2f} tCO2/a", + ) + + lhs = [] + + for port in [col[3:] for col in n.links if col.startswith("bus")]: + links = n.links.index[ + (n.links.index.str[:2] == ct) + & (n.links[f"bus{port}"] == "co2 atmosphere") + & ( + n.links.carrier != "kerosene for aviation" + ) # first exclude aviation to multiply it with a domestic factor later + ] + + logger.info( + f"For {ct} adding following link carriers to port {port} CO2 constraint: {n.links.loc[links, 'carrier'].unique()}" + ) + + if port == "0": + efficiency = -1.0 + elif port == "1": + efficiency = n.links.loc[links, f"efficiency"] + else: + efficiency = n.links.loc[links, f"efficiency{port}"] + + lhs.append( + ( + n.model["Link-p"].loc[:, links] + * efficiency + * n.snapshot_weightings.generators + ).sum() + ) + + # Aviation demand + energy_totals = pd.read_csv(snakemake.input.energy_totals, index_col=[0, 1]) + domestic_aviation = energy_totals.loc[ + ("DE", snakemake.params.energy_year), "total domestic aviation" + ] + international_aviation = energy_totals.loc[ + ("DE", snakemake.params.energy_year), "total international aviation" + ] + domestic_factor = domestic_aviation / ( + domestic_aviation + international_aviation + ) + aviation_links = n.links[ + (n.links.index.str[:2] == ct) & (n.links.carrier == "kerosene for aviation") + ] + lhs.append + ( + n.model["Link-p"].loc[:, aviation_links.index] + * aviation_links.efficiency2 + * n.snapshot_weightings.generators + ).sum() * domestic_factor + logger.info( + f"Adding domestic aviation emissions for {ct} with a factor of {domestic_factor}" + ) + + # Adding Efuel imports and exports to constraint + incoming_oil = n.links.index[n.links.index == "EU renewable oil -> DE oil"] + outgoing_oil = n.links.index[n.links.index == "DE renewable oil -> EU oil"] + + if not debug: + lhs.append( + ( + -1 + * n.model["Link-p"].loc[:, incoming_oil] + * 0.2571 + * n.snapshot_weightings.generators + ).sum() + ) + lhs.append( + ( + n.model["Link-p"].loc[:, outgoing_oil] + * 0.2571 + * n.snapshot_weightings.generators + ).sum() + ) + + incoming_methanol = n.links.index[n.links.index == "EU methanol -> DE methanol"] + outgoing_methanol = n.links.index[n.links.index == "DE methanol -> EU methanol"] + + lhs.append( + ( + -1 + * n.model["Link-p"].loc[:, incoming_methanol] + / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] + * n.snapshot_weightings.generators + ).sum() + ) + + lhs.append( + ( + n.model["Link-p"].loc[:, outgoing_methanol] + / snakemake.config["sector"]["MWh_MeOH_per_tCO2"] + * n.snapshot_weightings.generators + ).sum() + ) + + # Methane + incoming_CH4 = n.links.index[n.links.index == "EU renewable gas -> DE gas"] + outgoing_CH4 = n.links.index[n.links.index == "DE renewable gas -> EU gas"] + + lhs.append( + ( + -1 + * n.model["Link-p"].loc[:, incoming_CH4] + * 0.198 + * n.snapshot_weightings.generators + ).sum() + ) + + lhs.append( + ( + n.model["Link-p"].loc[:, outgoing_CH4] + * 0.198 + * n.snapshot_weightings.generators + ).sum() + ) + + lhs = sum(lhs) + + cname = f"co2_limit-{ct}" + + n.model.add_constraints( + lhs <= limit, + name=f"GlobalConstraint-{cname}", + ) + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def force_boiler_profiles_existing_per_load(n): + """ + This scales the boiler dispatch to the load profile with a factor common to + all boilers at load. + """ + + logger.info("Forcing boiler profiles for existing ones") + + decentral_boilers = n.links.index[ + n.links.carrier.str.contains("boiler") + & ~n.links.carrier.str.contains("urban central") + & ~n.links.p_nom_extendable + ] + + if decentral_boilers.empty: + return + + boiler_loads = n.links.loc[decentral_boilers, "bus1"] + boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] + decentral_boilers = boiler_loads.index + boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( + n.loads_t.p_set[boiler_loads].max(), axis=1 + ) + boiler_profiles_pu.columns = decentral_boilers + boiler_profiles = DataArray( + boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) + ) + + boiler_load_index = pd.Index(boiler_loads.unique()) + boiler_load_index.name = "Load" + + # per load scaling factor + n.model.add_variables(coords=[boiler_load_index], name="Load-profile_factor") + + # clumsy indicator matrix to map boilers to loads + df = pd.DataFrame(index=boiler_load_index, columns=decentral_boilers, data=0.0) + for k, v in boiler_loads.items(): + df.loc[v, k] = 1.0 + + lhs = n.model["Link-p"].loc[:, decentral_boilers] - ( + boiler_profiles * DataArray(df) * n.model["Load-profile_factor"] + ).sum("Load") + + n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile") + + # hack so that PyPSA doesn't complain there is nowhere to store the variable + n.loads["profile_factor_opt"] = 0.0 + + +def force_boiler_profiles_existing_per_boiler(n): + """ + This scales each boiler dispatch to be proportional to the load profile. + """ + + logger.info( + "Forcing each existing boiler dispatch to be proportional to the load profile" + ) + + decentral_boilers = n.links.index[ + n.links.carrier.str.contains("boiler") + & ~n.links.carrier.str.contains("urban central") + & ~n.links.p_nom_extendable + ] + + if decentral_boilers.empty: + return + + boiler_loads = n.links.loc[decentral_boilers, "bus1"] + boiler_loads = boiler_loads[boiler_loads.isin(n.loads_t.p_set.columns)] + decentral_boilers = boiler_loads.index + boiler_profiles_pu = n.loads_t.p_set[boiler_loads].div( + n.loads_t.p_set[boiler_loads].max(), axis=1 + ) + boiler_profiles_pu.columns = decentral_boilers + boiler_profiles = DataArray( + boiler_profiles_pu.multiply(n.links.loc[decentral_boilers, "p_nom"], axis=1) + ) + + # will be per unit + n.model.add_variables(coords=[decentral_boilers], name="Link-fixed_profile_scaling") + + lhs = ( + (1, n.model["Link-p"].loc[:, decentral_boilers]), + ( + -boiler_profiles, + n.model["Link-fixed_profile_scaling"], + ), + ) + + n.model.add_constraints(lhs, "=", 0, "Link-fixed_profile_scaling") + + # hack so that PyPSA doesn't complain there is nowhere to store the variable + n.links["fixed_profile_scaling_opt"] = 0.0 + + +def add_h2_derivate_limit(n, investment_year, limits_volume_max): + for ct in limits_volume_max["h2_derivate_import"]: + limit = limits_volume_max["h2_derivate_import"][ct][investment_year] * 1e6 + + logger.info(f"limiting H2 derivate imports in {ct} to {limit / 1e6} TWh/a") + + incoming = n.links.loc[ + [ + "EU renewable oil -> DE oil", + "EU methanol -> DE methanol", + "EU renewable gas -> DE gas", + ] + ].index + outgoing = n.links.loc[ + [ + "DE renewable oil -> EU oil", + "DE methanol -> EU methanol", + "DE renewable gas -> EU gas", + ] + ].index + + incoming_p = ( + n.model["Link-p"].loc[:, incoming] * n.snapshot_weightings.generators + ).sum() + outgoing_p = ( + n.model["Link-p"].loc[:, outgoing] * n.snapshot_weightings.generators + ).sum() + + lhs = incoming_p - outgoing_p + + cname = f"H2_derivate_import_limit-{ct}" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def adapt_nuclear_output(n): + logger.info( + f"limiting german electricity generation from nuclear to 2020 value of 61 TWh" + ) + limit = 61e6 + + nuclear_de_index = n.links.index[ + (n.links.carrier == "nuclear") & (n.links.index.str[:2] == "DE") + ] + + nuclear_gen = ( + n.model["Link-p"].loc[:, nuclear_de_index] + * n.links.loc[nuclear_de_index, "efficiency"] + * n.snapshot_weightings.generators + ).sum() + + lhs = nuclear_gen + + cname = f"Nuclear_generation_limit-DE" + + n.model.add_constraints(lhs <= limit, name=f"GlobalConstraint-{cname}") + + if cname in n.global_constraints.index: + logger.warning( + f"Global constraint {cname} already exists. Dropping and adding it again." + ) + n.global_constraints.drop(cname, inplace=True) + + n.add( + "GlobalConstraint", + cname, + constant=limit, + sense="<=", + type="", + carrier_attribute="", + ) + + +def additional_functionality(n, snapshots, snakemake): + logger.info("Adding Ariadne-specific functionality") + + investment_year = int(snakemake.wildcards.planning_horizons[-4:]) + constraints = snakemake.params.solving["constraints"] + + add_capacity_limits( + n, investment_year, constraints["limits_capacity_min"], "minimum" + ) + + add_capacity_limits( + n, investment_year, constraints["limits_capacity_max"], "maximum" + ) + + add_power_limits(n, investment_year, constraints["limits_power_max"]) + + if int(snakemake.wildcards.clusters) != 1: + h2_import_limits(n, investment_year, constraints["limits_volume_max"]) + + electricity_import_limits(n, investment_year, constraints["limits_volume_max"]) + + if investment_year >= 2025: + h2_production_limits( + n, + investment_year, + constraints["limits_volume_min"], + constraints["limits_volume_max"], + ) + + if not snakemake.config["run"]["debug_h2deriv_limit"]: + add_h2_derivate_limit(n, investment_year, constraints["limits_volume_max"]) + + # force_boiler_profiles_existing_per_load(n) + force_boiler_profiles_existing_per_boiler(n) + + if isinstance(constraints["co2_budget_national"], dict): + limit_countries = constraints["co2_budget_national"][investment_year] + add_co2limit_country( + n, + limit_countries, + snakemake, + debug=snakemake.config["run"]["debug_co2_limit"], + ) + else: + logger.warning("No national CO2 budget specified!") + + if investment_year == 2020: + adapt_nuclear_output(n) From 2e70a4fe08ed4d7bf782dc3cfc6ca2b7670d32ac Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 23 Jan 2025 11:39:43 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- scripts/pypsa-de/additional_functionality.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/pypsa-de/additional_functionality.py b/scripts/pypsa-de/additional_functionality.py index 9a53656d5..464c92aed 100644 --- a/scripts/pypsa-de/additional_functionality.py +++ b/scripts/pypsa-de/additional_functionality.py @@ -1,5 +1,6 @@ -import sys +# -*- coding: utf-8 -*- import logging +import sys import pandas as pd from xarray import DataArray