Skip to content

Commit ebaefd5

Browse files
authored
Merge pull request #29 from cta-observatory/chunked_reading
Chunked reading
2 parents 8e45125 + 5ae84f7 commit ebaefd5

File tree

4 files changed

+154
-15
lines changed

4 files changed

+154
-15
lines changed

corsikaio/file.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
)
1414
from .subblocks.longitudinal import longitudinal_header_dtype
1515
from .subblocks.data import mmcs_cherenkov_photons_dtype
16-
from .io import read_block, read_buffer_size, open_compressed
16+
from .io import iter_blocks, read_buffer_size, open_compressed
1717

1818
from .constants import BLOCK_SIZE_BYTES, EVTH_VERSION_POSITION
1919

@@ -30,7 +30,7 @@ def _to_floatarray(block):
3030

3131
class CorsikaFile:
3232
"""
33-
A file to iterate over events in a CORSIKA binary output file
33+
A file to iterate over events in a CORSIKA binary output file.
3434
"""
3535

3636
def __init__(self, path, parse_blocks=True):
@@ -39,8 +39,9 @@ def __init__(self, path, parse_blocks=True):
3939
self.parse_blocks = parse_blocks
4040
self._buffer_size = read_buffer_size(path)
4141
self._f = open_compressed(path)
42+
self._block_iter = iter_blocks(self._f)
4243

43-
runh_bytes = self.read_block()
44+
runh_bytes = next(self._block_iter)
4445
if not runh_bytes[:4] == b'RUNH':
4546
raise ValueError('File does not start with b"RUNH"')
4647

@@ -59,18 +60,18 @@ def run_end(self):
5960
self._f.seek(-4, 2)
6061

6162
self._f.seek(-BLOCK_SIZE_BYTES, 1)
62-
block = self.read_block()
63+
block = self._f.read(BLOCK_SIZE_BYTES)
6364
while block[:4] != b'RUNE':
6465
self._f.seek(-2 * BLOCK_SIZE_BYTES, 1)
65-
block = self.read_block()
66+
block = self._f.read(BLOCK_SIZE_BYTES)
6667

6768
self._run_end = parse_run_end(block)[0]
6869
self._f.seek(pos)
6970

7071
return self._run_end
7172

7273
def __next__(self):
73-
block = self.read_block()
74+
block = next(self._block_iter)
7475

7576
if block[:4] == b'RUNE':
7677
self._run_end = parse_run_end(block)
@@ -87,15 +88,15 @@ def __next__(self):
8788
data_bytes = bytearray()
8889
long_bytes = bytearray()
8990

90-
block = self.read_block()
91+
block = next(self._block_iter)
9192
while block[:4] != b'EVTE':
9293

9394
if block[:4] == b'LONG':
9495
long_bytes += block[longitudinal_header_dtype.itemsize:]
9596
else:
9697
data_bytes += block
9798

98-
block = self.read_block()
99+
block = next(self._block_iter)
99100

100101
if self.parse_blocks:
101102
event_end = parse_event_end(block)[0]
@@ -120,7 +121,8 @@ def read_headers(self):
120121
pos = self._f.tell()
121122
self._f.seek(0)
122123

123-
block = self.read_block()
124+
block_iter = iter_blocks(self._f)
125+
block = next(block_iter)
124126
event_header_data = bytearray()
125127
end_found = True
126128

@@ -142,17 +144,14 @@ def read_headers(self):
142144
elif block[:4] == b'EVTE':
143145
end_found = True
144146

145-
block = self.read_block()
147+
block = next(block_iter)
146148

147149
self._f.seek(pos)
148150

149151
event_headers = parse_event_header(event_header_data)
150152

151153
return self.run_header, event_headers, self._run_end
152154

153-
def read_block(self):
154-
return read_block(self._f, self._buffer_size)
155-
156155
def __enter__(self):
157156
return self
158157

corsikaio/io.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44
from .constants import BLOCK_SIZE_BYTES
55

66

7+
#: buffersize for files not written as fortran sequential file
8+
DEFAULT_BUFFER_SIZE = BLOCK_SIZE_BYTES * 100
9+
#: struct definition of the fortran record marker
10+
RECORD_MARKER = struct.Struct('i')
11+
12+
713
def is_gzip(path):
814
'''Test if a file is gzipped by reading its first two bytes and compare
915
to the gzip marker bytes.
@@ -42,16 +48,51 @@ def read_buffer_size(path):
4248
size of the CORSIKA buffer in bytes
4349
'''
4450
with open_compressed(path) as f:
45-
data = f.read(4)
51+
data = f.read(RECORD_MARKER.size)
4652

4753
if data == b'RUNH':
4854
return None
4955

50-
buffer_size, = struct.unpack('I', data)
56+
buffer_size, = RECORD_MARKER.unpack(data)
5157

5258
return buffer_size
5359

5460

61+
def iter_blocks(f):
62+
is_fortran_file = True
63+
buffer_size = DEFAULT_BUFFER_SIZE
64+
65+
66+
data = f.read(4)
67+
f.seek(0)
68+
if data == b'RUNH':
69+
is_fortran_file = False
70+
71+
while True:
72+
# for the fortran-chunked output, we need to read the record size
73+
if is_fortran_file:
74+
data = f.read(RECORD_MARKER.size)
75+
if len(data) < RECORD_MARKER.size:
76+
raise IOError("Read less bytes than expected, file seems to be truncated")
77+
78+
buffer_size, = RECORD_MARKER.unpack(data)
79+
80+
data = f.read(buffer_size)
81+
82+
n_blocks = len(data) // BLOCK_SIZE_BYTES
83+
for block in range(n_blocks):
84+
start = block * BLOCK_SIZE_BYTES
85+
stop = start + BLOCK_SIZE_BYTES
86+
block = data[start:stop]
87+
if len(block) < BLOCK_SIZE_BYTES:
88+
raise IOError("Read less bytes than expected, file seems to be truncated")
89+
yield block
90+
91+
# read trailing record marker
92+
if is_fortran_file:
93+
f.read(RECORD_MARKER.size)
94+
95+
5596
def read_block(f, buffer_size=None):
5697
'''
5798
Reads a block of CORSIKA output, e.g. 273 4-byte floats.

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ zstd =
2525
zstandard
2626
tests =
2727
pytest
28+
scipy
2829
all =
2930
%(zstd)s
3031
%(tests)s

tests/test_io.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
import pytest
2+
import numpy as np
3+
from scipy.io import FortranFile
4+
5+
from corsikaio.io import iter_blocks
6+
7+
18
test_files = (
29
'tests/resources/mmcs65',
310
'tests/resources/corsika74100'
@@ -29,6 +36,97 @@ def test_read_block():
2936
assert block[:4] == b'RUNH'
3037

3138

39+
@pytest.fixture()
40+
def simple_dummy_file(tmp_path):
41+
path = tmp_path / "simple.dat"
42+
data = bytearray(np.arange(273).astype(np.float32))
43+
44+
with path.open('wb') as f:
45+
data[:4] = b'RUNH'
46+
f.write(data)
47+
48+
for _ in range(5):
49+
data[:4] = b'EVTH'
50+
f.write(data)
51+
for _ in range(3):
52+
data[:4] = np.float32(0).tobytes()
53+
f.write(data)
54+
data[:4] = b'EVTE'
55+
f.write(data)
56+
data[:4] = b'RUNE'
57+
f.write(data)
58+
return path
59+
60+
61+
@pytest.fixture()
62+
def fortran_raw_dummy_file(tmp_path):
63+
path = tmp_path / "fortran_raw.dat"
64+
data = np.arange(273).astype(np.float32)
65+
66+
67+
blocks = []
68+
data[0] = np.frombuffer(b'RUNH', dtype=np.float32)
69+
blocks.append(data.copy())
70+
71+
for _ in range(5):
72+
data[0] = np.frombuffer(b'EVTH', dtype=np.float32)
73+
blocks.append(data.copy())
74+
for _ in range(3):
75+
data[0] = 0.0
76+
blocks.append(data.copy())
77+
data[0] = np.frombuffer(b'EVTE', dtype=np.float32)
78+
blocks.append(data.copy())
79+
data[0] = np.frombuffer(b'RUNE', dtype=np.float32)
80+
blocks.append(data.copy())
81+
82+
blocks_per_record = 5
83+
with FortranFile(path, mode='w') as f:
84+
n_records = len(blocks) // blocks_per_record + 1
85+
86+
for i in range(n_records):
87+
start = i * blocks_per_record
88+
stop = (i + 1) * blocks_per_record
89+
print(start, stop, len(blocks))
90+
f.write_record(np.concatenate(blocks[start:stop]))
91+
92+
return path
93+
94+
95+
@pytest.fixture(params=["simple", "fortran_raw"])
96+
def dummy_file(request, fortran_raw_dummy_file, simple_dummy_file):
97+
if request.param == "simple":
98+
return simple_dummy_file
99+
else:
100+
return fortran_raw_dummy_file
101+
102+
103+
def test_iter_blocks_simple_file(dummy_file):
104+
"""Test for iterblocks for the case of no record markers"""
105+
data = np.arange(273).astype(np.float32)
106+
107+
with dummy_file.open('rb') as f:
108+
block_it = iter_blocks(f)
109+
block = next(block_it)
110+
assert block[:4] == b'RUNH'
111+
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()
112+
113+
for _ in range(5):
114+
block = next(block_it)
115+
assert block[:4] == b'EVTH'
116+
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()
117+
for _ in range(3):
118+
block = next(block_it)
119+
assert (np.frombuffer(block, np.float32) == data).all()
120+
block = next(block_it)
121+
assert block[:4] == b'EVTE'
122+
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()
123+
124+
block = next(block_it)
125+
assert block[:4] == b'RUNE'
126+
assert (np.frombuffer(block[4:], np.float32) == data[1:]).all()
127+
128+
129+
32130
def test_versions():
33131
from corsikaio.io import read_buffer_size, read_block
34132
from corsikaio.subblocks import get_version

0 commit comments

Comments
 (0)