
Add support for Google Cloud Storage #262


Open · wants to merge 11 commits into main
10 changes: 6 additions & 4 deletions .github/workflows/tests-e2e.yml
@@ -17,7 +8 @@ jobs:
runs-on: arc-runner-set
strategy:
matrix:
python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ]
python-version: [ "3.9", "3.13" ]
env_target: [ "AZURE", "GCP" ]
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -35,11 +36,12 @@
- name: Run tests
timeout-minutes: 30
env:
NEPTUNE_API_TOKEN: ${{ secrets.E2E_API_TOKEN }}
NEPTUNE_PROJECT: ${{ secrets.E2E_PROJECT }}
NEPTUNE_E2E_PROJECT: ${{ secrets.E2E_PROJECT }}
NEPTUNE_API_TOKEN: ${{ secrets[format('E2E_API_TOKEN_{0}', matrix.env_target)] }}
NEPTUNE_PROJECT: ${{ secrets[format('E2E_PROJECT_{0}', matrix.env_target)] }}
NEPTUNE_E2E_PROJECT: ${{ secrets[format('E2E_PROJECT_{0}', matrix.env_target)] }}
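# With matrix.env_target set to AZURE or GCP, the expressions above resolve to secrets such as E2E_API_TOKEN_AZURE or E2E_PROJECT_GCP.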
NEPTUNE_E2E_CUSTOM_RUN_ID: ${{ vars.E2E_CUSTOM_RUN_ID }}
NEPTUNE_FILE_API_ENABLED: ${{ vars.NEPTUNE_FILE_API_ENABLED }}
NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE: true
Member


Is this temporary or supposed to stay?

Contributor Author


This is temporary until the test instance is finalized

run: pytest --junitxml="test-results/test-e2e-${{ matrix.python-version }}.xml" tests/e2e

- name: Upload test reports
2 changes: 2 additions & 0 deletions dev_requirements.txt
@@ -6,5 +6,7 @@ pre-commit
pytest
pytest-timeout
pytest-xdist
pytest-asyncio
freezegun
numpy
neptune-api @ git+https://github.com/neptune-ai/neptune-api.git@dev/storage-v2
Member


Reminder: to be removed.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -100,6 +100,8 @@ warn_unused_ignores = "True"
ignore_missing_imports = "True"

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
addopts = "--doctest-modules -n auto"

[tool.poetry.scripts]
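The asyncio_mode = "auto" setting above lets pytest-asyncio collect and run async test functions without an explicit @pytest.mark.asyncio marker. A minimal sketch of a test relying on this (the helper is imported from the new module added in this PR; the scenario itself is illustrative):

import asyncio

import httpx

from neptune_scale.sync.google_storage import _is_retryable_httpx_error


async def test_retryable_error_detection():
    # Runs on the pytest-asyncio event loop because of asyncio_mode = "auto".
    await asyncio.sleep(0)
    request = httpx.Request("GET", "https://storage.googleapis.com/")
    response = httpx.Response(503, request=request)
    error = httpx.HTTPStatusError("server error", request=request, response=response)
    assert _is_retryable_httpx_error(error)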
4 changes: 2 additions & 2 deletions src/neptune_scale/net/api_client.py
@@ -43,7 +43,7 @@
bulk_check_status,
ingest,
)
from neptune_api.api.storage import signed_url
from neptune_api.api.storage import signed_url_generic
from neptune_api.auth_helpers import exchange_api_key
from neptune_api.credentials import Credentials
from neptune_api.errors import (
@@ -228,7 +228,7 @@ def fetch_file_storage_urls(
mode: Literal["read", "write"],
) -> Response[CreateSignedUrlsResponse]:
permission = Permission(mode)
return signed_url.sync_detailed(
return signed_url_generic.sync_detailed(
client=self.backend,
body=CreateSignedUrlsRequest(
files=[FileToSign(path=path, project_identifier=project, permission=permission) for path in paths],
191 changes: 191 additions & 0 deletions src/neptune_scale/sync/google_storage.py
@@ -0,0 +1,191 @@
import asyncio
import random
from pathlib import Path

import backoff
import httpx
from httpx import AsyncClient

from neptune_scale.exceptions import NeptuneFileUploadTemporaryError
from neptune_scale.sync.parameters import (
HTTP_CLIENT_NETWORKING_TIMEOUT,
HTTP_REQUEST_MAX_TIME_SECONDS,
)
from neptune_scale.util import get_logger

__all__ = ["upload_to_gcp"]

logger = get_logger()

# Maximum number of retries for chunk upload. Note that this applies only to the actual data chunks.
# Fetching the upload session and querying the resume position are retried separately using @backoff.
MAX_RETRIES = 6


async def upload_to_gcp(file_path: str, content_type: str, signed_url: str, chunk_size: int = 16 * 1024 * 1024) -> None:
"""
Upload a file to Google Cloud Storage using a signed URL. The upload is performed in chunks and is resumed
if an individual chunk upload fails.

Raises NeptuneFileUploadTemporaryError on retryable errors; any non-retryable exception is re-raised as-is.

chunk_size must be a multiple of 256 KiB (256 * 1024 bytes), as required by GCS.
"""

logger.debug(f"Starting upload to GCS: {file_path}")

try:
async with AsyncClient(timeout=httpx.Timeout(timeout=HTTP_CLIENT_NETWORKING_TIMEOUT)) as client:
session_uri = await _fetch_session_uri(client, signed_url, content_type)

file_size = Path(file_path).stat().st_size
if file_size == 0:
await _upload_empty_file(client, session_uri)
return

await _upload_file(client, session_uri, file_path, file_size, chunk_size)
except httpx.RequestError as e:
raise NeptuneFileUploadTemporaryError() from e
except httpx.HTTPStatusError as e:
# Internal server errors (5xx) are temporary
if e.response.status_code // 100 == 5:
raise NeptuneFileUploadTemporaryError() from e
else:
raise

logger.debug(f"Finished upload to GCS: {file_path}")


async def _upload_file(client: AsyncClient, session_uri: str, file_path: str, file_size: int, chunk_size: int) -> None:
file_position = 0
num_retries = 0

with open(file_path, "rb") as file:
while file_position < file_size:
chunk = file.read(chunk_size)
if not chunk:
raise Exception("File truncated during upload")

try:
await _upload_chunk(client, session_uri, chunk, file_position, file_size)
file_position += len(chunk)
num_retries = 0
logger.debug(f"{file_position}/{file_size} bytes uploaded.")
except Exception as e:
logger.debug(f"Error uploading chunk: {e}")

# HTTP status errors that are not 5xx should not be retried
if isinstance(e, httpx.HTTPStatusError) and e.response.status_code // 100 != 5:
raise

num_retries += 1
if num_retries > MAX_RETRIES:
raise Exception("Max retries reached while uploading file to GCS") from e

# Retry after exponential backoff with jitter
sleep_time = (2**num_retries) + random.randint(0, 1000) / 1000.0
await asyncio.sleep(sleep_time)

file_position = await _query_resume_position(client, session_uri, file_size)
if file_position >= file_size:
break

file.seek(file_position)
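# Note: with MAX_RETRIES = 6, the sleep between consecutive chunk retries grows as roughly 2, 4, 8, 16,
# 32 and 64 seconds (each plus up to 1 second of jitter) before the upload is abandoned.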


def _is_retryable_httpx_error(exc: Exception) -> bool:
"""Used in @backoff.on_predicate to determine if an error is retryable. All network-related errors
and HTTP 500 errors are considered retryable."""
if isinstance(exc, httpx.RequestError):
return True
if isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code // 100 == 5:
return True
return False


# Retry on network errors and 5xx responses; give up immediately on other HTTP status errors.
@backoff.on_exception(backoff.expo, (httpx.RequestError, httpx.HTTPStatusError), giveup=lambda e: not _is_retryable_httpx_error(e), max_time=HTTP_REQUEST_MAX_TIME_SECONDS)
async def _fetch_session_uri(client: AsyncClient, signed_url: str, content_type: str) -> str:
"""
Use the signed url provided by Neptune API to start a resumable upload session.
The actual data upload will use the returned session URI.

See https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
"""
headers = {
"X-goog-resumable": "start",
# Google docs say that X-Upload-Content-Type can be provided, but for some reason
# it doesn't work; Content-Type works just fine.
"Content-Type": content_type,
"Host": "storage.googleapis.com",
}

response = await client.post(signed_url, headers=headers)
response.raise_for_status()

session_uri = response.headers.get("Location")
if session_uri is None:
raise ValueError("Session URI not found in response headers")

return str(session_uri)


async def _upload_chunk(client: AsyncClient, session_uri: str, chunk: bytes, start: int, file_size: int) -> None:
end = start + len(chunk) - 1 # -1 because Content-Range represents an inclusive range
headers = {
"Content-Length": str(len(chunk)),
"Content-Range": f"bytes {start}-{end}/{file_size}",
}

response = await client.put(session_uri, headers=headers, content=chunk)

if response.status_code in (308, 200, 201):
# 200 or 201 -> the upload is complete
# 308 -> chunk was saved: https://cloud.google.com/storage/docs/json_api/v1/status-codes#308_Resume_Incomplete
return

response.raise_for_status()
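# Worked example of the Content-Range arithmetic above: a 10 MiB file (10485760 bytes) uploaded in
# 4 MiB chunks produces three requests with the headers
# "bytes 0-4194303/10485760", "bytes 4194304-8388607/10485760" and "bytes 8388608-10485759/10485760".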


@backoff.on_exception(backoff.expo, (httpx.RequestError, httpx.HTTPStatusError), giveup=lambda e: not _is_retryable_httpx_error(e), max_time=HTTP_REQUEST_MAX_TIME_SECONDS)
async def _query_resume_position(client: AsyncClient, session_uri: str, file_size: int) -> int:
"""
Query Google Cloud Storage for the current upload position. If the upload is already complete, return a value
larger than file_size.

A request might have been processed by GCS correctly even though, due to a network issue, we never
received the response -- so we always query the current position after a chunk upload error.
"""

headers = {
"Content-Range": f"bytes */{file_size}",
"Content-Length": "0",
}

response = await client.put(session_uri, headers=headers)
# 2xx - upload already completed
if response.status_code // 100 == 2:
return file_size + 1
elif response.status_code == 308:
range_header = response.headers.get("Range")
if not range_header:
return 0 # Nothing uploaded yet

if range_header.startswith("bytes=0-"):
# Range header is 'bytes=0-LAST_BYTE_UPLOADED'
return int(range_header.split("-")[1]) + 1 # +1 to resume from the next byte
else:
raise ValueError(f"Unexpected Range header format received from server: {range_header}")
else:
response.raise_for_status()
return -1 # keep mypy happy, the above line will always raise because status code is not 2xx
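# Worked example: a 308 response with "Range: bytes=0-8388607" means GCS has persisted the first
# 8388608 bytes, so this function returns 8388608 and the caller resumes the upload from that offset.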


@backoff.on_exception(backoff.expo, httpx.RequestError, max_time=HTTP_REQUEST_MAX_TIME_SECONDS)
async def _upload_empty_file(client: AsyncClient, session_uri: str) -> None:
headers = {
"Content-Range": "bytes */0",
"Content-Length": "0",
}
response = await client.put(session_uri, headers=headers)
response.raise_for_status()
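For reference, a minimal sketch of exercising this helper on its own, assuming a write-mode signed URL has already been obtained from the Neptune API (the URL and file name below are placeholders):

import asyncio

from neptune_scale.sync.google_storage import upload_to_gcp

signed_url = "https://storage.googleapis.com/example-bucket/example-object?X-Goog-Signature=..."  # placeholder
asyncio.run(upload_to_gcp("checkpoint.ckpt", "application/octet-stream", signed_url))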
33 changes: 24 additions & 9 deletions src/neptune_scale/sync/sync_process.py
@@ -7,7 +7,9 @@
import azure.core.exceptions
from azure.core.pipeline.transport import AsyncioRequestsTransport
from azure.storage.blob.aio import BlobClient
from neptune_api.models import Provider

from neptune_scale.sync.google_storage import upload_to_gcp
from neptune_scale.sync.operations_repository import (
FileUploadRequest,
Metadata,
@@ -633,7 +635,11 @@ def work(self) -> None:
# Note that self._upload_file() should not raise exceptions, as they are handled
# in the method itself. However, we still pass return_exceptions=True to make sure asyncio.gather()
# waits for all the tasks to finish regardless of any exceptions.
tasks = [self._upload_file(file, storage_urls[file.destination]) for file in file_upload_requests]
tasks = []
for file in file_upload_requests:
provider, storage_url = storage_urls[file.destination]
tasks.append(self._upload_file(file, provider, storage_url))

self._aio_loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))

logger.debug("Upload tasks completed for the current batch")
@@ -645,9 +651,15 @@ def work(self) -> None:
self.interrupt()
raise NeptuneSynchronizationStopped() from e

async def _upload_file(self, file: FileUploadRequest, storage_url: str) -> None:
async def _upload_file(self, file: FileUploadRequest, provider: str, storage_url: str) -> None:
try:
await upload_file(file.source_path, file.mime_type, storage_url)
if provider == Provider.AZURE:
await upload_to_azure(file.source_path, file.mime_type, storage_url)
elif provider == Provider.GCP:
await upload_to_gcp(file.source_path, file.mime_type, storage_url)
else:
raise NeptuneUnexpectedError(f"Unsupported file storage provider: {provider}")

if file.is_temporary:
logger.debug(f"Removing temporary file {file.source_path}")
Path(file.source_path).unlink(missing_ok=True)
@@ -671,8 +683,11 @@ def close(self) -> None:

@backoff.on_exception(backoff.expo, NeptuneRetryableError, max_time=HTTP_REQUEST_MAX_TIME_SECONDS)
@with_api_errors_handling
def fetch_file_storage_urls(client: ApiClient, project: str, destination_paths: list[str]) -> dict[str, str]:
"""Fetch Azure urls for storing files. Return a dict of target_path -> upload url"""
def fetch_file_storage_urls(
client: ApiClient, project: str, destination_paths: list[str]
) -> dict[str, tuple[str, str]]:
"""Fetch signed URLs for storing files. Returns a dict of target_path -> (provider, upload url)."""

logger.debug("Fetching file storage urls")
response = client.fetch_file_storage_urls(paths=destination_paths, project=project, mode="write")
status_code = response.status_code
@@ -682,11 +697,11 @@ def fetch_file_storage_urls(client: ApiClient, project: str, destination_paths:
if response.parsed is None:
raise NeptuneUnexpectedResponseError("Server response is empty")

return {file.path: file.url for file in response.parsed.files}
return {file.path: (file.provider, file.url) for file in response.parsed.files or []}
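# Example of the returned mapping (values are illustrative):
# {"checkpoints/model.pt": (Provider.GCP, "https://storage.googleapis.com/...signed-url...")}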


async def upload_file(local_path: str, mime_type: str, storage_url: str) -> None:
logger.debug(f"Start: upload file {local_path}")
async def upload_to_azure(local_path: str, mime_type: str, storage_url: str) -> None:
logger.debug(f"Starting upload to Azure: {local_path}")

try:
size_bytes = Path(local_path).stat().st_size
@@ -708,4 +723,4 @@ def upload_file(local_path: str, mime_type: str, storage_url: str) -> None:
except Exception as e:
raise e

logger.debug(f"Done: upload file {local_path}")
logger.debug(f"Finished upload to Azure: {local_path}")
15 changes: 7 additions & 8 deletions tests/e2e/test_fetcher/files.py
@@ -19,8 +19,8 @@
Optional,
)

from azure.storage.blob import BlobClient
from neptune_api.api.storage import signed_url
import requests
from neptune_api.api.storage import signed_url_generic
from neptune_api.client import AuthenticatedClient
from neptune_api.models import (
CreateSignedUrlsRequest,
@@ -126,7 +126,7 @@ def _fetch_signed_urls(
]
)

response = signed_url.sync_detailed(client=client, body=body)
response = signed_url_generic.sync_detailed(client=client, body=body)

data: CreateSignedUrlsResponse = response.parsed

@@ -138,8 +138,7 @@ def _download_from_url(
target_path: pathlib.Path,
) -> None:
target_path.parent.mkdir(parents=True, exist_ok=True)
with open(target_path, mode="wb") as opened:
blob_client = BlobClient.from_blob_url(signed_url)
download_stream = blob_client.download_blob()
for chunk in download_stream.chunks():
opened.write(chunk)
with open(target_path, mode="wb") as file:
with requests.get(signed_url, stream=True) as response:
response.raise_for_status()
for chunk in response.iter_content(chunk_size=1024 * 1024 * 4):
file.write(chunk)