Skip to content

Commit f3836ab

Browse files
Wheestpre-commit-ci[bot]skshetry
authored
[cli,lock] Allow wait_for_lock (#10784)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Saugat Pachhai (सौगात) <suagatchhetri@outlook.com>
1 parent e278c9c commit f3836ab

File tree

6 files changed

+125
-20
lines changed

6 files changed

+125
-20
lines changed

dvc/cli/command.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ def __init__(self, args: Any):
1818

1919
os.chdir(args.cd)
2020

21-
self.repo: Repo = Repo(uninitialized=self.UNINITIALIZED)
21+
self.repo: Repo = Repo(
22+
uninitialized=self.UNINITIALIZED,
23+
_wait_for_lock=args.wait_for_lock,
24+
)
2225
self.config: Config = self.repo.config
2326
self.args = args
2427

dvc/cli/parser.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ def get_main_parser():
200200
type=str,
201201
)
202202

203+
parser.add_argument(
204+
"--wait-for-lock",
205+
action="store_true",
206+
default=False,
207+
help="Wait for the lock if it is already held by another process, instead of"
208+
" failing immediately.",
209+
)
210+
203211
# Sub commands
204212
subparsers = parser.add_subparsers(
205213
title="Available Commands",

dvc/lock.py

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
import hashlib
44
import os
5+
import time
56
from abc import ABC, abstractmethod
67
from datetime import timedelta
78
from typing import Optional, Union
89

910
import flufl.lock
1011
import zc.lockfile
11-
from funcy import retry
1212

1313
from dvc.exceptions import DvcException
1414
from dvc.progress import Tqdm
@@ -88,9 +88,10 @@ class Lock(LockBase):
8888
Uses zc.lockfile as backend.
8989
"""
9090

91-
def __init__(self, lockfile, friendly=False, **kwargs):
91+
def __init__(self, lockfile, friendly=False, wait=False, **kwargs):
9292
super().__init__(lockfile)
9393
self._friendly = friendly
94+
self._wait = wait
9495
self._lock = None
9596
self._lock_failed = False
9697

@@ -101,23 +102,38 @@ def files(self):
101102
def _do_lock(self):
102103
try:
103104
self._lock_failed = False
104-
with Tqdm(
105-
bar_format="{desc}",
106-
disable=not self._friendly,
107-
desc="If DVC froze, see `hardlink_lock` in {}".format(
108-
format_link("https://man.dvc.org/config#core")
109-
),
110-
):
111-
self._lock = zc.lockfile.LockFile(self._lockfile)
105+
self._lock = zc.lockfile.LockFile(self._lockfile)
112106
except zc.lockfile.LockError:
113107
self._lock_failed = True
114108
raise LockError(FAILED_TO_LOCK_MESSAGE) # noqa: B904
115109

116110
def lock(self):
117-
retries = 6
118-
delay = DEFAULT_TIMEOUT / retries
119-
lock_retry = retry(retries, LockError, timeout=delay)(self._do_lock)
120-
lock_retry()
111+
"""Acquire the lock, either waiting forever, or after default_retries."""
112+
default_retries = 6
113+
delay = DEFAULT_TIMEOUT / default_retries
114+
attempts = 0
115+
116+
max_retries = float("inf") if self._wait else default_retries
117+
118+
with Tqdm(
119+
bar_format="{desc}",
120+
disable=not self._friendly,
121+
desc="Waiting to acquire lock. "
122+
"If DVC froze, see `hardlink_lock` in {}.".format(
123+
format_link("https://man.dvc.org/config#core")
124+
),
125+
) as pbar:
126+
while True:
127+
try:
128+
self._do_lock()
129+
return
130+
except LockError:
131+
attempts += 1
132+
if attempts > max_retries:
133+
raise
134+
time.sleep(delay)
135+
finally:
136+
pbar.update()
121137

122138
def unlock(self):
123139
if self._lock_failed:
@@ -150,7 +166,7 @@ class HardlinkLock(flufl.lock.Lock, LockBase):
150166
tmp_dir (str): a directory to store claim files.
151167
"""
152168

153-
def __init__(self, lockfile, tmp_dir=None, **kwargs):
169+
def __init__(self, lockfile, tmp_dir=None, wait=False, **kwargs):
154170
import socket
155171

156172
self._tmp_dir = tmp_dir
@@ -173,11 +189,21 @@ def __init__(self, lockfile, tmp_dir=None, **kwargs):
173189
self._separator = flufl.lock.SEP
174190
self._set_claimfile()
175191
self._owned = True
192+
self._wait = wait
176193
self._retry_errnos = []
194+
self._friendly = kwargs.get("friendly", False)
177195

178196
def lock(self, timeout: Optional[Union[timedelta, int]] = None):
179197
try:
180-
super().lock(timeout or timedelta(seconds=DEFAULT_TIMEOUT))
198+
if not self._wait:
199+
timeout = timeout or timedelta(seconds=DEFAULT_TIMEOUT)
200+
201+
with Tqdm(
202+
bar_format="{desc}",
203+
disable=not (self._wait and self._friendly),
204+
desc="Waiting to acquire lock",
205+
):
206+
super().lock(timeout)
181207
except flufl.lock.TimeOutError:
182208
raise LockError(FAILED_TO_LOCK_MESSAGE) # noqa: B904
183209

@@ -191,6 +217,6 @@ def _set_claimfile(self):
191217
self._claimfile = os.path.join(self._tmp_dir, filename + ".lock")
192218

193219

194-
def make_lock(lockfile, tmp_dir=None, friendly=False, hardlink_lock=False):
220+
def make_lock(lockfile, tmp_dir=None, friendly=False, hardlink_lock=False, wait=False):
195221
cls = HardlinkLock if hardlink_lock else Lock
196-
return cls(lockfile, tmp_dir=tmp_dir, friendly=friendly)
222+
return cls(lockfile, tmp_dir=tmp_dir, friendly=friendly, wait=wait)

dvc/repo/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ def __init__( # noqa: PLR0915, PLR0913
144144
scm: Optional[Union["Git", "NoSCM"]] = None,
145145
remote: Optional[str] = None,
146146
remote_config: Optional["DictStrAny"] = None,
147+
_wait_for_lock: bool = False,
147148
):
148149
from dvc.cachemgr import CacheManager
149150
from dvc.data_cloud import DataCloud
@@ -167,6 +168,7 @@ def __init__( # noqa: PLR0915, PLR0913
167168
self._remote = remote
168169
self._remote_config = remote_config
169170
self._data_index = None
171+
self._wait_for_lock = _wait_for_lock
170172

171173
if rev and not fs:
172174
self._scm = scm = SCM(root_dir or os.curdir)
@@ -204,6 +206,7 @@ def __init__( # noqa: PLR0915, PLR0913
204206
tmp_dir=self.tmp_dir,
205207
hardlink_lock=self.config["core"].get("hardlink_lock", False),
206208
friendly=True,
209+
wait=self._wait_for_lock,
207210
)
208211
os.makedirs(self.site_cache_dir, exist_ok=True)
209212
if not fs and (

tests/func/test_cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def run(self):
124124
class A:
125125
quiet = False
126126
verbose = True
127+
wait_for_lock = False
127128
cd = os.path.pardir
128129

129130
args = A()
@@ -139,6 +140,7 @@ def run(self):
139140
class A:
140141
quiet = False
141142
verbose = True
143+
wait_for_lock = False
142144
cd = os.path.pardir
143145

144146
parent_dir = os.path.realpath(os.path.pardir)

tests/func/test_lock.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import multiprocessing
2+
import time
3+
14
import pytest
25

36
from dvc.cli import main
47
from dvc.exceptions import DvcException
5-
from dvc.lock import Lock, LockError
8+
from dvc.lock import Lock, LockError, make_lock
69

710

811
def test_with(tmp_dir, dvc, mocker):
@@ -52,3 +55,63 @@ def test_cli(tmp_dir, dvc, mocker, caplog):
5255
with Lock(tmp_dir / dvc.tmp_dir / "lock"):
5356
assert main(["add", "foo"]) == 1
5457
assert expected_error_msg in caplog.text
58+
59+
60+
def hold_lock_until_signaled(lockfile_path, result_queue, release_signal):
61+
lock = make_lock(lockfile_path)
62+
with lock:
63+
result_queue.put("p1_acquired")
64+
release_signal.wait()
65+
result_queue.put("p1_released")
66+
67+
68+
def try_lock_with_wait(lockfile_path, wait, result_queue):
69+
result_queue.put("p2_starting")
70+
try:
71+
lock = make_lock(lockfile_path, wait=wait)
72+
with lock:
73+
result_queue.put("p2_acquired")
74+
except LockError as e:
75+
result_queue.put(f"error: {e}")
76+
else:
77+
result_queue.put("p2_released")
78+
79+
80+
def test_lock_waits_when_requested(request, tmp_path):
81+
lockfile = tmp_path / "lock"
82+
83+
q: multiprocessing.Queue[str] = multiprocessing.Queue()
84+
release_signal = multiprocessing.Event()
85+
# Process 1 holds the lock until signaled to release it
86+
p1 = multiprocessing.Process(
87+
target=hold_lock_until_signaled, args=(lockfile, q, release_signal)
88+
)
89+
p2 = multiprocessing.Process(target=try_lock_with_wait, args=(lockfile, True, q))
90+
91+
p1.start()
92+
request.addfinalizer(p1.kill)
93+
94+
assert q.get(timeout=1) == "p1_acquired"
95+
96+
# Process 2 will wait for the lock (should succeed)
97+
p2.start()
98+
request.addfinalizer(p2.kill)
99+
100+
assert q.get(timeout=1) == "p2_starting"
101+
assert q.empty()
102+
103+
# sleep to ensure Process 2 is waiting for the lock
104+
time.sleep(1)
105+
release_signal.set() # release the lock
106+
107+
p1.join(timeout=4)
108+
109+
events = [q.get(timeout=2), q.get(timeout=2), q.get(timeout=2)]
110+
# we still can't be sure of the order of events.
111+
assert "p1_released" in events
112+
assert "p2_acquired" in events
113+
assert "p2_released" in events
114+
115+
p2.join(timeout=1)
116+
117+
assert q.empty()

0 commit comments

Comments
 (0)