Skip to content

Commit 179d424

Browse files
authored
Pseudo atomic annotation file write (#3953)
* Add AtomicWriteFileHandle to utils.py * Update convert.py with AtomicWriteFileHandle * Test AtomicWriteFileHandle with test_utils.py * Start all threads at once in TestAtomicWriteFileHandle * Create temporary file in target directory * Avoid too many lines in utils.py * Correct description; remove unnecessary enumeration
1 parent 9c6d95a commit 179d424

File tree

2 files changed

+106
-3
lines changed

2 files changed

+106
-3
lines changed

tools/accuracy_checker/accuracy_checker/annotation_converters/convert.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
import warnings
1717
import platform
1818
import sys
19-
19+
import os
2020
import copy
2121
import json
22+
import tempfile
2223
from pathlib import Path
2324
import pickle # nosec B403 # disable import-pickle check
2425
from argparse import ArgumentParser
@@ -327,7 +328,7 @@ def save_annotation(annotation, meta, annotation_file, meta_file, dataset_config
327328
annotation_dir = annotation_file.parent
328329
if not annotation_dir.exists():
329330
annotation_dir.mkdir(parents=True)
330-
with annotation_file.open('wb') as file:
331+
with AtomicWriteFileHandle(annotation_file,'wb') as file:
331332
if conversion_meta:
332333
pickle.dump(conversion_meta, file)
333334
for representation in annotation:
@@ -337,7 +338,7 @@ def save_annotation(annotation, meta, annotation_file, meta_file, dataset_config
337338
meta_dir = meta_file.parent
338339
if not meta_dir.exists():
339340
meta_dir.mkdir(parents=True)
340-
with meta_file.open('wt') as file:
341+
with AtomicWriteFileHandle(meta_file, 'wt') as file:
341342
json.dump(meta, file)
342343

343344

@@ -409,3 +410,38 @@ def analyze_dataset(annotations, metadata):
409410
else:
410411
metadata = {'data_analysis': data_analysis}
411412
return metadata
413+
414+
class AtomicWriteFileHandle:
    """Write to a temporary file and publish it to the target path on close.

    Data is written to a temporary file created in the same directory as the
    target.  On a successful ``close()`` the temporary file is renamed to the
    target path, unless the target already exists, in which case the
    temporary file is discarded and the existing target is left untouched
    (first writer wins).  Readers therefore never see a half-written target.

    When used as a context manager and the ``with`` body raises, the
    temporary file is discarded and nothing is published.

    NOTE: ``tempfile.mkstemp`` creates the file readable and writable only by
    the creating user, so the published file carries those permissions rather
    than the umask-derived ones a plain ``open`` would give.

    Args:
        file_path: Destination path (``str`` or ``os.PathLike``).
        open_mode: Mode handed to ``os.fdopen`` for the temporary file,
            e.g. ``'wb'`` or ``'wt'``.

    Raises:
        ValueError: If ``open_mode`` is not a valid file mode.
        OSError: If the temporary file cannot be created.
    """

    def __init__(self, file_path, open_mode):
        self.target_path = file_path
        self.mode = open_mode

        self.temp_fd, self.temp_path = tempfile.mkstemp(dir=os.path.dirname(file_path))
        try:
            self.temp_file = os.fdopen(self.temp_fd, open_mode)
        except Exception:
            # A rejected mode leaves the raw descriptor open; release it and
            # the empty temporary file so nothing is leaked.
            try:
                os.close(self.temp_fd)
            except OSError:
                pass  # the failed fdopen already closed the descriptor
            self._remove_temp_file()
            raise

    def write(self, data):
        """Write ``data`` to the temporary file and return the count written."""
        return self.temp_file.write(data)

    def writelines(self, lines):
        """Write every item of ``lines`` to the temporary file."""
        self.temp_file.writelines(lines)

    def close(self):
        """Flush the temporary file and publish it if the target is absent.

        Calling ``close()`` again after the file has been closed is a no-op.
        """
        if self.temp_file.closed:
            return
        try:
            self.temp_file.close()
        except Exception:
            # The final flush failed, so the content is incomplete: drop it.
            self._remove_temp_file()
            raise

        if os.path.exists(self.target_path):
            self._remove_temp_file()
            return
        try:
            os.rename(self.temp_path, self.target_path)
        except FileExistsError:
            # Another writer published between the check and the rename
            # (os.rename refuses to overwrite on Windows).  Where os.rename
            # replaces silently, the target is swapped for another complete
            # file, so readers still never see partial content.
            self._remove_temp_file()

    def _discard(self):
        """Close the temporary file and delete it without publishing."""
        if self.temp_file.closed:
            return
        try:
            self.temp_file.close()
        finally:
            self._remove_temp_file()

    def _remove_temp_file(self):
        """Delete the temporary file, tolerating that it is already gone."""
        try:
            os.remove(self.temp_path)
        except FileNotFoundError:
            pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only publish when the body completed; a failure means the content
        # is incomplete and must not become visible at the target path.
        if exc_type is None:
            self.close()
        else:
            self._discard()

    # Mimic other file object methods as needed
    def __getattr__(self, item):
        """Delegate attribute access to the underlying temporary file object."""
        # __getattr__ only runs when normal lookup fails.  If temp_file itself
        # is missing (instance not fully initialised), delegating would
        # re-enter this method forever, so fail fast instead.
        if item == 'temp_file':
            raise AttributeError(item)
        return getattr(self.temp_file, item)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""
2+
Copyright (c) 2018-2024 Intel Corporation
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
"""
16+
17+
import os
18+
import threading
19+
import warnings
20+
from accuracy_checker.annotation_converters.convert import AtomicWriteFileHandle
21+
22+
def thread_access_file(file_path, data_dict, thread_id, write_lines):
    """Create ``file_path`` if it is missing, otherwise verify its line count.

    If the file does not exist, ``write_lines`` lines tagged with
    ``thread_id`` and ``data_dict[thread_id]`` are written through
    ``AtomicWriteFileHandle``.  If it exists, its lines are counted; a count
    different from ``write_lines`` emits a warning and stores the same
    message under ``data_dict['assert']`` for the caller to inspect.
    """
    if not os.path.exists(file_path):
        with AtomicWriteFileHandle(file_path, 'wt') as file:
            file.writelines(
                f"Thread {thread_id}:Line{i} {data_dict[thread_id]}\n"
                for i in range(write_lines)
            )
        return

    with open(file_path, 'r') as existing:
        line_count = sum(1 for _ in existing)

    # when a new thread reads a file, all lines must already be written
    if line_count == write_lines:
        return

    warn_message = f"Thread {thread_id}: Incorrect number of lines read from {file_path} ({line_count} != {write_lines})"
    warnings.warn(warn_message)
    data_dict['assert'] = warn_message
35+
36+
class TestAtomicWriteFileHandle:
37+
38+
def test_multithreaded_atomic_file_write(self):
39+
target_file_path = "test_atomic_file.txt"
40+
threads = []
41+
num_threads = 10
42+
write_lines = 10
43+
data_chunks = [f"Data chunk {i}" for i in range(num_threads)]
44+
threads_dict = {i: data_chunks[i] for i in range(len(data_chunks))}
45+
46+
if os.path.exists(target_file_path):
47+
os.remove(target_file_path)
48+
49+
for i in range(num_threads):
50+
thread = threading.Thread(target=thread_access_file, args=(target_file_path, threads_dict, i, write_lines))
51+
threads.append(thread)
52+
53+
for thread in threads:
54+
thread.start()
55+
56+
for thread in threads:
57+
thread.join()
58+
59+
with open(target_file_path, 'r') as file:
60+
lines = file.readlines()
61+
62+
os.remove(target_file_path)
63+
64+
# check asserts passed from threads
65+
assert 'assert' not in threads_dict.keys() , threads_dict['assert']
66+
67+
assert sum(1 for line in lines for data_chunk in data_chunks if data_chunk in line) == write_lines, f"data_chunks data not found in the {target_file_path} file"

0 commit comments

Comments
 (0)