Skip to content

Commit 1ea5dbb

Browse files
FIX: Parallel build JSON file issues (#13241)
Co-authored-by: Eric Larson <larson.eric.d@gmail.com>
1 parent cc5e2e6 commit 1ea5dbb

File tree

8 files changed

+188
-7
lines changed

8 files changed

+188
-7
lines changed

doc/changes/devel/13241.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improved handling of the JSON configuration file under parallel access by using a file lock, by :newcontrib:`Bruno Aristimunha`.

doc/changes/names.inc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
.. _Austin Hurst: https://github.com/a-hurst
3636
.. _Ben Beasley: https://github.com/musicinmybrain
3737
.. _Britta Westner: https://britta-wstnr.github.io
38+
.. _Bruno Aristimunha: https://bruaristimunha.github.io
3839
.. _Bruno Nicenboim: https://bnicenboim.github.io
3940
.. _btkcodedev: https://github.com/btkcodedev
4041
.. _buildqa: https://github.com/buildqa

environment.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ dependencies:
1111
- dipy
1212
- edfio >=0.2.1
1313
- eeglabio
14+
- filelock >=3.18.0
1415
- h5io >=0.2.4
1516
- h5py
1617
- imageio >=2.6.1

mne/forward/_lead_dots.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from numpy.polynomial import legendre
1313

1414
from ..parallel import parallel_func
15-
from ..utils import _get_extra_data_path, fill_doc, logger, verbose
15+
from ..utils import _get_extra_data_path, _open_lock, fill_doc, logger, verbose
1616

1717
##############################################################################
1818
# FAST LEGENDRE (DERIVATIVE) POLYNOMIALS USING LOOKUP TABLE
@@ -80,11 +80,11 @@ def _get_legen_table(
8080
x_interp = np.linspace(-1, 1, n_interp + 1)
8181
lut = leg_fun(x_interp, n_coeff).astype(np.float32)
8282
if not force_calc:
83-
with open(fname, "wb") as fid:
83+
with _open_lock(fname, "wb") as fid:
8484
fid.write(lut.tobytes())
8585
else:
8686
logger.info(f"Reading Legendre{extra_str} table...")
87-
with open(fname, "rb", buffering=0) as fid:
87+
with _open_lock(fname, "rb", buffering=0) as fid:
8888
lut = np.fromfile(fid, np.float32)
8989
lut.shape = lut_shape
9090

mne/utils/__init__.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ __all__ = [
8888
"_julian_to_date",
8989
"_mask_to_onsets_offsets",
9090
"_on_missing",
91+
"_open_lock",
9192
"_parse_verbose",
9293
"_path_like",
9394
"_pl",
@@ -280,6 +281,7 @@ from .config import (
280281
_get_numpy_libs,
281282
_get_root_dir,
282283
_get_stim_channel,
284+
_open_lock,
283285
get_config,
284286
get_config_path,
285287
get_subjects_dir,

mne/utils/config.py

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# Copyright the MNE-Python contributors.
66

77
import atexit
8+
import contextlib
89
import json
910
import multiprocessing
1011
import os
@@ -23,7 +24,13 @@
2324
from packaging.version import parse
2425

2526
from ._logging import logger, warn
26-
from .check import _check_fname, _check_option, _check_qt_version, _validate_type
27+
from .check import (
28+
_check_fname,
29+
_check_option,
30+
_check_qt_version,
31+
_soft_import,
32+
_validate_type,
33+
)
2734
from .docs import fill_doc
2835
from .misc import _pl
2936

@@ -218,9 +225,53 @@ def set_memmap_min_size(memmap_min_size):
218225
)
219226

220227

228+
@contextlib.contextmanager
def _open_lock(path, *args, **kwargs):
    """Open a file while holding an optional inter-process file lock.

    If the ``filelock`` package is available, a lock is acquired on a lock
    file named after ``path`` (by appending ``'.lock'``) before the file is
    opened, and it is released again once the file has been closed.

    If the package is not available, or if the lock cannot be acquired within
    5 seconds (in which case a warning is emitted), the file is opened
    without any lock.

    Parameters
    ----------
    path : path-like
        The path to the file to be opened.
    *args, **kwargs : optional
        Additional positional and keyword arguments passed to :func:`open`.

    Yields
    ------
    fid : file object
        The opened file, as returned by :func:`open`.
    """
    filelock = _soft_import(
        "filelock", purpose="parallel config set and get", strict=False
    )

    lock = None  # stays None whenever we end up proceeding without a lock

    # Truthiness (rather than ``is not None``) so that any falsy "not
    # available" result from _soft_import skips the locking branch.
    # NOTE(review): confirm what _soft_import returns when the import fails.
    if filelock:
        from filelock import FileLock

        lock_path = f"{path}.lock"
        lock = FileLock(lock_path, timeout=5)
        try:
            lock.acquire()
        except TimeoutError:
            warn(
                "Could not acquire lock file after 5 seconds, consider deleting it "
                f"if you know the corresponding file is usable:\n{lock_path}"
            )
            lock = None  # fall back to unlocked access

    try:
        with open(path, *args, **kwargs) as fid:
            yield fid
    finally:
        # The lock was acquired exactly once above, so release it exactly
        # once here. Entering the FileLock as a context manager in addition
        # to the explicit acquire() would bump its re-entrancy counter twice
        # and leave the lock held after the block exits.
        if lock is not None:
            lock.release()
270+
271+
221272
def _load_config(config_path, raise_error=False):
222273
"""Safely load a config file."""
223-
with open(config_path) as fid:
274+
with _open_lock(config_path, "r+") as fid:
224275
try:
225276
config = json.load(fid)
226277
except ValueError:
@@ -398,8 +449,29 @@ def set_config(key, value, home_dir=None, set_env=True):
398449
directory = op.dirname(config_path)
399450
if not op.isdir(directory):
400451
os.mkdir(directory)
401-
with open(config_path, "w") as fid:
402-
json.dump(config, fid, sort_keys=True, indent=0)
452+
453+
# Pick the mode depending on whether the config file already exists
454+
# or not ("r+" to update it in place, "w+" to create it).
455+
mode = "r+" if op.isfile(config_path) else "w+"
456+
457+
with _open_lock(config_path, mode) as fid:
458+
try:
459+
data = json.load(fid)
460+
except (ValueError, json.JSONDecodeError) as exc:
461+
logger.info(
462+
f"Could not read the {config_path} json file during the writing."
463+
f" Assuming it is empty. Got: {exc}"
464+
)
465+
data = {}
466+
467+
if value is None:
468+
data.pop(key, None)
469+
else:
470+
data[key] = value
471+
472+
fid.seek(0)
473+
fid.truncate()
474+
json.dump(data, fid, sort_keys=True, indent=0)
403475

404476

405477
def _get_extra_data_path(home_dir=None):

mne/utils/tests/test_config.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
# License: BSD-3-Clause
33
# Copyright the MNE-Python contributors.
44

5+
import json
56
import os
67
import platform
8+
import random
79
import re
10+
import time
811
from functools import partial
912
from pathlib import Path
1013
from urllib.error import URLError
@@ -232,3 +235,103 @@ def bad_open(url, timeout, msg):
232235
out = out.getvalue()
233236
assert "devel, " in out
234237
assert "updating.html" not in out
238+
239+
240+
def _worker_update_config_loop(home_dir, worker_id, iterations=10):
    """Read and update the config repeatedly, simulating one parallel worker.

    On each iteration the config is read with ``get_config`` and then a
    key/value pair unique to this worker and iteration is written with
    ``set_config``. A short random sleep follows each update to encourage
    interleaving between workers.

    Parameters
    ----------
    home_dir : str
        The home directory where the config file is located.
    worker_id : int
        The ID of the worker (used to build unique keys and values).
    iterations : int
        The number of read/update cycles to run.

    Returns
    -------
    worker_id : int
        The ID that was passed in.
    """
    for step in range(iterations):
        # Read first, to mimic a read-then-modify access pattern.
        get_config(home_dir=home_dir)
        # Key and value share a suffix unique to this worker and iteration.
        suffix = f"{worker_id}_{step}"
        key = f"worker_{suffix}"
        value = f"value_{suffix}"
        # set_config performs its own locked read-modify-write of the file.
        set_config(key, value, home_dir=home_dir)
        time.sleep(random.uniform(0, 0.05))
    return worker_id
269+
270+
271+
def test_parallel_get_set_config(tmp_path: Path):
    """Test getting and setting the config from parallel workers.

    Many workers update the same configuration file concurrently. With
    working file locking, the final config file must still be valid JSON and
    must contain every key/value pair written by every worker.
    """
    pytest.importorskip("joblib")
    pytest.importorskip("filelock")
    from joblib import Parallel, delayed

    # The temporary directory acts as the home directory, so the config
    # lives at home_dir/.mne/mne-python.json.
    home_dir = str(tmp_path)
    config_file = get_config_path(home_dir=home_dir)

    # Start from a clean state: no stale config file, but an existing .mne dir.
    if os.path.exists(config_file):
        os.remove(config_file)
    (tmp_path / ".mne").mkdir(exist_ok=True)

    # Seed the file with an initial (valid) config.
    with open(config_file, "w") as f:
        json.dump({"initial": "True"}, f)

    n_workers = 50
    iterations = 10

    # Run all workers concurrently using joblib.
    Parallel(n_jobs=10)(
        delayed(_worker_update_config_loop)(home_dir, worker_id, iterations)
        for worker_id in range(n_workers)
    )

    # Read the config back and compare against everything the workers wrote.
    final_config = get_config(home_dir=home_dir)
    expected = {
        f"worker_{worker_id}_{i}": f"value_{worker_id}_{i}"
        for worker_id in range(n_workers)
        for i in range(iterations)
    }
    for expected_key, expected_value in expected.items():
        assert final_config.get(expected_key) == expected_value, (
            f"Missing or incorrect value for key {expected_key}"
        )

    # No expected key or value may be missing from the final config.
    assert len(set(expected) - set(final_config.keys())) == 0
    assert len(set(expected.values()) - set(final_config.values())) == 0

    # The file on disk must still parse as JSON.
    with open(config_file) as f:
        try:
            json.load(f)
        except json.JSONDecodeError as e:
            pytest.fail(f"Config file is not valid JSON: {e}")

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ full-no-qt = [
9393
"dipy",
9494
"edfio >= 0.2.1",
9595
"eeglabio",
96+
"filelock>=3.18.0",
9697
"h5py",
9798
"imageio >= 2.6.1",
9899
"imageio-ffmpeg >= 0.4.1",

0 commit comments

Comments
 (0)