Skip to content

Add support for Google Cloud Storage #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
10 changes: 6 additions & 4 deletions .github/workflows/tests-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ jobs:
runs-on: arc-runner-set
strategy:
matrix:
python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ]
python-version: [ "3.9", "3.13" ]
env_target: [ "AZURE", "GCP" ]
steps:
- name: Checkout repository
uses: actions/checkout@v4
Expand All @@ -35,11 +36,12 @@ jobs:
- name: Run tests
timeout-minutes: 30
env:
NEPTUNE_API_TOKEN: ${{ secrets.E2E_API_TOKEN }}
NEPTUNE_PROJECT: ${{ secrets.E2E_PROJECT }}
NEPTUNE_E2E_PROJECT: ${{ secrets.E2E_PROJECT }}
NEPTUNE_API_TOKEN: ${{ secrets[format('E2E_API_TOKEN_{0}', matrix.env_target)] }}
NEPTUNE_PROJECT: ${{ secrets[format('E2E_PROJECT_{0}', matrix.env_target)] }}
NEPTUNE_E2E_PROJECT: ${{ secrets[format('E2E_PROJECT_{0}', matrix.env_target)] }}
NEPTUNE_E2E_CUSTOM_RUN_ID: ${{ vars.E2E_CUSTOM_RUN_ID }}
NEPTUNE_FILE_API_ENABLED: ${{ vars.NEPTUNE_FILE_API_ENABLED }}
NEPTUNE_ALLOW_SELF_SIGNED_CERTIFICATE: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this temporary or supposed to stay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is temporary until the test instance is finalized

run: pytest --junitxml="test-results/test-e2e-${{ matrix.python-version }}.xml" tests/e2e

- name: Upload test reports
Expand Down
2 changes: 2 additions & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ pre-commit
pytest
pytest-timeout
pytest-xdist
pytest-asyncio
freezegun
numpy
neptune-api @ git+https://github.com/neptune-ai/neptune-api.git@dev/storage-v2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: to be removed.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tqdm = "^4.21.0"
filetype = "^1.2.0"
azure-storage-blob = "^12.7.0"
requests = ">=2.21.0"
aiofiles = ">=22.1.0"

[tool.poetry]
name = "neptune-scale"
Expand Down Expand Up @@ -100,6 +101,8 @@ warn_unused_ignores = "True"
ignore_missing_imports = "True"

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
addopts = "--doctest-modules -n auto"

[tool.poetry.scripts]
Expand Down
31 changes: 4 additions & 27 deletions src/neptune_scale/net/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
#
from __future__ import annotations

__all__ = ("HostedApiClient", "ApiClient", "backend_factory", "with_api_errors_handling")
__all__ = ("ApiClient", "with_api_errors_handling")

import abc
import functools
import os
from collections.abc import (
Expand All @@ -43,7 +42,7 @@
bulk_check_status,
ingest,
)
from neptune_api.api.storage import signed_url
from neptune_api.api.storage import signed_url_generic
from neptune_api.auth_helpers import exchange_api_key
from neptune_api.credentials import Credentials
from neptune_api.errors import (
Expand Down Expand Up @@ -169,25 +168,7 @@ def sanitize(value: Callable[[], str]) -> str:
return f"{package_name}/{package_version} ({additional_metadata_str})"


class ApiClient(abc.ABC):
@abc.abstractmethod
def submit(self, operation: RunOperation, family: str) -> Response[BinaryContent]: ...

@abc.abstractmethod
def check_batch(self, request_ids: list[str], project: str) -> Response[BinaryContent]: ...

def close(self) -> None: ...

@abc.abstractmethod
def fetch_file_storage_urls(
self,
paths: Iterable[str],
project: str,
mode: Literal["read", "write"],
) -> Response[CreateSignedUrlsResponse]: ...


class HostedApiClient(ApiClient):
class ApiClient:
def __init__(self, api_token: str) -> None:
credentials = Credentials.from_api_key(api_key=api_token)

Expand Down Expand Up @@ -228,18 +209,14 @@ def fetch_file_storage_urls(
mode: Literal["read", "write"],
) -> Response[CreateSignedUrlsResponse]:
permission = Permission(mode)
return signed_url.sync_detailed(
return signed_url_generic.sync_detailed(
client=self.backend,
body=CreateSignedUrlsRequest(
files=[FileToSign(path=path, project_identifier=project, permission=permission) for path in paths],
),
)


def backend_factory(api_token: str) -> ApiClient:
return HostedApiClient(api_token=api_token)


def with_api_errors_handling(func: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
Expand Down
6 changes: 3 additions & 3 deletions src/neptune_scale/net/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
NeptuneProjectAlreadyExists,
)
from neptune_scale.net.api_client import (
HostedApiClient,
ApiClient,
with_api_errors_handling,
)
from neptune_scale.util.envs import API_TOKEN_ENV_NAME
Expand Down Expand Up @@ -54,7 +54,7 @@ def create_project(
) -> None:
api_token = _get_api_token(api_token)

client = HostedApiClient(api_token=api_token)
client = ApiClient(api_token=api_token)
visibility = ProjectVisibility(visibility)

body = {
Expand Down Expand Up @@ -92,7 +92,7 @@ def _safe_json(response: httpx.Response) -> Any:


def get_project_list(*, api_token: Optional[str] = None) -> list[dict]:
client = HostedApiClient(api_token=_get_api_token(api_token))
client = ApiClient(api_token=_get_api_token(api_token))

params = {
"userRelation": "viewerOrHigher",
Expand Down
165 changes: 165 additions & 0 deletions src/neptune_scale/sync/google_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from pathlib import Path

import aiofiles
import backoff
import httpx
from httpx import AsyncClient

from neptune_scale.exceptions import NeptuneFileUploadTemporaryError
from neptune_scale.sync.parameters import (
HTTP_CLIENT_NETWORKING_TIMEOUT,
HTTP_REQUEST_MAX_TIME_SECONDS,
)
from neptune_scale.util import get_logger

__all__ = ["upload_to_gcp"]

logger = get_logger()


async def upload_to_gcp(file_path: str, content_type: str, signed_url: str, chunk_size: int = 16 * 1024 * 1024) -> None:
    """
    Upload a file to Google Cloud Storage using a signed URL. The upload is done in chunks, and resumed in
    case of a failure of a specific chunk upload.

    Args:
        file_path: path to the local file to upload.
        content_type: MIME type sent when initiating the resumable upload session.
        signed_url: signed URL provided by the Neptune API, used to start the session.
        chunk_size: size of each uploaded chunk; must be a multiple of 256 KiB
            (256 * 1024 bytes) — a GCS requirement for resumable uploads.

    Raises NeptuneFileUploadTemporaryError if a retryable error happens, otherwise any other non-retryable
    exception that occurs. Raises ValueError if chunk_size violates the GCS alignment requirement.
    """
    # Fail fast instead of letting GCS reject mid-upload with an opaque 400.
    if chunk_size % (256 * 1024) != 0:
        raise ValueError("chunk_size must be a multiple of 256 KiB (256 * 1024 bytes)")

    logger.debug(f"Starting upload to GCS: {file_path}, {content_type=}, {chunk_size=}")

    try:
        async with AsyncClient(timeout=httpx.Timeout(timeout=HTTP_CLIENT_NETWORKING_TIMEOUT)) as client:
            session_uri = await _fetch_session_uri(client, signed_url, content_type)

            file_size = Path(file_path).stat().st_size
            if file_size == 0:
                # Empty objects need a dedicated finalization request ("bytes */0").
                await _upload_empty_file(client, session_uri)
            else:
                await _upload_file(client, session_uri, file_path, file_size, chunk_size)
    except httpx.RequestError as e:
        logger.debug(f"Temporary error while uploading {file_path}: {e}")
        raise NeptuneFileUploadTemporaryError() from e
    except httpx.HTTPStatusError as e:
        logger.debug(f"HTTP {e.response.status_code} error while uploading {file_path}: {e}, {e.response.content=!r}")
        if _is_retryable_httpx_error(e):
            raise NeptuneFileUploadTemporaryError() from e
        else:
            raise

    # Previously an early `return` on the empty-file path skipped this log line;
    # now every successful upload is logged.
    logger.debug(f"Finished upload to GCS: {file_path}")


async def _upload_file(client: AsyncClient, session_uri: str, file_path: str, file_size: int, chunk_size: int) -> None:
    """Stream the file to the resumable session URI chunk by chunk.

    After each chunk the server reports how many bytes it has actually persisted.
    If that lags behind what we sent, we rewind the file to the confirmed
    position and resume from there.
    """
    async with aiofiles.open(file_path, "rb") as file:
        file_position = 0
        while file_position < file_size:
            chunk = await file.read(chunk_size)
            if not chunk:
                # EOF before reaching the size we measured up front.
                raise Exception("File truncated during upload")

            upload_position = await _upload_chunk(client, session_uri, chunk, file_position, file_size)
            file_position += len(chunk)

            # The server may confirm fewer bytes than we uploaded; fall back to
            # the position it reports and continue from there.
            if file_position != upload_position:
                logger.debug(
                    f"Server returned a different upload position: {file_position=}, {upload_position=}. "
                    f"Resuming from {upload_position}."
                )
                await file.seek(upload_position)
                file_position = upload_position

            logger.debug(f"{file_position}/{file_size} bytes uploaded.")


def _is_retryable_httpx_error(exc: Exception) -> bool:
    """Decide whether an httpx error should be retried.

    Retryable cases:
      - any network-related error (httpx.RequestError)
      - HTTP 5xx server errors
      - HTTP 429 Too Many Requests
      - HTTP 400 Bad Request, which covers errors related to signed URLs, see:
        https://cloud.google.com/storage/docs/xml-api/reference-status#400%E2%80%94bad
    """
    if isinstance(exc, httpx.RequestError):
        return True
    if not isinstance(exc, httpx.HTTPStatusError):
        return False

    code = exc.response.status_code
    return code in (400, 429) or 500 <= code <= 599


# NOTE: this previously used backoff.on_predicate, which inspects the *return value*
# (a str here) — exceptions raised inside were never retried by the decorator.
# on_exception with a giveup predicate retries retryable httpx errors as intended.
@backoff.on_exception(
    backoff.expo,
    (httpx.RequestError, httpx.HTTPStatusError),
    giveup=lambda exc: not _is_retryable_httpx_error(exc),
    max_time=HTTP_REQUEST_MAX_TIME_SECONDS,
)
async def _fetch_session_uri(client: AsyncClient, signed_url: str, content_type: str) -> str:
    """
    Use the signed url provided by Neptune API to start a resumable upload session.
    The actual data upload will use the returned session URI.

    See https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload

    Raises httpx.HTTPStatusError on a non-success response, and ValueError if the
    server does not return a session URI in the Location header.
    """
    headers = {
        "X-goog-resumable": "start",
        # Google docs say that X-Upload-Content-Type should be provided to specify the content type of the file,
        # but it does not work. Setting Content-Type header does work.
        "Content-Type": content_type,
    }

    response = await client.post(signed_url, headers=headers)
    response.raise_for_status()

    session_uri = response.headers.get("Location")
    if session_uri is None:
        raise ValueError("Session URI not found in response headers")

    return str(session_uri)


# NOTE: this previously used backoff.on_predicate, which inspects the *return value*
# (an int here) — exceptions raised inside were never retried by the decorator.
# on_exception with a giveup predicate retries retryable httpx errors as intended.
@backoff.on_exception(
    backoff.expo,
    (httpx.RequestError, httpx.HTTPStatusError),
    giveup=lambda exc: not _is_retryable_httpx_error(exc),
    max_time=HTTP_REQUEST_MAX_TIME_SECONDS,
)
async def _upload_chunk(client: AsyncClient, session_uri: str, chunk: bytes, start: int, file_size: int) -> int:
    """Upload a chunk of data indicating the start-end position and total size. Returns the total number of bytes
    already uploaded to the server in a given session URI.

    Note that the returned value could be smaller than the number of bytes uploaded so far, so we always need
    to use the returned position to determine start position of the next chunk.

    Raises httpx.HTTPStatusError on any response that is not 2xx or 308, and ValueError
    if the server returns a Range header in an unexpected format.
    """

    end = start + len(chunk) - 1  # -1 because Content-Range represents an inclusive range
    headers = {
        "Content-Length": str(len(chunk)),
        "Content-Range": f"bytes {start}-{end}/{file_size}",
    }

    response = await client.put(session_uri, headers=headers, content=chunk)

    # 308 -> chunk was saved: https://cloud.google.com/storage/docs/json_api/v1/status-codes#308_Resume_Incomplete
    if response.status_code == 308:
        range_header = response.headers.get("Range")
        # Nothing uploaded yet
        if range_header is None:
            return 0
        elif range_header.startswith("bytes=0-"):
            # Range header is 'bytes=0-LAST_BYTE_UPLOADED'. LAST_BYTE_UPLOADED is inclusive, so we need to add 1.
            return int(range_header.split("-")[1]) + 1
        else:
            raise ValueError(f"Unexpected Range header format received from server: `{range_header}`")
    # 2xx -> the upload is complete
    elif response.status_code // 100 == 2:
        return file_size

    response.raise_for_status()
    return -1  # keep mypy happy, the above line will always raise because status code is not 2xx or 308


@backoff.on_exception(backoff.expo, httpx.RequestError, max_time=HTTP_REQUEST_MAX_TIME_SECONDS)
async def _upload_empty_file(client: AsyncClient, session_uri: str) -> None:
    """Finalize a resumable session for a zero-byte file.

    GCS requires a single PUT with "Content-Range: bytes */0" to create an empty
    object; network errors are retried with exponential backoff.
    """
    response = await client.put(
        session_uri,
        headers={
            "Content-Range": "bytes */0",
            "Content-Length": "0",
        },
    )
    response.raise_for_status()
Loading
Loading