feat: Implement read coalescing algorithm #1198

Merged
merged 10 commits on May 10, 2024
132 changes: 132 additions & 0 deletions src/uproot/source/coalesce.py
@@ -0,0 +1,132 @@
"""Read coalescing algorithms

Inspired in part by https://github.com/cms-sw/cmssw/blob/master/IOPool/TFileAdaptor/src/ReadRepacker.h
"""

from __future__ import annotations

import queue
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Callable

import uproot.source.chunk


@dataclass
class CoalesceConfig:
max_range_gap: int = 32 * 1024
max_request_ranges: int = 1024
max_request_bytes: int = 10 * 1024 * 1024
min_first_request_bytes: int = 32 * 1024


DEFAULT_CONFIG = CoalesceConfig()


class SliceFuture:
def __init__(self, parent: Future, s: slice | int):
self._parent = parent
self._s = s

def add_done_callback(self, callback, *, context=None):
self._parent.add_done_callback(callback)

def result(self, timeout=None):
return self._parent.result(timeout=timeout)[self._s]


@dataclass
class RangeRequest:
start: int
stop: int
future: Future | None


@dataclass
class Cluster:
ranges: list[RangeRequest]

@property
def start(self):
# since these are built from sorted ranges, this is the min start
return self.ranges[0].start

@property
def stop(self):
return max(range.stop for range in self.ranges)

def __len__(self):
return self.stop - self.start

def set_future(self, future: Future):
for range in self.ranges:
local_start = range.start - self.start
local_stop = range.stop - self.start
range.future = SliceFuture(future, slice(local_start, local_stop))


@dataclass
class CoalescedRequest:
clusters: list[Cluster]

def ranges(self):
return [(cluster.start, cluster.stop) for cluster in self.clusters]

def set_future(self, future: Future):
for i, cluster in enumerate(self.clusters):
cluster.set_future(SliceFuture(future, i))


def _merge_adjacent(ranges: list[RangeRequest], config: CoalesceConfig):
sorted_ranges = sorted(ranges, key=lambda r: r.start)
cluster = Cluster([])
for current_range in sorted_ranges:
if cluster.ranges and current_range.start - cluster.stop > config.max_range_gap:
yield cluster
cluster = Cluster([])
cluster.ranges.append(current_range)
if cluster.ranges:
yield cluster


def _coalesce(ranges: list[RangeRequest], config: CoalesceConfig):
clusters: list[Cluster] = []
request_bytes: int = 0
first_request = True
for cluster in _merge_adjacent(ranges, config):
if clusters and (
len(clusters) + 1 >= config.max_request_ranges
or request_bytes + len(cluster) >= config.max_request_bytes
or (first_request and request_bytes >= config.min_first_request_bytes)
):
yield CoalescedRequest(clusters)
clusters = []
request_bytes = 0
first_request = False
clusters.append(cluster)
request_bytes += len(cluster)
if clusters:
yield CoalescedRequest(clusters)


def coalesce_requests(
ranges: list[tuple[int, int]],
submit_fn: Callable[[list[tuple[int, int]]], Future],
source: uproot.source.chunk.Source,
notifications: queue.Queue,
config: CoalesceConfig | None = None,
):
if config is None:
config = DEFAULT_CONFIG
all_requests = [RangeRequest(start, stop, None) for start, stop in ranges]
for merged_request in _coalesce(all_requests, config):
future = submit_fn(merged_request.ranges())
merged_request.set_future(future)

def chunkify(req: RangeRequest):
chunk = uproot.source.chunk.Chunk(source, req.start, req.stop, req.future)
req.future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
return chunk

return list(map(chunkify, all_requests))
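
A minimal sketch (not part of this diff) of how the primitives above compose: _coalesce first merges byte ranges whose gaps are within max_range_gap into contiguous clusters (via _merge_adjacent), then batches those clusters into CoalescedRequest objects subject to the remaining CoalesceConfig limits. The ranges and gap size below are arbitrary illustration values.

from uproot.source.coalesce import CoalesceConfig, RangeRequest, _coalesce

# Three requested byte ranges; the first two are only 20 bytes apart.
requests = [
    RangeRequest(0, 100, None),
    RangeRequest(120, 200, None),
    RangeRequest(500_000, 600_000, None),
]
config = CoalesceConfig(max_range_gap=64)

for coalesced in _coalesce(requests, config):
    # The two nearby ranges merge into one contiguous read (0, 200); the
    # distant range stays a separate (start, stop) pair, and with the default
    # request limits both fit into a single CoalescedRequest.
    print(coalesced.ranges())  # [(0, 200), (500000, 600000)]
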
73 changes: 24 additions & 49 deletions src/uproot/source/fsspec.py
@@ -12,26 +12,14 @@
import uproot
import uproot.source.chunk
import uproot.source.futures


class PartFuture:
"""For splitting the result of fs._cat_ranges into its components"""

def __init__(self, parent_future: concurrent.futures.Future, part_index: int):
self._parent = parent_future
self._part_index = part_index

def add_done_callback(self, callback, *, context=None):
self._parent.add_done_callback(callback)

def result(self, timeout=None):
return self._parent.result(timeout=timeout)[self._part_index]
from uproot.source.coalesce import CoalesceConfig, coalesce_requests


class FSSpecSource(uproot.source.chunk.Source):
"""
Args:
file_path (str): A URL for the file to open.
coalesce_config (CoalesceConfig, optional): Configuration options for read coalescing.
**kwargs (dict): any extra arguments to be forwarded to the particular
FileSystem instance constructor. This might include S3 access keys,
or HTTP headers, etc.
@@ -40,8 +28,11 @@ class FSSpecSource(uproot.source.chunk.Source):
to get many chunks in one request.
"""

def __init__(self, file_path: str, **options):
def __init__(
self, file_path: str, coalesce_config: CoalesceConfig | None = None, **options
):
super().__init__()
self._coalesce_config = coalesce_config
self._fs, self._file_path = fsspec.core.url_to_fs(
file_path, **self.extract_fsspec_options(options)
)
@@ -50,7 +41,6 @@ def __init__(self, file_path: str, **options):
self._async_impl = self._fs.async_impl

self._file = None
self._fh = None

self._open()

@@ -75,25 +65,20 @@ def __repr__(self):
return f"<{type(self).__name__} {path} at 0x{id(self):012x}>"

def __getstate__(self):
self._fh = None
state = dict(self.__dict__)
state.pop("_executor")
state.pop("_file")
state.pop("_fh")
return state

def __setstate__(self, state):
self.__dict__ = state
self._file = None
self._fh = None
self._open()

def __enter__(self):
self._fh = self._file.__enter__()
return self

def __exit__(self, exception_type, exception_value, traceback):
self._fh = None
self._file.__exit__(exception_type, exception_value, traceback)
self._executor.shutdown()

@@ -110,20 +95,16 @@ def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk:
self._num_requests += 1
self._num_requested_chunks += 1
self._num_requested_bytes += stop - start
if self._fh:
self._fh.seek(start)
data = self._fh.read(stop - start)
else:
data = self._fs.cat_file(self._file_path, start, stop)
data = self._fs.cat_file(self._file_path, start=start, end=stop)
future = uproot.source.futures.TrivialFuture(data)
return uproot.source.chunk.Chunk(self, start, stop, future)

def chunks(
self, ranges: list[(int, int)], notifications: queue.Queue
self, ranges: list[tuple[int, int]], notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
"""
Args:
ranges (list of (int, int) 2-tuples): Intervals to fetch
ranges (list of tuple[int, int] 2-tuples): Intervals to fetch
as (start, stop) pairs in a single request, if possible.
notifications (``queue.Queue``): Indicator of completed
chunks. After each gets filled, it is ``put`` on the
@@ -171,29 +152,23 @@ async def async_wrapper_thread(blocking_func, *args, **kwargs):
# TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above)
return await to_thread(blocking_func, *args, **kwargs)

paths = [self._file_path] * len(ranges)
starts = [start for start, _ in ranges]
ends = [stop for _, stop in ranges]
# _cat_ranges is async while cat_ranges is not.
coroutine = (
self._fs._cat_ranges(paths=paths, starts=starts, ends=ends)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_ranges, paths=paths, starts=starts, ends=ends
def submit(request_ranges: list[tuple[int, int]]):
paths = [self._file_path] * len(request_ranges)
starts = [start for start, _ in request_ranges]
ends = [stop for _, stop in request_ranges]
# _cat_ranges is async while cat_ranges is not.
coroutine = (
self._fs._cat_ranges(paths=paths, starts=starts, ends=ends)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_ranges, paths=paths, starts=starts, ends=ends
)
)
)

future = self._executor.submit(coroutine)
return self._executor.submit(coroutine)

chunks = []
for index, (start, stop) in enumerate(ranges):
chunk_future = PartFuture(future, index)
chunk = uproot.source.chunk.Chunk(self, start, stop, chunk_future)
chunk_future.add_done_callback(
uproot.source.chunk.notifier(chunk, notifications)
)
chunks.append(chunk)
return chunks
return coalesce_requests(
ranges, submit, self, notifications, config=self._coalesce_config
)

@property
def async_impl(self) -> bool:
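
A hedged usage sketch (not from this PR) of the new coalesce_config constructor argument; the URL, byte ranges, and gap size are placeholders, and whether higher-level entry points such as uproot.open forward this option is not shown here.

import queue

from uproot.source.coalesce import CoalesceConfig
from uproot.source.fsspec import FSSpecSource

# Hypothetical remote file; any fsspec-supported URL would work the same way.
source = FSSpecSource(
    "https://example.com/some_file.root",
    coalesce_config=CoalesceConfig(max_range_gap=16 * 1024),
)

notifications = queue.Queue()
# Nearby ranges are grouped into fewer cat_ranges calls behind the scenes;
# each returned Chunk still corresponds to exactly one requested (start, stop) pair.
chunks = source.chunks(
    [(0, 1_024), (1_500, 4_096), (10_000_000, 10_004_096)],
    notifications,
)
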
8 changes: 0 additions & 8 deletions tests/test_0692_fsspec_reading.py
@@ -97,10 +97,6 @@ def test_open_fsspec_local():
)
def test_open_fsspec_s3(handler):
pytest.importorskip("s3fs")
if sys.version_info < (3, 11):
pytest.skip(
"https://github.com/scikit-hep/uproot5/pull/1012",
)

with uproot.open(
"s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst",
@@ -461,10 +457,6 @@ def test_fsspec_globbing_xrootd_no_files(handler):
)
def test_fsspec_globbing_s3(handler):
pytest.importorskip("s3fs")
if sys.version_info < (3, 11):
pytest.skip(
"https://github.com/scikit-hep/uproot5/pull/1012",
)

iterator = uproot.iterate(
{"s3://pivarski-princeton/pythia_ppZee_run17emb.*.root": "PicoDst"},
39 changes: 39 additions & 0 deletions tests/test_1198_coalesce.py
@@ -0,0 +1,39 @@
import pytest
from uproot.source.coalesce import CoalesceConfig, RangeRequest, _coalesce, Future


@pytest.mark.parametrize(
"config",
[
CoalesceConfig(),
CoalesceConfig(max_range_gap=2, max_request_ranges=1),
],
ids=["default", "tiny"],
)
@pytest.mark.parametrize(
"ranges",
[
[(1, 3), (4, 6), (10, 20)],
[(1, 3), (10, 20), (4, 6), (9, 10)],
[(1, 3), (10, 20), (6, 15)],
[(1, 3), (10, 20), (6, 25)],
],
ids=["sorted", "jumbled", "overlapped", "nested"],
)
def test_coalesce(ranges, config):
data = b"abcdefghijklmnopqurstuvwxyz"

all_requests = [RangeRequest(start, stop, None) for start, stop in ranges]
nreq = 0
for merged_request in _coalesce(all_requests, config):
future = Future()
future.set_result([data[start:stop] for start, stop in merged_request.ranges()])
merged_request.set_future(future)
nreq += 1

if config.max_range_gap == 2:
assert nreq > 1

for req in all_requests:
assert req.future
assert req.future.result() == data[req.start : req.stop]