Utilize memory allocator in ReadProperties.GetStream #547
Conversation
parquet/file/page_reader.go
```go
// underlying reader, even if there is less data available than that. So even if there are no more bytes,
// the buffer must have at least bytes.MinRead capacity remaining to avoid a reallocation.
allocSize := lenCompressed
if p.decompressBuffer.Cap() < lenCompressed+bytes.MinRead {
```
This is dependent on the combined behavior of io.LimitReader and bytes.Buffer. Which seems fragile to me, but I don't have any other ideas how to deal with it. I'll at least add unit tests that the reallocation happens when I don't add bytes.MinRead to the allocation size and doesn't happen when I do.
I agree that this seems really fragile. Maybe io.ReadFull directly into p.decompressBuffer.Bytes()[:lenCompressed] instead of using the intermediate bytes.Buffer?
Yes, let's go with ReadFull and we can skip `bytes.Buffer` altogether.
```go
if n != lenUncompressed {
	return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n)
}
if p.cryptoCtx.DataDecryptor != nil {
```
I'm not sure if this is needed or not. But for data page v2, the data is just read by ReadFull and Decrypt is not called:
arrow-go/parquet/file/page_reader.go
Lines 815 to 824 in 95b3f76
```go
if compressed {
	if levelsBytelen > 0 {
		io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
	}
	if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
		return false
	}
} else {
	io.ReadFull(p.r, buf.Bytes())
}
```
So maybe the Decrypt call is not needed for data page v1 or the dictionary page either?
The reverse, actually. Looks like this is a bug we just never came across; I'm guessing no one using this library was reading DataPageV2 data that was uncompressed but still encrypted.
Alright, I'll fix it for DataPageV2 then. I'll add a unit test without compression and with encryption that should fail on the current main, if I have the time.
I reran the profiler with the current commit in this PR, using a 2.8 GB parquet file stored in S3, uncompressed. The CPU profiler shows that more time is spent in runtime.memmove (copying memory) than in syscall.Syscall6 (read). Which is annoying me. :-D
I think it should still be possible to eliminate at least one copy for the uncompressed case.
So this is my scenario:
1. ReaderProperties.GetStream reads the column chunk from a TLS connection and stores it in a buffer (or just allocates the buffer if BufferedStreamEnabled, but let's go with the unbuffered case for now).
2. serializedPageReader is created with the buffer returned from ReaderProperties.GetStream.
3. serializedPageReader.Next gets the page header and calls serializedPageReader.readUncompressed/serializedPageReader.decompress, which reads data from the GetStream buffer into dictPageBuffer/dataPageBuffer.
   3a) For the uncompressed case this is just a copy.
4. A page struct is created from the bytes written to dictPageBuffer/dataPageBuffer.
I think I could avoid the copy in 3a and create the page directly from the bytes in the buffer returned by ReaderProperties.GetStream, using a combination of Peek (to get the bytes) and Discard (to advance the internal position inside the buffer). This should hold when BufferedStreamEnabled is false; I have to check what happens when it is true.
Awesome! Thanks for diving into this!
Rationale for this change
Optimizes memory usage and enables the use of custom allocators when reading column data with both buffered and unbuffered readers.
What changes are included in this PR?
Changes to the bufferedReader type, a new bytesBufferReader type, and a modification of ReadProperties.GetStream to propagate the custom memory allocator to the readers.
Are these changes tested?
TODO: add unit tests
Are there any user-facing changes?
If an allocator is provided via the reader properties, it will be used to allocate the underlying buffers for the buffered/unbuffered readers.
The BufferedReader interface was extended with a Free method to allow returning memory to the allocator.