Skip to content

File upload support #103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b41bcb1
Rename `InternalQueueFeederThread`
kgodlewski Dec 6, 2024
bc10ae6
Add `.sync.files` subpackage
kgodlewski Dec 12, 2024
246cb2e
`verify_max_length` now accepts `bytes`
kgodlewski Dec 12, 2024
7c07ce7
Add `Run.log_file()`
kgodlewski Dec 12, 2024
12628b4
Support file uploads in dict API
kgodlewski Dec 12, 2024
8b21657
Improve how we detect unsupported kwargs in dict API
kgodlewski Dec 12, 2024
4d7c125
`SharedVar.wait*()` now return `bool`
kgodlewski Dec 13, 2024
b2c8458
Add very basic tests for file uploads
kgodlewski Dec 13, 2024
e8bde2b
Add `util.arg_to_datetime()`
kgodlewski Dec 17, 2024
ee68611
Add `ApiClient.fetch_file_storage_info()`
kgodlewski Dec 17, 2024
97e6ebc
Move `code_to_exception()` to a new module
kgodlewski Dec 17, 2024
d2f1235
Add timestamp to file upload queue message
kgodlewski Dec 17, 2024
1f46949
File upload thread now submits the upload result
kgodlewski Dec 17, 2024
2dc1f87
Add `NEPTUNE_LOG_TRACEBACKS` env variable.
kgodlewski Dec 17, 2024
bac7729
Decrement active upload count on submit error
kgodlewski Dec 17, 2024
a69d10e
Update file upload tests
kgodlewski Dec 17, 2024
125a584
Use `with` when opening a file for upload
kgodlewski Dec 17, 2024
eda4ca8
Make `SharedVar` tests wait a bit longer in case of slow process startup
kgodlewski Dec 17, 2024
ad2b98f
Make `NeptuneScaleError.message` an instance attribute
kgodlewski Dec 17, 2024
0be4edb
Update file upload tests
kgodlewski Dec 17, 2024
c88f662
Add more tests for file uploads
kgodlewski Dec 17, 2024
6261cde
Python 3.9 typehints fixes
kgodlewski Dec 18, 2024
79f8dec
Fix a test on Windows
kgodlewski Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 107 additions & 11 deletions src/neptune_scale/api/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Iterator,
)
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Callable,
Expand All @@ -14,23 +15,49 @@
cast,
)

from neptune_scale.api.validation import (
verify_max_length,
verify_non_empty,
verify_type,
)
from neptune_scale.sync.files.queue import FileUploadQueue
from neptune_scale.sync.metadata_splitter import MetadataSplitter
from neptune_scale.sync.operations_queue import OperationsQueue
from neptune_scale.sync.parameters import MAX_FILE_UPLOAD_BUFFER_SIZE
from neptune_scale.sync.util import arg_to_datetime

__all__ = ("Attribute", "AttributeStore")


def warn_unsupported_params(fn: Callable) -> Callable:
# Perform some simple heuristics to detect if a method is called with parameters
# that are not supported by Scale
def _extract_named_kwargs(fn: Callable) -> set[str]:
"""Return a set of named arguments of a function, that are not positional-only."""
import inspect

sig = inspect.signature(fn)
kwargs = {
p.name
for p in sig.parameters.values()
if p.kind in {inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY}
}

return kwargs


def warn_unsupported_kwargs(fn: Callable) -> Callable:
"""Perform some simple heuristics to detect if a method is called with parameters that are not supported by
Scale. Some methods in the old client accepted a **kwargs argument, which we currently do not inspect in any
way, so it's important to notify the user that an argument is being ignored.
"""

warn = functools.partial(warnings.warn, stacklevel=3)
known_kwargs = _extract_named_kwargs(fn)

@functools.wraps(fn)
def wrapper(*args, **kwargs): # type: ignore
if kwargs.get("wait") is not None:
warn("The `wait` parameter is not yet implemented and will be ignored.")

extra_kwargs = set(kwargs.keys()) - {"wait", "step", "timestamp", "steps", "timestamps"}
extra_kwargs = set(kwargs.keys()) - known_kwargs
if extra_kwargs:
warn(
f"`{fn.__name__}()` was called with additional keyword argument(s): `{', '.join(extra_kwargs)}`. "
Expand All @@ -54,11 +81,14 @@ class AttributeStore:
end consuming the queue (which would be SyncProcess).
"""

def __init__(self, project: str, run_id: str, operations_queue: OperationsQueue) -> None:
def __init__(
self, project: str, run_id: str, operations_queue: OperationsQueue, file_upload_queue: FileUploadQueue
) -> None:
self._project = project
self._run_id = run_id
self._operations_queue = operations_queue
self._attributes: dict[str, Attribute] = {}
self._file_upload_queue = file_upload_queue

def __getitem__(self, path: str) -> "Attribute":
path = cleanup_path(path)
Expand Down Expand Up @@ -92,7 +122,7 @@ def log(
project=self._project,
run_id=self._run_id,
step=step,
timestamp=timestamp,
timestamp=arg_to_datetime(timestamp),
configs=configs,
metrics=metrics,
add_tags=tags_add,
Expand All @@ -102,6 +132,24 @@ def log(
for operation, metadata_size in splitter:
self._operations_queue.enqueue(operation=operation, size=metadata_size, key=step)

def upload_file(
self,
attribute_path: str,
local_path: Optional[Path],
data: Optional[Union[str, bytes]],
target_basename: Optional[str],
target_path: Optional[str],
timestamp: Optional[Union[float, datetime]] = None,
) -> None:
self._file_upload_queue.submit(
timestamp=arg_to_datetime(timestamp),
attribute_path=attribute_path,
local_path=local_path,
data=data.encode("utf-8") if isinstance(data, str) else data,
target_basename=target_basename,
target_path=target_path,
)


class Attribute:
"""Objects of this class are returned on dict-like access to Run. Attributes have a path and
Expand All @@ -118,12 +166,12 @@ def __init__(self, store: AttributeStore, path: str) -> None:
self._path = path

# TODO: typehint value properly
@warn_unsupported_params
@warn_unsupported_kwargs
def assign(self, value: Any, *, wait: bool = False) -> None:
data = accumulate_dict_values(value, self._path)
self._store.log(configs=data)

@warn_unsupported_params
@warn_unsupported_kwargs
def append(
self,
value: Union[dict[str, Any], float],
Expand All @@ -136,23 +184,23 @@ def append(
data = accumulate_dict_values(value, self._path)
self._store.log(metrics=data, step=step, timestamp=timestamp)

@warn_unsupported_params
@warn_unsupported_kwargs
# TODO: this should be Iterable in Run as well
# def add(self, values: Union[str, Iterable[str]], *, wait: bool = False) -> None:
def add(self, values: Union[str, Union[list[str], set[str], tuple[str]]], *, wait: bool = False) -> None:
if isinstance(values, str):
values = (values,)
self._store.log(tags_add={self._path: values})

@warn_unsupported_params
@warn_unsupported_kwargs
# TODO: this should be Iterable in Run as well
# def remove(self, values: Union[str, Iterable[str]], *, wait: bool = False) -> None:
def remove(self, values: Union[str, Union[list[str], set[str], tuple[str]]], *, wait: bool = False) -> None:
if isinstance(values, str):
values = (values,)
self._store.log(tags_remove={self._path: values})

@warn_unsupported_params
@warn_unsupported_kwargs
def extend(
self,
values: Collection[Union[float, int]],
Expand All @@ -173,6 +221,54 @@ def extend(
for value, step, timestamp in zip(values, steps, timestamps):
self.append(value, step=step, timestamp=timestamp, wait=wait)

@warn_unsupported_kwargs
def upload(
self,
path: Optional[str] = None,
*,
data: Optional[Union[str, bytes]] = None,
mime_type: Optional[str] = None,
target_basename: Optional[str] = None,
target_path: Optional[str] = None,
timestamp: Optional[Union[float, datetime]] = None,
wait: bool = False,
) -> None:
verify_type("path", path, (str, type(None)))

if data is not None:
verify_type("data", data, (str, bytes, type(None)))
verify_max_length("data", data, MAX_FILE_UPLOAD_BUFFER_SIZE)

verify_type("mime_type", mime_type, (str, type(None)))
verify_type("target_basename", target_basename, (str, type(None)))
verify_type("target_path", target_path, (str, type(None)))

if path is None and data is None:
raise ValueError("Either `path` or `data` must be provided")

if path is not None and data is not None:
raise ValueError("Only one of `path` or `data` can be provided")

local_path: Optional[Path] = None
if path:
verify_non_empty("path", path)

local_path = Path(path)
if not local_path.exists():
raise FileNotFoundError(f"Path `{path}` does not exist")

if not local_path.is_file():
raise ValueError(f"Path `{path}` is not a file")

self._store.upload_file(
attribute_path=self._path,
local_path=local_path,
data=data.encode("utf-8") if isinstance(data, str) else data,
target_basename=target_basename,
target_path=target_path,
timestamp=timestamp,
)

# TODO: add value type validation to all the methods
# TODO: change Run API to typehint timestamp as Union[datetime, float]

Expand Down
Loading
Loading