Skip to content

Fix/dangling #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

"""Operate app CLI module."""
import asyncio
import atexit
import logging
import multiprocessing
import os
Expand Down Expand Up @@ -325,6 +326,9 @@ def pause_all_services_on_exit(signum: int, frame: t.Optional[FrameType]) -> Non
# on backend app started we assume there are now started agents, so we force to pause all
pause_all_services_on_startup()

# stop all services at middleware exit
atexit.register(pause_all_services)

app = FastAPI()

app.add_middleware(
Expand Down
203 changes: 169 additions & 34 deletions operate/services/deployment_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#
# ------------------------------------------------------------------------------
"""Source code to run and stop deployments created."""
import ctypes
import json
import multiprocessing
import os
Expand All @@ -30,7 +31,7 @@
from abc import ABC, ABCMeta, abstractmethod
from pathlib import Path
from traceback import print_exc
from typing import Any, List
from typing import Any, Dict, List
from venv import main as venv_cli

import psutil
Expand Down Expand Up @@ -293,28 +294,17 @@ def _start_agent(self) -> None:
env["PYTHONUTF8"] = "1"
env["PYTHONIOENCODING"] = "utf8"
env = {**os.environ, **env}
agent_runner_log_file = (
Path(self._work_directory).parent.parent.parent / "agent_runner.log"
).open("a+")
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[
self._agent_runner_bin,
"-s",
"run",
], # TODO: Patch for Windows failing hash
cwd=working_dir / "agent",
stdout=agent_runner_log_file,
stderr=agent_runner_log_file,
env=env,
creationflags=(
0x00000200 if platform.system() == "Windows" else 0
), # Detach process from the main process
)

process = self._start_agent_process(env=env, working_dir=working_dir)
(working_dir / "agent.pid").write_text(
data=str(process.pid),
encoding="utf-8",
)

def _start_agent_process(self, env: Dict, working_dir: Path) -> subprocess.Popen:
"""Start agent process."""
raise NotImplementedError

def _start_tendermint(self) -> None:
"""Start tendermint process."""
working_dir = self._work_directory
Expand All @@ -327,41 +317,186 @@ def _start_tendermint(self) -> None:
**env,
}

if platform.system() == "Windows":
# to look up for bundled in tendermint.exe
env["PATH"] = os.path.dirname(sys.executable) + ";" + os.environ["PATH"]
else:
env["PATH"] = os.path.dirname(sys.executable) + ":" + os.environ["PATH"]
process = self._start_tendermint_process(env=env, working_dir=working_dir)

tendermint_com = self._tendermint_bin # type: ignore # pylint: disable=protected-access
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[tendermint_com],
cwd=working_dir,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=env,
creationflags=(
0x00000200 if platform.system() == "Windows" else 0
), # Detach process from the main process
)
(working_dir / "tendermint.pid").write_text(
data=str(process.pid),
encoding="utf-8",
)

def _start_tendermint_process(
self, env: Dict, working_dir: Path
) -> subprocess.Popen:
raise NotImplementedError


class PyInstallerHostDeploymentRunnerMac(PyInstallerHostDeploymentRunner):
"""Mac deployment runner."""

def _start_agent_process(self, env: Dict, working_dir: Path) -> subprocess.Popen:
"""Start agent process."""
agent_runner_log_file = (
Path(self._work_directory).parent.parent.parent / "agent_runner.log"
).open("a+")
process = subprocess.Popen( # pylint: disable=consider-using-with,subprocess-popen-preexec-fn # nosec
args=[
self._agent_runner_bin,
"-s",
"run",
],
cwd=working_dir / "agent",
stdout=agent_runner_log_file,
stderr=agent_runner_log_file,
env=env,
preexec_fn=os.setpgrp,
)
return process

def _start_tendermint_process(
self, env: Dict, working_dir: Path
) -> subprocess.Popen:
"""Start tendermint process."""
env = {
**env,
}
env["PATH"] = os.path.dirname(sys.executable) + ":" + os.environ["PATH"]

process = subprocess.Popen( # pylint: disable=consider-using-with,subprocess-popen-preexec-fn # nosec
args=[self._tendermint_bin],
cwd=working_dir,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=env,
preexec_fn=os.setpgrp, # pylint: disable=subprocess-popen-preexec-fn # nosec
)
return process


class PyInstallerHostDeploymentRunnerWindows(PyInstallerHostDeploymentRunner):
"""Windows deployment runner."""

def __init__(self, work_directory: Path) -> None:
"""Init the runner."""
super().__init__(work_directory)
self._job = self.set_windows_object_job()

@staticmethod
def set_windows_object_job() -> Any:
"""Set windows job object to handle sub processes."""
from ctypes import ( # type: ignore # pylint:disable=import-outside-toplevel,reimported
wintypes,
)

kernel32 = ctypes.windll.kernel32 # type: ignore

class JOBOBJECT_BASIC_LIMIT_INFORMATION(
ctypes.Structure
): # pylint: disable=missing-class-docstring
_fields_ = [
("PerProcessUserTimeLimit", wintypes.LARGE_INTEGER),
("PerJobUserTimeLimit", wintypes.LARGE_INTEGER),
("LimitFlags", wintypes.DWORD),
("MinimumWorkingSetSize", ctypes.c_size_t),
("MaximumWorkingSetSize", ctypes.c_size_t),
("ActiveProcessLimit", wintypes.DWORD),
("Affinity", ctypes.POINTER(wintypes.ULONG)),
("PriorityClass", wintypes.DWORD),
("SchedulingClass", wintypes.DWORD),
]

class IO_COUNTERS(ctypes.Structure): # pylint: disable=missing-class-docstring
_fields_ = [
("ReadOperationCount", ctypes.c_ulonglong),
("WriteOperationCount", ctypes.c_ulonglong),
("OtherOperationCount", ctypes.c_ulonglong),
("ReadTransferCount", ctypes.c_ulonglong),
("WriteTransferCount", ctypes.c_ulonglong),
("OtherTransferCount", ctypes.c_ulonglong),
]

class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(
ctypes.Structure
): # pylint: disable=missing-class-docstring
_fields_ = [
("BasicLimitInformation", JOBOBJECT_BASIC_LIMIT_INFORMATION),
("IoInfo", IO_COUNTERS),
("ProcessMemoryLimit", ctypes.c_size_t),
("JobMemoryLimit", ctypes.c_size_t),
("PeakProcessMemoryUsed", ctypes.c_size_t),
("PeakJobMemoryUsed", ctypes.c_size_t),
]

# Создаем Job Object
job = kernel32.CreateJobObjectW(None, None)
if not job:
raise ctypes.WinError() # type: ignore

# Настраиваем автоматическое завершение процессов при закрытии Job
info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION()
info.BasicLimitInformation.LimitFlags = (
0x2000 # JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
)

if not kernel32.SetInformationJobObject(
job,
9, # JobObjectExtendedLimitInformation
ctypes.byref(info),
ctypes.sizeof(info),
):
kernel32.CloseHandle(job)
raise ctypes.WinError() # type: ignore

return job

def assign_to_job(self, pid: int) -> None:
"""Windows-only: привязывает процесс к Job Object."""
ctypes.windll.kernel32.AssignProcessToJobObject(self._job, pid) # type: ignore

@property
def _tendermint_bin(self) -> str:
"""Return tendermint path."""
return str(Path(os.path.dirname(sys.executable)) / "tendermint_win.exe") # type: ignore # pylint: disable=protected-access

def _start_agent_process(self, env: Dict, working_dir: Path) -> subprocess.Popen:
"""Start agent process."""
agent_runner_log_file = (
Path(self._work_directory).parent.parent.parent / "agent_runner.log"
).open("a+")
process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[
self._agent_runner_bin,
"-s",
"run",
], # TODO: Patch for Windows failing hash
cwd=working_dir / "agent",
stdout=agent_runner_log_file,
stderr=agent_runner_log_file,
env=env,
creationflags=0x00000200, # Detach process from the main process
)
self.assign_to_job(process._handle) # type: ignore # pylint: disable=protected-access
return process

def _start_tendermint_process(
self, env: Dict, working_dir: Path
) -> subprocess.Popen:
"""Start tendermint process."""
env = {
**env,
}
env["PATH"] = os.path.dirname(sys.executable) + ";" + os.environ["PATH"]

process = subprocess.Popen( # pylint: disable=consider-using-with # nosec
args=[self._tendermint_bin],
cwd=working_dir,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=env,
creationflags=0x00000200, # Detach process from the main process
)
self.assign_to_job(process._handle) # type: ignore # pylint: disable=protected-access
return process


class HostPythonHostDeploymentRunner(BaseDeploymentRunner):
"""Deployment runner for host installed python."""
Expand Down