Skip to content

Commit 5dbabf0

Browse files
authored
Merge pull request #219 from Labelbox/ms/fix-deadlock
fix deadlock
2 parents 9170913 + aec395b commit 5dbabf0

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

labelbox/data/generator.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import threading
33
from queue import Queue
44
from typing import Any, Iterable
5-
from concurrent.futures import ThreadPoolExecutor
5+
import threading
66

77
logger = logging.getLogger(__name__)
88

@@ -45,10 +45,13 @@ def __init__(self, data: Iterable[Any], prefetch_limit=20, num_executors=4):
4545
# Can only iterate over once it the queue.get hangs forever.
4646
self.done = False
4747
self.num_executors = num_executors
48-
with ThreadPoolExecutor(max_workers=num_executors) as executor:
49-
self.futures = [
50-
executor.submit(self.fill_queue) for _ in range(num_executors)
51-
]
48+
self.threads = [
49+
threading.Thread(target=self.fill_queue)
50+
for _ in range(num_executors)
51+
]
52+
for thread in self.threads:
53+
thread.daemon = True
54+
thread.start()
5255

5356
def _process(self, value) -> Any:
5457
raise NotImplementedError("Abstract method needs to be implemented")
@@ -77,6 +80,8 @@ def __next__(self) -> Any:
7780
self.completed_threads += 1
7881
if self.completed_threads == self.num_executors:
7982
self.done = True
83+
for thread in self.threads:
84+
thread.join()
8085
raise StopIteration
8186
value = self.queue.get()
8287
return value

0 commit comments

Comments
 (0)