[Feature] Large file uploads (901) #902
Conversation
multipart_upload_min_stream_size: int = 5 * 1024 * 1024

# Maximum number of presigned URLs that can be requested at a time.
#
# The more URLs we request at once, the higher the chance that some of them will expire
# before we get to use them. We discover that a presigned URL has expired *after* sending the
# input stream partition to the server. So to retry the upload of this partition we must rewind
# the stream back. In case of a non-seekable stream we cannot rewind, so we'll abort
# the upload. To reduce the chance of this, we request presigned URLs one by one
# and use them immediately.
multipart_upload_batch_url_count: int = 1

# Size of the chunk to use for multipart uploads.
#
# The smaller the chunk, the lower the chance of network errors (or URL expiration),
# but the more requests we'll make.
# For AWS, the minimum is 5 MiB: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
# For GCP, the minimum is 256 KiB (and the recommended multiple is also 256 KiB).
# boto uses 8 MiB: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.TransferConfig
multipart_upload_chunk_size: int = 10 * 1024 * 1024

# Use a maximum duration of 1 hour.
multipart_upload_url_expiration_duration: datetime.timedelta = datetime.timedelta(hours=1)

# This is not a "wall time" cutoff for the whole upload request,
# but a maximum time between consecutive data reception events (even 1 byte) from the server.
multipart_upload_single_chunk_upload_timeout_seconds: float = 60

# Cap on the number of custom retries during incremental uploads:
# 1) multipart: an upload part URL has expired, so new upload URLs must be requested to continue the upload
# 2) resumable: a chunk upload produced a retryable response (or exception), so the upload status must be
#    retrieved to continue the upload.
# In these two cases standard SDK retries (which are capped by the `retry_timeout_seconds` option) are not used.
# Note that the retry counter is reset when the upload is successfully resumed.
multipart_upload_max_retries = 3
As defined, these are static variables that would be shared by all instances of the Config class. My understanding is that they should be considered static constants. If so, let's rename them in uppercase to make that clear.
# Minimal input stream size (bytes) to use multipart / resumable uploads.
# For small files it's more efficient to make a single-shot upload request.
# When uploading a file, the SDK will initially buffer this many bytes from the input stream.
# This parameter can be smaller or larger than multipart_upload_chunk_size.
MULTIPART_UPLOAD_MIN_STREAM_SIZE: int = 5 * 1024 * 1024
# and so on...
If the intent is to make them actual instance variables, then they should be defined in the __init__ function:
def __init__(
self,
*,
# Deprecated. Use credentials_strategy instead.
credentials_provider: Optional[CredentialsStrategy] = None,
credentials_strategy: Optional[CredentialsStrategy] = None,
product=None,
product_version=None,
clock: Optional[Clock] = None,
**kwargs,
):
# Init file settings.
self.multipart_upload_min_stream_size = 5 * 1024 * 1024
# and so on...
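For context, here is a minimal standalone sketch contrasting the two alternatives described above; the value mirrors the diff, but the class names are made up for illustration:
class ConfigWithConstant:
    # Class-level constant: shared by every instance and conventionally uppercase.
    MULTIPART_UPLOAD_MIN_STREAM_SIZE: int = 5 * 1024 * 1024


class ConfigWithInstanceVar:
    def __init__(self):
        # Instance attribute: each object gets its own copy, set at construction time.
        self.multipart_upload_min_stream_size = 5 * 1024 * 1024


print(ConfigWithConstant.MULTIPART_UPLOAD_MIN_STREAM_SIZE)       # read via the class
print(ConfigWithInstanceVar().multipart_upload_min_stream_size)  # read via an instance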
def _download_raw_stream(
def upload(self, file_path: str, contents: BinaryIO, *, overwrite: Optional[bool] = None):
    # Upload empty and small files with one-shot upload.
    pre_read_buffer = contents.read(self._config.multipart_upload_min_stream_size)
See comment above: if multipart_upload_min_stream_size is meant to be a static constant, then we should access it this way:
pre_read_buffer = contents.read(Config.MULTIPART_UPLOAD_MIN_STREAM_SIZE)
databricks/sdk/mixins/files.py (Outdated)
# _api.do() does retry
initiate_upload_response = self._api.do(
    "POST", f"/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}", query=query
)
# no need to check response status, _api.do() will throw exception on failure
[nit] Suggested change:
# Method _api.do() takes care of retrying and will raise an exception in case of failure.
initiate_upload_response = self._api.do(
    "POST", f"/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}", query=query
)
@@ -713,11 +727,560 @@ def download(self, file_path: str) -> DownloadResponse:
        initial_response.contents._response = wrapped_response
        return initial_response

    def _download_raw_stream(
    def upload(self, file_path: str, contents: BinaryIO, *, overwrite: Optional[bool] = None):
Please add a docstring to explain what the introduced functions are doing.
Some general guidelines about docstrings: https://google.github.io/styleguide/pyguide.html#383-functions-and-methods. Note that we use the reStructuredText format, not the Google format: https://peps.python.org/pep-0287/
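For example, a hypothetical reStructuredText-style docstring for the new upload method could look like this (parameter descriptions are illustrative, not taken from the PR):
from typing import BinaryIO, Optional


class FilesExt:  # stub standing in for the mixin under review
    def upload(self, file_path: str, contents: BinaryIO, *, overwrite: Optional[bool] = None) -> None:
        """Upload the contents of a binary stream to a file at the given path.

        :param file_path: Absolute destination path of the file.
        :param contents: Readable binary stream containing the data to upload.
        :param overwrite: If True, overwrite an existing file at ``file_path``.
        """
        raise NotImplementedError  # illustration of the docstring format only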
databricks/sdk/mixins/files.py (Outdated)
def fill_buffer():
    bytes_to_read = max(0, self._config.multipart_upload_chunk_size - len(buffer))
    if bytes_to_read > 0:
        next_buf = input_stream.read(bytes_to_read)
        new_buffer = buffer + next_buf
        return new_buffer
    else:
        # we have already buffered enough data
        return buffer
I think this function is a little confusing as it looks functional and procedural at the same time. I think it would help to declare it as a standalone static function that returns the next buffer. Something like this:
def _read_next_buffer(
    input_stream: BinaryIO,
    previous_buffer: bytes,
    chunk_size: int = Config.MULTIPART_UPLOAD_CHUNK_SIZE,
) -> bytes:
    """
    Returns the next buffer to be processed.
    The size of the next buffer can be more or less than the given chunk_size. If it is
    less, it means that all of the input_stream has been read.
    The function avoids reading new bytes from the input_stream if previous_buffer
    already contains chunk_size bytes to be read.
    """
    bytes_to_read = max(0, chunk_size - len(previous_buffer))
    if bytes_to_read <= 0:
        return previous_buffer  # buffer already contains a full chunk
    next_bytes = input_stream.read(bytes_to_read)
    new_buffer = previous_buffer + next_bytes
    return new_buffer
All that being said, I think it would be better to abstract all of this in a class that hides the buffering logic and exposes a simple interface for reading the next bytes. This can come as a follow-up to this PR though.
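As a rough sketch of what such a buffering abstraction might look like (class name and interface are hypothetical, not part of this PR):
from typing import BinaryIO


class _ChunkReader:
    """Hides the buffering logic and returns at most chunk_size bytes per call."""

    def __init__(self, input_stream: BinaryIO, chunk_size: int, pre_read: bytes = b""):
        self._stream = input_stream
        self._chunk_size = chunk_size
        self._buffer = pre_read  # bytes already consumed while choosing single-shot vs. multipart

    def next_chunk(self) -> bytes:
        # Top up the buffer until it holds a full chunk or the stream is exhausted.
        while len(self._buffer) < self._chunk_size:
            data = self._stream.read(self._chunk_size - len(self._buffer))
            if not data:
                break
            self._buffer += data
        chunk, self._buffer = self._buffer[: self._chunk_size], self._buffer[self._chunk_size :]
        return chunk  # an empty result means the stream is fully consumed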
databricks/sdk/mixins/files.py (Outdated)
# no need to check response status, _api.do() will throw exception on failure

upload_part_urls = upload_part_urls_response.get("upload_part_urls", [])
if not len(upload_part_urls):
Let's aim for clarity for developers who are not used to Python idioms. Suggested change:
if len(upload_part_urls) == 0:
databricks/sdk/mixins/files.py (Outdated)
for upload_part_url in upload_part_urls:
    buffer = fill_buffer()
    actual_buffer_length = len(buffer)
    if not actual_buffer_length:
Same as above, while maybe more "pythonic", this adds ambiguity.
LGTM. I think we'll have to do a little more work on the code style (e.g. use types that are compatible with Python 3.7), but this should not be a blocker to start getting customer feedback on this experimental functionality. Test coverage is good and I'm confident that the code is doing what it was designed to do.
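As an aside on the Python 3.7 compatibility point: annotations on that interpreter need the typing module rather than built-in generics or PEP 604 unions. A small illustration (helper name and dict key are made up):
from typing import List, Optional


def first_upload_url(upload_part_urls: List[dict]) -> Optional[str]:
    # On Python 3.7 this must be List[dict] / Optional[str],
    # not list[dict] (3.9+) or "str | None" (3.10+).
    if len(upload_part_urls) == 0:
        return None
    return upload_part_urls[0].get("url")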
Head branch was pushed to by a user without write access
If integration tests don't run automatically, an authorized user can run them manually by following the instructions below.
Checks will be approved automatically on success.
What changes are proposed in this pull request?
Supporting large file uploads using the Files API.
NO_CHANGELOG=true
How is this tested?
Unit tests.
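For reference, a minimal usage sketch of the new behavior, assuming the files.upload entry point shown in the diff (the volume path and file name are illustrative):
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Streams larger than the multipart threshold are uploaded in parts behind the
# same upload() call; smaller files still go through a single-shot request.
with open("large-dataset.parquet", "rb") as f:
    w.files.upload("/Volumes/main/default/my_volume/large-dataset.parquet", f, overwrite=True)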