Skip to content

Commit c9db727

Browse files
committed
WIP: repair fileio
1 parent 4d25333 commit c9db727

File tree

2 files changed

+61
-34
lines changed

2 files changed

+61
-34
lines changed

mig/shared/fileio.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,19 @@
3030
from __future__ import print_function
3131
from __future__ import absolute_import
3232

33+
import codecs
3334
import errno
3435
import fcntl
3536
import os
3637
import shutil
3738
import sys
3839
import tempfile
40+
import traceback
3941
import time
4042
import zipfile
4143

44+
PY2 = sys.version_info[0] == 2
45+
4246
# NOTE: We expose optimized walk function directly for ease and efficiency.
4347
# Requires stand-alone scandir module on python 2 whereas the native os
4448
# functions are built-in and optimized similarly on python 3+
@@ -85,6 +89,10 @@ def _auto_adjust_mode(data, mode):
8589
return mode
8690

8791

92+
def _is_string(value):
93+
return isinstance(value, unicode) if PY2 else isinstance(value, str)
94+
95+
8896
def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
8997
make_parent=True, create_file=True, force_string=False):
9098
"""Internal helper to wrap writing of chunks with offset to path.
@@ -100,6 +108,12 @@ def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
100108
(offset, path))
101109
return False
102110

111+
if _is_string(chunk):
112+
chunk = bytearray(chunk, 'utf8')
113+
# detect byte writes and handle explicitly in a portable way
114+
if isinstance(chunk, (bytes, bytearray)) and 'b' not in mode:
115+
mode = "%sb" % mode # appended to avoid mode ordering error on PY2
116+
103117
# create dir and file if it does not exists
104118

105119
(head, _) = os.path.split(path)
@@ -140,6 +154,7 @@ def _write_chunk(path, chunk, offset, logger=None, mode='r+b',
140154
# logger.debug("file %r chunk written at %d" % (path, offset))
141155
return True
142156
except Exception as err:
157+
#traceback.print_exc()
143158
logger.error("could not write %r chunk at %d: %s" %
144159
(path, offset, err))
145160
return False
@@ -152,9 +167,6 @@ def write_chunk(path, chunk, offset, logger, mode='r+b', force_string=False):
152167
if not logger:
153168
logger = null_logger("dummy")
154169

155-
# TODO: enable this again once throuroughly tested and assured py2+3 safe
156-
# mode = _auto_adjust_mode(chunk, mode)
157-
158170
return _write_chunk(path, chunk, offset, logger, mode,
159171
force_string=force_string)
160172

@@ -169,9 +181,6 @@ def write_file(content, path, logger, mode='w', make_parent=True, umask=None,
169181
if umask is not None:
170182
old_umask = os.umask(umask)
171183

172-
# TODO: enable this again once throuroughly tested and assured py2+3 safe
173-
#mode = _auto_adjust_mode(content, mode)
174-
175184
retval = _write_chunk(path, content, offset=0, logger=logger, mode=mode,
176185
make_parent=make_parent, create_file=False,
177186
force_string=force_string)

tests/test_mig_shared_fileio.py

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@
2828
"""Unit test fileio functions"""
2929

3030
import binascii
31+
import codecs
3132
import os
3233
import sys
3334
import unittest
3435

3536
# NOTE: wrap next imports in try except to prevent autopep8 shuffling up
3637
try:
37-
from tests.support import MigTestCase, cleanpath, temppath, testmain
38+
from tests.support import PY2, MigTestCase, cleanpath, temppath, testmain
3839
import mig.shared.fileio as fileio
3940
except ImportError as ioe:
4041
print("Failed to import mig core modules: %s" % ioe)
@@ -43,12 +44,37 @@
4344
DUMMY_BYTES = binascii.unhexlify('DEADBEEF') # 4 bytes
4445
DUMMY_BYTES_LENGTH = 4
4546
DUMMY_UNICODE = u'UniCode123½¾µßðþđŋħĸþł@ª€£$¥©®'
46-
DUMMY_UNICODE_LENGTH = len(DUMMY_UNICODE)
47+
DUMMY_UNICODE_BYTES = bytearray(DUMMY_UNICODE, 'utf8')
48+
DUMMY_UNICODE_BYTES_LENGTH = len(DUMMY_UNICODE_BYTES)
4749
DUMMY_FILE_WRITECHUNK = 'fileio/write_chunk'
4850
DUMMY_FILE_WRITEFILE = 'fileio/write_file'
4951

5052
assert isinstance(DUMMY_BYTES, bytes)
5153

54+
def as_unicode_string(value):
55+
assert isinstance(value, bytearray)
56+
return unicode(codecs.decode(value, 'utf8')) if PY2 else str(value, 'utf8')
57+
58+
class TextFile:
59+
def __init__(self, path, mode='r'):
60+
self._file = None
61+
self._path = path
62+
self._mode = "%s%s" % (mode, 'b') if 'b' not in mode else mode
63+
64+
def read(self, size):
65+
content = self._file.read(size)
66+
# always read back the content as though it was raw bytes
67+
return bytearray(content)
68+
69+
def __enter__(self):
70+
self._file = open(self._path, mode=self._mode)
71+
return self
72+
73+
def __exit__(self, *args):
74+
self._file.close()
75+
if args[1] is not None:
76+
raise args[1]
77+
5278

5379
class MigSharedFileio__write_chunk(MigTestCase):
5480
# TODO: Add docstrings to this class and its methods
@@ -58,9 +84,7 @@ def setUp(self):
5884
cleanpath(os.path.dirname(DUMMY_FILE_WRITECHUNK), self)
5985

6086
def test_return_false_on_invalid_data(self):
61-
# NOTE: we make sure to disable any forced stringification here
62-
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger,
63-
force_string=False)
87+
did_succeed = fileio.write_chunk(self.tmp_path, 1234, 0, self.logger)
6488
self.assertFalse(did_succeed)
6589

6690
def test_return_false_on_invalid_offset(self):
@@ -110,25 +134,24 @@ def test_store_bytes_in_text_mode(self):
110134
self.assertEqual(len(content), DUMMY_BYTES_LENGTH)
111135
self.assertEqual(content[:], DUMMY_BYTES)
112136

113-
@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
114137
def test_store_unicode(self):
115-
fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
116-
mode='r+')
138+
did_succeed = fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
139+
mode='r+')
140+
self.assertTrue(did_succeed)
117141

118-
with open(self.tmp_path, 'r') as file:
142+
with TextFile(self.tmp_path) as file:
119143
content = file.read(1024)
120-
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
121-
self.assertEqual(content[:], DUMMY_UNICODE)
144+
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
145+
self.assertEqual(content[:], DUMMY_UNICODE_BYTES)
122146

123-
@unittest.skip("TODO: enable again - requires the temporarily disabled auto mode select")
124147
def test_store_unicode_in_binary_mode(self):
125148
fileio.write_chunk(self.tmp_path, DUMMY_UNICODE, 0, self.logger,
126149
mode='r+b')
127150

128-
with open(self.tmp_path, 'r') as file:
151+
with TextFile(self.tmp_path) as file:
129152
content = file.read(1024)
130-
self.assertEqual(len(content), DUMMY_UNICODE_LENGTH)
131-
self.assertEqual(content[:], DUMMY_UNICODE)
153+
self.assertEqual(len(content), DUMMY_UNICODE_BYTES_LENGTH)
154+
self.assertEqual(as_unicode_string(content[:]), DUMMY_UNICODE)
132155

133156

134157
class MigSharedFileio__write_file(MigTestCase):
@@ -138,15 +161,14 @@ def setUp(self):
138161
cleanpath(os.path.dirname(DUMMY_FILE_WRITEFILE), self)
139162

140163
def test_return_false_on_invalid_data(self):
141-
# NOTE: we make sure to disable any forced stringification here
142-
did_succeed = fileio.write_file(1234, self.tmp_path, self.logger,
143-
force_string=False)
164+
did_succeed = fileio.write_file(1234, self.tmp_path, self.logger)
144165
self.assertFalse(did_succeed)
145166

146167
def test_return_false_on_invalid_dir(self):
147168
os.makedirs(self.tmp_path)
148169

149-
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
170+
did_succeed = fileio.write_file(
171+
DUMMY_BYTES, self.tmp_path, self.logger)
150172
self.assertFalse(did_succeed)
151173

152174
def test_return_false_on_missing_dir(self):
@@ -155,20 +177,16 @@ def test_return_false_on_missing_dir(self):
155177
self.assertFalse(did_succeed)
156178

157179
def test_creates_directory(self):
158-
# TODO: temporarily use empty string to avoid any byte/unicode issues
159-
# did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger)
160-
did_succeed = fileio.write_file('', self.tmp_path, self.logger)
180+
did_succeed = fileio.write_file(
181+
DUMMY_BYTES, self.tmp_path, self.logger)
161182
self.assertTrue(did_succeed)
162183

163184
path_kind = self.assertPathExists(DUMMY_FILE_WRITEFILE)
164185
self.assertEqual(path_kind, "file")
165186

166187
def test_store_bytes(self):
167-
mode = 'w'
168-
# TODO: remove next once we have auto adjust mode in write helper
169-
mode = fileio._auto_adjust_mode(DUMMY_BYTES, mode)
170-
did_succeed = fileio.write_file(DUMMY_BYTES, self.tmp_path, self.logger,
171-
mode=mode)
188+
did_succeed = fileio.write_file(
189+
DUMMY_BYTES, self.tmp_path, self.logger)
172190
self.assertTrue(did_succeed)
173191

174192
with open(self.tmp_path, 'rb') as file:
@@ -211,4 +229,4 @@ def test_store_unicode_in_binary_mode(self):
211229

212230

213231
if __name__ == '__main__':
214-
testmain()
232+
testmain(failfast=True)

0 commit comments

Comments
 (0)