
Conversation

@daniel-adam-tfs (Contributor)
Rationale for this change

Optimization of memory usage, enables the use of custom allocators when reading column data with both buffered and unbuffered readers.

What changes are included in this PR?

Changes to the bufferedReader type, a new bytesBufferReader type, and a modification of ReadProperties.GetStream to propagate the custom memory allocator to the readers.

Are these changes tested?

TODO: add unit tests

Are there any user-facing changes?

If an allocator is provided via the reader properties, it will be used to allocate the underlying buffers for the buffered/unbuffered readers.

The BufferedReader interface was extended with a Free method to allow returning the memory to the allocator.

// underlying reader, even if there is less data available than that. So even if there are no more bytes,
// the buffer must have at least bytes.MinRead capacity remaining to avoid a reallocation.
allocSize := lenCompressed
if p.decompressBuffer.Cap() < lenCompressed+bytes.MinRead {
@daniel-adam-tfs (Contributor Author) commented on Oct 24, 2025:

This depends on the combined behavior of io.LimitReader and bytes.Buffer, which seems fragile to me, but I don't have any other ideas for how to deal with it. At minimum I'll add unit tests verifying that the reallocation happens when I don't add bytes.MinRead to the allocation size and doesn't happen when I do.

Member:

I agree that this seems really fragile. Maybe io.ReadFull directly into p.decompressBuffer.Bytes()[:lenCompressed] instead of using the intermediate bytes.Buffer?

Contributor Author:

Yes, let's go with ReadFull and skip `bytes.Buffer` altogether.

if n != lenUncompressed {
return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n)
}
if p.cryptoCtx.DataDecryptor != nil {
@daniel-adam-tfs (Contributor Author) commented on Oct 28, 2025:

I'm not sure whether this is needed. For data page v2, the data is just read by ReadFull and Decrypt is not called:

if compressed {
if levelsBytelen > 0 {
io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
}
if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
return false
}
} else {
io.ReadFull(p.r, buf.Bytes())
}

So maybe the Decrypt call is not needed for data page v1 or the dictionary page either?

Member:

It's the reverse, actually. This looks like a bug we just never came across; I'm guessing no one using this library was reading DataPageV2 data that was uncompressed but still encrypted.

Contributor Author:

Alright, I'll fix it for DataPageV2 then. If I have the time, I'll add a unit test without compression but with encryption that should fail on the current main.

I reran the profiler with the current commit in this PR against a 2.8 GB uncompressed parquet file stored in S3. The CPU profile shows more time spent in runtime.memmove (copying memory) than in syscall.Syscall6 (read), which is annoying me. :-D

I think it should still be possible to eliminate at least one copy in the uncompressed case.

So this is my scenario:

  1. ReaderProperties.GetStream reads the column chunk over a TLS connection and stores it in a buffer (or just allocates the buffer if BufferedStreamEnabled, but let's go with the unbuffered case for now)
  2. serializedPageReader is created with the buffer returned from ReaderProperties.GetStream
  3. serializedPageReader.Next reads the page header and calls serializedPageReader.readUncompressed / serializedPageReader.decompress, which reads data from the GetStream buffer into dictPageBuffer/dataPageBuffer
    3a) for the uncompressed case this is just a copy
  4. the page struct is created from the bytes written to dictPageBuffer/dataPageBuffer

I think I could avoid the copy in 3a and create the page directly from the bytes in the buffer returned by ReaderProperties.GetStream, by combining Peek (to get the bytes) with Discard (to advance the internal position inside the buffer). This should hold when BufferedStreamEnabled is false; I have to check what happens when it is true.

Member:

Awesome! Thanks for diving into this!
