
[Feature] Large file uploads (901) #902


Merged

Conversation


@ksafonov-db ksafonov-db commented Feb 26, 2025

What changes are proposed in this pull request?

Support large file uploads using the Files API.

NO_CHANGELOG=true

How is this tested?

Unit tests.
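
For context, the entry point this change extends is the Files API `upload` method shown in the diff below; a minimal usage sketch, with illustrative paths:

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    # With this change, inputs larger than the configured threshold are
    # uploaded in multiple parts instead of a single request.
    with open("large-archive.bin", "rb") as f:  # illustrative local file
        w.files.upload("/Volumes/main/default/my_volume/large-archive.bin", f, overwrite=True)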

@ksafonov-db ksafonov-db changed the title from "[901] Large file uploads" to "[Feature] Large file uploads" Feb 26, 2025
@ksafonov-db ksafonov-db changed the title from "[Feature] Large file uploads" to "[Feature] Large file uploads (901)" Feb 26, 2025
@renaudhartert-db renaudhartert-db self-requested a review March 4, 2025 10:18
Comment on lines +109 to +143
multipart_upload_min_stream_size: int = 5 * 1024 * 1024

# Maximum number of presigned URLs that can be requested at a time.
#
# The more URLs we request at once, the higher the chance that some of them will expire
# before we get to use them. We only discover that a presigned URL has expired *after*
# sending the input stream partition to the server, so to retry the upload of that partition
# we must rewind the stream. If the stream is not seekable we cannot rewind and must abort
# the upload. To reduce the chance of this, we request presigned URLs one by one
# and use them immediately.
multipart_upload_batch_url_count: int = 1

# Size of the chunk to use for multipart uploads.
#
# The smaller the chunk, the lower the chance of network errors (or of a URL expiring
# mid-upload), but the more requests we'll make.
# For AWS, the minimum is 5 MiB: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
# For GCP, the minimum is 256 KiB (and the recommended granularity is a multiple of 256 KiB).
# boto3 uses 8 MiB: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.TransferConfig
multipart_upload_chunk_size: int = 10 * 1024 * 1024

# Use the maximum duration of 1 hour.
multipart_upload_url_expiration_duration: datetime.timedelta = datetime.timedelta(hours=1)

# This is not a "wall time" cutoff for the whole upload request, but the maximum time
# between consecutive data reception events (even a single byte) from the server.
multipart_upload_single_chunk_upload_timeout_seconds: float = 60

# Cap on the number of custom retries during incremental uploads:
# 1) multipart: an upload part URL has expired, so new upload URLs must be requested to continue the upload;
# 2) resumable: a chunk upload produced a retryable response (or exception), so the upload status must be
# retrieved to continue the upload.
# In these two cases the standard SDK retries (which are capped by the `retry_timeout_seconds` option) are not used.
# Note that the retry counter is reset when the upload is successfully resumed.
multipart_upload_max_retries = 3

As defined, these are static variables that would be shared by all instances of the Config class. My understanding is that they should be treated as static constants. If so, let's rename them to uppercase to make that clear.

    # Minimum input stream size (bytes) at which multipart / resumable uploads are used.
    # For smaller files it's more efficient to make a single-shot upload request.
    # When uploading a file, the SDK will initially buffer this many bytes from the input stream.
    # This parameter can be smaller or larger than multipart_upload_chunk_size.
    MULTIPART_UPLOAD_MIN_STREAM_SIZE: int = 5 * 1024 * 1024

    # and so on...

If the intent is to make them actual instance variables, then they should be defined in the `__init__` method:

def __init__(
        self,
        *,
        # Deprecated. Use credentials_strategy instead.
        credentials_provider: Optional[CredentialsStrategy] = None,
        credentials_strategy: Optional[CredentialsStrategy] = None,
        product=None,
        product_version=None,
        clock: Optional[Clock] = None,
        **kwargs,
    ):

        # Init file settings.
        self.multipart_upload_min_stream_size = 5 * 1024 * 1024

        # and so on...
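
To make the distinction concrete, a small standalone sketch (not from the PR) of why class-level assignments behave like shared constants while `__init__` assignments are per-instance:

    class Config:
        # Class attribute: one value shared by every instance (and rebindable via the class).
        multipart_upload_chunk_size: int = 10 * 1024 * 1024

        def __init__(self):
            # Instance attribute: each Config object gets its own copy.
            self.multipart_upload_min_stream_size = 5 * 1024 * 1024

    a, b = Config(), Config()
    Config.multipart_upload_chunk_size = 1024   # visible through both a and b
    a.multipart_upload_min_stream_size = 42     # affects only a

    print(b.multipart_upload_chunk_size)        # 1024
    print(b.multipart_upload_min_stream_size)   # 5242880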

def _download_raw_stream(
def upload(self, file_path: str, contents: BinaryIO, *, overwrite: Optional[bool] = None):
# Upload empty and small files with one-shot upload.
pre_read_buffer = contents.read(self._config.multipart_upload_min_stream_size)

See the comment above: if multipart_upload_min_stream_size is meant to be a static constant, then we should access it this way:

Suggested change
pre_read_buffer = contents.read(self._config.multipart_upload_min_stream_size)
pre_read_buffer = contents.read(Config.MULTIPART_UPLOAD_MIN_STREAM_SIZE)

Comment on lines 743 to 747
# _api.do() does retry
initiate_upload_response = self._api.do(
"POST", f"/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}", query=query
)
# no need to check response status, _api.do() will throw exception on failure

[nit]

Suggested change
# _api.do() does retry
initiate_upload_response = self._api.do(
"POST", f"/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}", query=query
)
# no need to check response status, _api.do() will throw exception on failure
# Method _api.do() takes care of retrying and will raise an exception in case of failure.
initiate_upload_response = self._api.do(
"POST", f"/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}", query=query
)

@@ -713,11 +727,560 @@ def download(self, file_path: str) -> DownloadResponse:
initial_response.contents._response = wrapped_response
return initial_response

def _download_raw_stream(
def upload(self, file_path: str, contents: BinaryIO, *, overwrite: Optional[bool] = None):

Please add docstrings to explain what the introduced functions do.

Some general guidelines about docstrings: https://google.github.io/styleguide/pyguide.html#383-functions-and-methods. Note that we use the reStructuredText format, not the Google format: https://peps.python.org/pep-0287/
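
For example, something along these lines in reST format (the descriptions below are illustrative, not taken from the PR):

    def upload(self, file_path: str, contents: BinaryIO, *, overwrite: Optional[bool] = None):
        """Upload the contents of a binary stream to a file in the Files API.

        Small inputs are sent with a single-shot request; larger inputs fall back to
        multipart or resumable uploads depending on the cloud provider.

        :param file_path: Absolute remote path of the target file.
        :param contents: Binary stream to read the file contents from.
        :param overwrite: If True, overwrite an existing file at ``file_path``.
        """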

Comment on lines 802 to 810
def fill_buffer():
bytes_to_read = max(0, self._config.multipart_upload_chunk_size - len(buffer))
if bytes_to_read > 0:
next_buf = input_stream.read(bytes_to_read)
new_buffer = buffer + next_buf
return new_buffer
else:
# we have already buffered enough data
return buffer

I think this function is a little confusing, as it looks functional and procedural at the same time. I think it would help to declare it as a standalone static function that returns the next buffer. Something like this:

def _read_next_buffer(
    input_stream: BinaryIO,
    previous_buffer: bytes,
    chunk_size: int = Config.MULTIPART_UPLOAD_CHUNK_SIZE,
) -> bytes:
    """
    Returns the next buffer to be processed.

    The size of the next buffer can be more or less than the given chunk_size. If it is
    less, it means that all of the input_stream has been read.

    The function avoids reading new bytes from the input_stream if previous_buffer
    already contains chunk_size bytes to be read.
    """

    bytes_to_read = max(0, chunk_size - len(previous_buffer))

    if bytes_to_read <= 0:
        return previous_buffer  # buffer already contains a full chunk

    next_bytes = input_stream.read(bytes_to_read)
    new_buffer = previous_buffer + next_bytes
    return new_buffer

All that being said, I think it would be better to abstract all of this into a class that hides the buffering logic and exposes a simple interface for reading the next bytes. This can come as a follow-up to this PR, though.
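
A rough sketch of what such a wrapper could look like (the class and method names are hypothetical, not part of this PR):

    from typing import BinaryIO

    class _ChunkedStreamReader:
        """Wraps a (possibly non-seekable) input stream and serves it back in chunks."""

        def __init__(self, input_stream: BinaryIO, chunk_size: int, pre_read: bytes = b""):
            self._stream = input_stream
            self._chunk_size = chunk_size
            self._buffer = pre_read  # bytes already consumed from the stream, e.g. the single-shot pre-read

        def next_chunk(self) -> bytes:
            """Return up to chunk_size bytes; an empty result means the stream is exhausted."""
            while len(self._buffer) < self._chunk_size:
                data = self._stream.read(self._chunk_size - len(self._buffer))
                if not data:
                    break  # end of stream
                self._buffer += data
            chunk, self._buffer = self._buffer[: self._chunk_size], self._buffer[self._chunk_size :]
            return chunk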

# no need to check response status, _api.do() will throw exception on failure

upload_part_urls = upload_part_urls_response.get("upload_part_urls", [])
if not len(upload_part_urls):

Let's aim for clarity for developers who are not used to Python idioms.

Suggested change
if not len(upload_part_urls):
if len(upload_part_urls) == 0:

for upload_part_url in upload_part_urls:
buffer = fill_buffer()
actual_buffer_length = len(buffer)
if not actual_buffer_length:

Same as above: while maybe more "pythonic", this adds ambiguity.

@renaudhartert-db renaudhartert-db self-requested a review March 5, 2025 11:18

@renaudhartert-db renaudhartert-db left a comment


LGTM. I think we'll have to do a little more work on the code style (e.g. use types that are compatible with Python 3.7), but this should not block getting customer feedback on this experimental functionality. Test coverage is good, and I'm confident that the code does what it was designed to do.
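
On the Python 3.7 point: that mostly means sticking to the typing module's generics rather than the newer built-in generic and union syntax, for example (the helper name below is hypothetical):

    from typing import Dict, List, Optional

    # Compatible with Python 3.7:
    def _get_part_urls(response: Dict[str, List[str]]) -> Optional[List[str]]:
        return response.get("upload_part_urls")

    # Requires Python 3.9+ / 3.10+ (PEP 585 built-in generics, PEP 604 unions):
    # def _get_part_urls(response: dict[str, list[str]]) -> list[str] | None: ...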

auto-merge was automatically disabled March 5, 2025 13:09

Head branch was pushed to by a user without write access


github-actions bot commented Mar 5, 2025

If integration tests don't run automatically, an authorized user can run them manually by following the instructions below:

Trigger:
go/deco-tests-run/sdk-py

Inputs:

  • PR number: 902
  • Commit SHA: b4d39cc0f13c89a3ea2e51c4b3b60bec90c1466a

Checks will be approved automatically on success.

@renaudhartert-db renaudhartert-db added this pull request to the merge queue Mar 5, 2025
Merged via the queue into databricks:main with commit 7ca3fb7 Mar 5, 2025
18 checks passed