Skip to content

Modelica system cleanup2 #297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 11, 2025
63 changes: 36 additions & 27 deletions OMPython/ModelicaSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@
import warnings
import xml.etree.ElementTree as ET

from OMPython.OMCSession import OMCSessionZMQ, OMCSessionException
from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ

# define logger using the current module name as ID
logger = logging.getLogger(__name__)


class ModelicaSystemError(Exception):
pass
"""
Exception used in ModelicaSystem and ModelicaSystemCmd classes.
"""


@dataclass
Expand Down Expand Up @@ -235,8 +237,8 @@ def run(self) -> int:
if not path_bat.exists():
raise ModelicaSystemError("Batch file (*.bat) does not exist " + str(path_bat))

with open(path_bat, 'r') as file:
for line in file:
with open(file=path_bat, mode='r', encoding='utf-8') as fh:
for line in fh:
match = re.match(r"^SET PATH=([^%]*)", line, re.IGNORECASE)
if match:
path_dll = match.group(1).strip(';') # Remove any trailing semicolons
Expand All @@ -248,7 +250,7 @@ def run(self) -> int:

try:
cmdres = subprocess.run(cmdl, capture_output=True, text=True, env=my_env, cwd=self._runpath,
timeout=self._timeout)
timeout=self._timeout, check=True)
stdout = cmdres.stdout.strip()
stderr = cmdres.stderr.strip()
returncode = cmdres.returncode
Expand All @@ -257,8 +259,8 @@ def run(self) -> int:

if stderr:
raise ModelicaSystemError(f"Error running command {repr(cmdl)}: {stderr}")
except subprocess.TimeoutExpired:
raise ModelicaSystemError(f"Timeout running command {repr(cmdl)}")
except subprocess.TimeoutExpired as ex:
raise ModelicaSystemError(f"Timeout running command {repr(cmdl)}") from ex
except subprocess.CalledProcessError as ex:
raise ModelicaSystemError(f"Error running command {repr(cmdl)}") from ex

Expand Down Expand Up @@ -298,7 +300,7 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, str]]]:
override_dict = {}
for item in override.split(','):
kv = item.split('=')
if not (0 < len(kv) < 3):
if not 0 < len(kv) < 3:
raise ModelicaSystemError(f"Invalid value for '-override': {override}")
if kv[0]:
try:
Expand All @@ -322,7 +324,7 @@ def __init__(
customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None,
omhome: Optional[str] = None,
session: Optional[OMCSessionZMQ] = None,
build: Optional[bool] = True,
build: bool = True,
) -> None:
"""Initialize, load and build a model.

Expand Down Expand Up @@ -573,9 +575,11 @@ def getQuantities(self, names=None): # 3
"""
if names is None:
return self.quantitiesList
elif isinstance(names, str):

if isinstance(names, str):
return [x for x in self.quantitiesList if x["name"] == names]
elif isinstance(names, list):

if isinstance(names, list):
return [x for y in names for x in self.quantitiesList if x["name"] == y]

raise ModelicaSystemError("Unhandled input for getQuantities()")
Expand All @@ -591,9 +595,11 @@ def getContinuous(self, names=None): # 4
if not self.simulationFlag:
if names is None:
return self.continuouslist
elif isinstance(names, str):

if isinstance(names, str):
return [self.continuouslist.get(names, "NotExist")]
elif isinstance(names, list):

if isinstance(names, list):
return [self.continuouslist.get(x, "NotExist") for x in names]
else:
if names is None:
Expand All @@ -605,15 +611,15 @@ def getContinuous(self, names=None): # 4
raise ModelicaSystemError(f"{i} could not be computed") from ex
return self.continuouslist

elif isinstance(names, str):
if isinstance(names, str):
if names in self.continuouslist:
value = self.getSolutions(names)
self.continuouslist[names] = value[0][-1]
return [self.continuouslist.get(names)]
else:
raise ModelicaSystemError(f"{names} is not continuous")

elif isinstance(names, list):
if isinstance(names, list):
valuelist = []
for i in names:
if i in self.continuouslist:
Expand Down Expand Up @@ -851,9 +857,9 @@ def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = N
tmpdict = self.overridevariables.copy()
tmpdict.update(self.simoptionsoverride)
# write to override file
with open(overrideFile, "w") as file:
with open(file=overrideFile, mode="w", encoding="utf-8") as fh:
for key, value in tmpdict.items():
file.write(f"{key}={value}\n")
fh.write(f"{key}={value}\n")

om_cmd.arg_set(key="overrideFile", val=overrideFile.as_posix())

Expand Down Expand Up @@ -909,14 +915,16 @@ def getSolutions(self, varList=None, resultfile=None): # 12
self.sendExpression("closeSimulationResultFile()")
if varList is None:
return resultVars
elif isinstance(varList, str):

if isinstance(varList, str):
if varList not in resultVars and varList != "time":
raise ModelicaSystemError(f"Requested data {repr(varList)} does not exist")
res = self.sendExpression(f'readSimulationResult("{resFile}", {{{varList}}})')
npRes = np.array(res)
self.sendExpression("closeSimulationResultFile()")
return npRes
elif isinstance(varList, list):

if isinstance(varList, list):
for var in varList:
if var == "time":
continue
Expand All @@ -934,7 +942,8 @@ def getSolutions(self, varList=None, resultfile=None): # 12
def _strip_space(name):
if isinstance(name, str):
return name.replace(" ", "")
elif isinstance(name, list):

if isinstance(name, list):
return [x.replace(" ", "") for x in name]

raise ModelicaSystemError("Unhandled input for strip_space()")
Expand Down Expand Up @@ -1051,7 +1060,7 @@ def setInputs(self, name): # 15
value = name.split("=")
if value[0] in self.inputlist:
tmpvalue = eval(value[1])
if isinstance(tmpvalue, int) or isinstance(tmpvalue, float):
if isinstance(tmpvalue, (int, float)):
self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])),
(float(self.simulateOptions["stopTime"]), float(value[1]))]
elif isinstance(tmpvalue, list):
Expand All @@ -1066,7 +1075,7 @@ def setInputs(self, name): # 15
value = var.split("=")
if value[0] in self.inputlist:
tmpvalue = eval(value[1])
if isinstance(tmpvalue, int) or isinstance(tmpvalue, float):
if isinstance(tmpvalue, (int, float)):
self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])),
(float(self.simulateOptions["stopTime"]), float(value[1]))]
elif isinstance(tmpvalue, list):
Expand Down Expand Up @@ -1132,8 +1141,8 @@ def createCSVData(self) -> pathlib.Path:

csvFile = self.tempdir / f'{self.modelName}.csv'

with open(csvFile, "w", newline="") as f:
writer = csv.writer(f)
with open(file=csvFile, mode="w", encoding="utf-8", newline="") as fh:
writer = csv.writer(fh)
writer.writerows(csv_rows)

return csvFile
Expand Down Expand Up @@ -1234,11 +1243,11 @@ def load_module_from_path(module_name, file_path):

overrideLinearFile = self.tempdir / f'{self.modelName}_override_linear.txt'

with open(overrideLinearFile, "w") as file:
with open(file=overrideLinearFile, mode="w", encoding="utf-8") as fh:
for key, value in self.overridevariables.items():
file.write(f"{key}={value}\n")
fh.write(f"{key}={value}\n")
for key, value in self.linearOptions.items():
file.write(f"{key}={value}\n")
fh.write(f"{key}={value}\n")

om_cmd.arg_set(key="overrideFile", val=overrideLinearFile.as_posix())

Expand Down
Loading