Skip to content

Commit 138b5e4

Browse files
hanwen-clusterhanwen-pcluste
authored andcommitted
[integ-test-framework] Terminate processes where tests have finished
Some processes could hang in `sleeping` status forever. The suspected reason is that when tests fail, sometimes network connections are not closed, causing the parent process to hang in `sleeping`. Prior to this commit, we have a 4-hour timeout for processes. This commit adds another check to read the Pytest logs to terminate processes where tests have finished for more than 30 minutes. Signed-off-by: Hanwen <hanwenli@amazon.com>
1 parent eee1d6e commit 138b5e4

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

tests/integration-tests/framework/fixture_utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import logging
1010
import os
1111
import pickle
12+
import re
1213
import time
1314
from dataclasses import dataclass, field
1415
from inspect import getfullargspec, isgeneratorfunction
@@ -53,13 +54,15 @@ def __init__(
5354
fixture_func_args: tuple,
5455
fixture_func_kwargs: dict,
5556
xdist_worker_id_and_pid: str,
57+
log_file: str,
5658
):
5759
self.name = name
5860
self.shared_save_location = shared_save_location
5961
self.fixture_func = fixture_func
6062
self.fixture_func_args = fixture_func_args
6163
self.fixture_func_kwargs = fixture_func_kwargs
6264
self.xdist_worker_id_and_pid = xdist_worker_id_and_pid
65+
self.log_file = log_file
6366
self._lock_file = shared_save_location / f"{name}.lock"
6467
self._fixture_file = shared_save_location / f"{name}.fixture"
6568
self._generator = None
@@ -95,6 +98,19 @@ def _terminate_process(pid: int):
9598
except Exception:
9699
logging.error("Error terminating process %s.", pid)
97100

101+
@staticmethod
102+
def _completed_in_the_past(minutes: int, line: str):
103+
if line is None or "- Completed test" not in line:
104+
return False
105+
regex = r"^(.*?) \- .*"
106+
re_result = re.search(regex, line)
107+
if re_result:
108+
date = re_result.group(1)
109+
try:
110+
return (time.time() - time.mktime(time.strptime(date, "%Y-%m-%d %H:%M:%S,%f"))) / 60 > minutes
111+
except Exception:
112+
return False
113+
98114
def release(self):
99115
"""
100116
Release a shared fixture.
@@ -119,6 +135,16 @@ def release(self):
119135
)
120136
time.sleep(30)
121137

138+
last_message_of_each_proc = {}
139+
with open(self.log_file, "r") as f:
140+
lines = f.readlines()
141+
for line in lines:
142+
regex = r"^.* \- .* \- (\d+) - .*"
143+
re_result = re.search(regex, line)
144+
if re_result:
145+
pid = re.search(regex, line).group(1)
146+
last_message_of_each_proc[pid] = line
147+
122148
with FileLock(self._lock_file):
123149
for worker in data.currently_using_processes.copy():
124150
pid = int(worker.split(" ")[1])
@@ -132,6 +158,11 @@ def release(self):
132158
data.counter,
133159
)
134160
self._save_fixture_data(data)
161+
elif self._completed_in_the_past(30, last_message_of_each_proc.get(str(pid))):
162+
logging.warning(
163+
"%s is sleeping but the test has been finished. Terminating the process... ", worker
164+
)
165+
self._terminate_process(pid)
135166
data = self._load_fixture_data()
136167

137168
if time.time() > timeout:
@@ -222,6 +253,7 @@ def _xdist_session_fixture(request, *args, **kwargs):
222253
fixture_func_args=args,
223254
fixture_func_kwargs=kwargs,
224255
xdist_worker_id_and_pid=f"{xdist_worker_id}: {pid}",
256+
log_file=request.config.getoption("tests_log_file"),
225257
)
226258
try:
227259
yield shared_fixture.acquire().fixture_return_value

0 commit comments

Comments
 (0)