5
5
import logging
6
6
import os
7
7
from collections .abc import Sequence
8
- from datetime import datetime
9
- from datetime import timezone
10
8
from importlib import metadata
11
9
from typing import Any
12
10
15
13
from segy import SegyFile
16
14
from segy .config import SegySettings
17
15
from segy .schema import HeaderField
16
+ from zarr import Blosc
18
17
19
18
from mdio .api .io_utils import process_url
20
19
from mdio .converters .exceptions import EnvironmentFormatError
21
20
from mdio .converters .exceptions import GridTraceCountError
22
21
from mdio .converters .exceptions import GridTraceSparsityError
23
22
from mdio .core import Grid
23
+ from mdio .core .factory import MDIOCreateConfig
24
+ from mdio .core .factory import MDIOVariableConfig
25
+ from mdio .core .factory import create_empty
24
26
from mdio .core .utils_write import write_attribute
25
27
from mdio .segy import blocked_io
26
28
from mdio .segy .compat import mdio_segy_spec
27
- from mdio .segy .helpers_segy import create_zarr_hierarchy
28
29
from mdio .segy .utilities import get_grid_plan
29
30
30
31
32
# ZFP (lossy) compression is an optional extra ("--extras lossy").
# On ImportError both names are set to None so later code (e.g.
# get_compressor) can detect availability at runtime instead of
# failing at import time.
try:
    import zfpy  # Base library
    from zarr import ZFPY  # Codec
except ImportError:
    ZFPY = None
    zfpy = None
31
40
logger = logging .getLogger (__name__ )
32
41
33
42
try :
@@ -103,6 +112,28 @@ def grid_density_qc(grid: Grid, num_traces: int) -> None:
103
112
raise GridTraceSparsityError (grid .shape , num_traces , msg )
104
113
105
114
115
def get_compressor(
    lossless: bool, compression_tolerance: float = -1
) -> Blosc | ZFPY | None:
    """Get the appropriate compressor for the seismic traces.

    Args:
        lossless: When True, use Blosc with the zstd algorithm
            (lossless). When False, use ZFP in fixed-accuracy mode
            (lossy), which requires the optional ``zfpy`` dependency.
        compression_tolerance: Absolute error tolerance passed to ZFP
            for lossy compression. Ignored when ``lossless`` is True.

    Returns:
        A configured compressor instance.

    Raises:
        ImportError: If lossy compression was requested but the
            ``zfpy`` package is not installed.
    """
    if lossless:
        return Blosc("zstd")

    # Lossy path: both the base library and the zarr codec must exist.
    if ZFPY is None or zfpy is None:
        msg = (
            "Lossy compression requires the 'zfpy' library. It is "
            "not installed in your environment. To proceed please "
            "install 'zfpy' or install mdio with `--extras lossy`"
        )
        raise ImportError(msg)

    return ZFPY(mode=zfpy.mode_fixed_accuracy, tolerance=compression_tolerance)
135
+
136
+
106
137
def segy_to_mdio ( # noqa: C901
107
138
segy_path : str ,
108
139
mdio_path_or_buffer : str ,
@@ -364,14 +395,6 @@ def segy_to_mdio( # noqa: C901
364
395
if storage_options_output is None :
365
396
storage_options_output = {}
366
397
367
- store = process_url (
368
- url = mdio_path_or_buffer ,
369
- mode = "w" ,
370
- storage_options = storage_options_output ,
371
- memory_cache_size = 0 , # Making sure disk caching is disabled,
372
- disk_cache = False , # Making sure disk caching is disabled
373
- )
374
-
375
398
# Open SEG-Y with MDIO's SegySpec. Endianness will be inferred.
376
399
mdio_spec = mdio_segy_spec ()
377
400
segy_settings = SegySettings (storage_options = storage_options_input )
@@ -406,45 +429,6 @@ def segy_to_mdio( # noqa: C901
406
429
logger .warning (f"Ingestion grid shape: { grid .shape } ." )
407
430
raise GridTraceCountError (np .sum (grid .live_mask ), num_traces )
408
431
409
- zarr_root = create_zarr_hierarchy (
410
- store = store ,
411
- overwrite = overwrite ,
412
- )
413
-
414
- # Get UTC time, then add local timezone information offset.
415
- iso_datetime = datetime .now (timezone .utc ).isoformat ()
416
-
417
- write_attribute (name = "created" , zarr_group = zarr_root , attribute = iso_datetime )
418
- write_attribute (name = "api_version" , zarr_group = zarr_root , attribute = API_VERSION )
419
-
420
- dimensions_dict = [dim .to_dict () for dim in dimensions ]
421
- write_attribute (name = "dimension" , zarr_group = zarr_root , attribute = dimensions_dict )
422
-
423
- # Write trace count
424
- trace_count = np .count_nonzero (grid .live_mask )
425
- write_attribute (name = "trace_count" , zarr_group = zarr_root , attribute = trace_count )
426
-
427
- # Note, live mask is not chunked since it's bool and small.
428
- zarr_root ["metadata" ].create_dataset (
429
- data = grid .live_mask ,
430
- name = "live_mask" ,
431
- shape = grid .shape [:- 1 ],
432
- chunks = - 1 ,
433
- dimension_separator = "/" ,
434
- )
435
-
436
- write_attribute (
437
- name = "text_header" ,
438
- zarr_group = zarr_root ["metadata" ],
439
- attribute = text_header .split ("\n " ),
440
- )
441
-
442
- write_attribute (
443
- name = "binary_header" ,
444
- zarr_group = zarr_root ["metadata" ],
445
- attribute = binary_header .to_dict (),
446
- )
447
-
448
432
if chunksize is None :
449
433
dim_count = len (index_names ) + 1
450
434
if dim_count == 2 :
@@ -467,18 +451,59 @@ def segy_to_mdio( # noqa: C901
467
451
suffix = [str (idx ) for idx , value in enumerate (suffix ) if value is not None ]
468
452
suffix = "" .join (suffix )
469
453
454
+ compressor = get_compressor (lossless , compression_tolerance )
455
+ header_dtype = segy .spec .trace .header .dtype .newbyteorder ("=" )
456
+ var_conf = MDIOVariableConfig (
457
+ name = f"chunked_{ suffix } " ,
458
+ dtype = "float32" ,
459
+ chunks = chunksize ,
460
+ compressor = compressor ,
461
+ header_dtype = header_dtype ,
462
+ )
463
+ config = MDIOCreateConfig (path = mdio_path_or_buffer , grid = grid , variables = [var_conf ])
464
+
465
+ zarr_root = create_empty (
466
+ config ,
467
+ overwrite = overwrite ,
468
+ storage_options = storage_options_output ,
469
+ consolidate_meta = False ,
470
+ )
471
+ data_group , meta_group = zarr_root ["data" ], zarr_root ["metadata" ]
472
+ data_array = data_group [f"chunked_{ suffix } " ]
473
+ header_array = meta_group [f"chunked_{ suffix } _trace_headers" ]
474
+
475
+ # Write actual live mask and metadata to empty MDIO
476
+ meta_group ["live_mask" ][:] = grid .live_mask
477
+ write_attribute (
478
+ name = "trace_count" ,
479
+ zarr_group = zarr_root ,
480
+ attribute = np .count_nonzero (grid .live_mask ),
481
+ )
482
+ write_attribute (
483
+ name = "text_header" ,
484
+ zarr_group = zarr_root ["metadata" ],
485
+ attribute = text_header .split ("\n " ),
486
+ )
487
+ write_attribute (
488
+ name = "binary_header" ,
489
+ zarr_group = zarr_root ["metadata" ],
490
+ attribute = binary_header .to_dict (),
491
+ )
492
+
493
+ # Write traces
470
494
stats = blocked_io .to_zarr (
471
495
segy_file = segy ,
472
496
grid = grid ,
473
- data_root = zarr_root [ "data" ] ,
474
- metadata_root = zarr_root [ "metadata" ] ,
497
+ data_array = data_array ,
498
+ header_array = header_array ,
475
499
name = "_" .join (["chunked" , suffix ]),
476
500
dtype = "float32" ,
477
501
chunks = chunksize ,
478
502
lossless = lossless ,
479
503
compression_tolerance = compression_tolerance ,
480
504
)
481
505
506
+ # Write actual stats
482
507
for key , value in stats .items ():
483
508
write_attribute (name = key , zarr_group = zarr_root , attribute = value )
484
509
0 commit comments