10
10
import tempfile
11
11
import zipfile
12
12
from contextlib import closing
13
- from typing import TYPE_CHECKING
13
+ from typing import TYPE_CHECKING , cast
14
14
15
15
import e3
16
16
import e3 .error
20
20
21
21
22
22
if TYPE_CHECKING :
23
- from typing import Literal , Text , Union
23
+ from typing import Literal , Text , Union , IO , Any
24
24
from collections .abc import Callable , Sequence
25
25
from os import PathLike
26
26
from e3 .mypy import assert_never
@@ -146,6 +146,7 @@ def check_type(
146
146
def unpack_archive (
147
147
filename : str ,
148
148
dest : str ,
149
+ fileobj : IO [bytes ] | None = None ,
149
150
selected_files : Sequence [str ] | None = None ,
150
151
remove_root_dir : RemoveRootDirType = False ,
151
152
unpack_cmd : Callable [..., None ] | None = None ,
@@ -159,6 +160,10 @@ def unpack_archive(
159
160
160
161
:param filename: archive to unpack
161
162
:param dest: destination directory (should exist)
163
+ :param fileobj: if specified, the archive is read from this file object
164
+ instead of opening a file. The file object must be opened in binary
165
+ mode. In this case filename is the name of the archive contained
166
+ in the file object.
162
167
:param selected_files: list of files to unpack (partial extraction). If
163
168
None all files are unpacked
164
169
:param remove_root_dir: if True then the root dir of the archive is
@@ -192,7 +197,7 @@ def unpack_archive(
192
197
logger .debug ("unpack %s in %s" , filename , dest )
193
198
# First do some checks such as archive existence or destination directory
194
199
# existence.
195
- if not os .path .isfile (filename ):
200
+ if fileobj is None and not os .path .isfile (filename ):
196
201
raise ArchiveError (origin = "unpack_archive" , message = f"cannot find { filename } " )
197
202
198
203
if not os .path .isdir (dest ):
@@ -205,14 +210,19 @@ def unpack_archive(
205
210
206
211
# We need to resolve to an absolute path as the extraction related
207
212
# processes will be run in the destination directory
208
- filename = os .path .abspath (filename )
213
+ if fileobj is None :
214
+ filename = os .path .abspath (filename )
209
215
210
216
if unpack_cmd is not None :
211
217
# Use user defined unpack command
212
- if not selected_files :
213
- return unpack_cmd (filename , dest )
214
- else :
215
- return unpack_cmd (filename , dest , selected_files = selected_files )
218
+ kwargs : dict [str , Any ] = {}
219
+ if selected_files :
220
+ kwargs ["selected_files" ] = selected_files
221
+
222
+ if fileobj is not None :
223
+ kwargs ["fileobj" ] = fileobj
224
+
225
+ return unpack_cmd (filename , dest , ** kwargs )
216
226
217
227
ext = check_type (filename , force_extension = force_extension )
218
228
@@ -237,7 +247,13 @@ def unpack_archive(
237
247
elif ext .endswith ("xz" ):
238
248
mode += "xz"
239
249
# Extract tar files
240
- with closing (tarfile .open (filename , mode = mode )) as fd :
250
+ with closing (
251
+ tarfile .open (
252
+ filename if fileobj is None else None ,
253
+ fileobj = fileobj ,
254
+ mode = mode ,
255
+ )
256
+ ) as fd :
241
257
check_selected = set (selected_files )
242
258
243
259
def is_match (name : str , files : Sequence [str ]) -> bool :
@@ -291,7 +307,9 @@ def is_match(name: str, files: Sequence[str]) -> bool:
291
307
292
308
elif ext == "zip" :
293
309
try :
294
- with closing (E3ZipFile (filename , mode = "r" )) as zip_fd :
310
+ with closing (
311
+ E3ZipFile (fileobj if fileobj is not None else filename , mode = "r" )
312
+ ) as zip_fd :
295
313
zip_fd .extractall (
296
314
tmp_dest , selected_files if selected_files else None
297
315
)
@@ -358,7 +376,8 @@ def is_match(name: str, files: Sequence[str]) -> bool:
358
376
def create_archive (
359
377
filename : str ,
360
378
from_dir : str ,
361
- dest : str ,
379
+ dest : str | None = None ,
380
+ fileobj : IO [bytes ] | None = None ,
362
381
force_extension : str | None = None ,
363
382
from_dir_rename : str | None = None ,
364
383
no_root_dir : bool = False ,
@@ -372,26 +391,45 @@ def create_archive(
372
391
373
392
:param filename: archive to create
374
393
:param from_dir: directory to pack (full path)
375
- :param dest: destination directory (should exist)
394
+ :param dest: destination directory (should exist). If not specified,
395
+ the archive is written to the file object passed with fileobj.
396
+ :param fileobj: if specified, the archive is written to this file object
397
+ instead of opening a file. The file object must be opened in binary
398
+ mode. In this case filename is the name of the archive contained
399
+ in the file object.
376
400
:param force_extension: specify the archive extension if not in the
377
401
filename. If filename has no extension and force_extension is None
378
402
create_archive will fail.
379
403
:param from_dir_rename: name of root directory in the archive.
380
404
:param no_root_dir: create archive without the root dir (zip only)
381
405
406
+ :raise ValueError: neither dest nor fileobj is provided
382
407
:raise ArchiveError: if an error occurs
383
408
"""
409
+ if dest is None and fileobj is None :
410
+ raise ValueError ("no destination provided" )
411
+
384
412
# Check extension
385
413
from_dir = from_dir .rstrip ("/" )
386
- filepath = os .path .abspath (os .path .join (dest , filename ))
414
+
415
+ # If fileobj is None, dest is not None
416
+ filepath = (
417
+ os .path .abspath (os .path .join (cast (str , dest ), filename ))
418
+ if fileobj is None
419
+ else None
420
+ )
387
421
388
422
ext = check_type (filename , force_extension = force_extension )
389
423
390
424
if from_dir_rename is None :
391
425
from_dir_rename = os .path .basename (from_dir )
392
426
393
427
if ext == "zip" :
394
- zip_archive = zipfile .ZipFile (filepath , "w" , zipfile .ZIP_DEFLATED )
428
+ zip_archive = zipfile .ZipFile (
429
+ cast (str , filepath ) if fileobj is None else fileobj ,
430
+ "w" ,
431
+ zipfile .ZIP_DEFLATED ,
432
+ )
395
433
for root , _ , files in os .walk (from_dir ):
396
434
relative_root = os .path .relpath (
397
435
os .path .abspath (root ), os .path .abspath (from_dir )
@@ -413,5 +451,7 @@ def create_archive(
413
451
tar_format = "w:xz"
414
452
else :
415
453
assert_never ()
416
- with closing (tarfile .open (filepath , tar_format )) as tar_archive :
454
+ with closing (
455
+ tarfile .open (filepath , fileobj = fileobj , mode = tar_format )
456
+ ) as tar_archive :
417
457
tar_archive .add (name = from_dir , arcname = from_dir_rename , recursive = True )
0 commit comments