Skip to content

Dev/minimal flow #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added minimal Run classes ([#6](https://github.com/neptune-ai/neptune-client-scale/pull/6))
- Added support for `max_queue_size` and `max_queue_size_exceeded_callback` parameters in `Run` ([#7](https://github.com/neptune-ai/neptune-client-scale/pull/7))
- Added support for logging metadata ([#8](https://github.com/neptune-ai/neptune-client-scale/pull/8))
- Added support for `creation_time` ([#9](https://github.com/neptune-ai/neptune-client-scale/pull/9))
- Added support for Forking ([#9](https://github.com/neptune-ai/neptune-client-scale/pull/9))
- Added support for Experiments ([#9](https://github.com/neptune-ai/neptune-client-scale/pull/9))
- Added support for Run resume ([#9](https://github.com/neptune-ai/neptune-client-scale/pull/9))
82 changes: 79 additions & 3 deletions src/neptune_scale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,21 @@
from datetime import datetime
from typing import Callable

from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import ForkPoint
from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import Run as CreateRun
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.api.api_client import ApiClient
from neptune_scale.core.components.abstract import (
Resource,
WithResources,
)
from neptune_scale.core.components.operations_queue import OperationsQueue
from neptune_scale.core.message_builder import MessageBuilder
from neptune_scale.core.proto_utils import (
datetime_to_proto,
make_step,
)
from neptune_scale.core.validation import (
verify_collection_type,
verify_max_length,
Expand All @@ -43,6 +52,11 @@ def __init__(
api_token: str,
family: str,
run_id: str,
resume: bool = False,
as_experiment: str | None = None,
creation_time: datetime | None = None,
from_run_id: str | None = None,
from_step: int | float | None = None,
max_queue_size: int = MAX_QUEUE_SIZE,
max_queue_size_exceeded_callback: Callable[[int, BaseException], None] | None = None,
) -> None:
Expand All @@ -55,6 +69,11 @@ def __init__(
family: Identifies related runs. For example, the same value must apply to all runs within a run hierarchy.
Max length: 128 characters.
run_id: Unique identifier of a run. Must be unique within the project. Max length: 128 characters.
resume: Whether to resume an existing run.
as_experiment: If creating a run as an experiment, ID of an experiment to be associated with the run.
creation_time: Custom creation time of the run.
from_run_id: If forking from an existing run, ID of the run to fork from.
from_step: If forking from an existing run, step number to fork from.
max_queue_size: Maximum number of operations in a queue.
max_queue_size_exceeded_callback: Callback function triggered when a queue is full.
Accepts two arguments:
Expand All @@ -64,41 +83,96 @@ def __init__(
verify_type("api_token", api_token, str)
verify_type("family", family, str)
verify_type("run_id", run_id, str)
verify_type("resume", resume, bool)
verify_type("as_experiment", as_experiment, (str, type(None)))
verify_type("creation_time", creation_time, (datetime, type(None)))
verify_type("from_run_id", from_run_id, (str, type(None)))
verify_type("from_step", from_step, (int, float, type(None)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we make from_step obligatory when from_run_id is provided, and the other way around? Otherwise _create_run will silently ignore forking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it covered by line 128?

        if (from_run_id is not None and from_step is None) or (from_run_id is None and from_step is not None):
            raise ValueError("`from_run_id` and `from_step` must be used together.")

verify_type("max_queue_size", max_queue_size, int)
verify_type("max_queue_size_exceeded_callback", max_queue_size_exceeded_callback, (Callable, type(None)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could check for > 1 here, or add verify_int() with e.g. a positive=True argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do

if resume and creation_time is not None:
raise ValueError("`resume` and `creation_time` cannot be used together.")
if resume and as_experiment is not None:
raise ValueError("`resume` and `as_experiment` cannot be used together.")
if (from_run_id is not None and from_step is None) or (from_run_id is None and from_step is not None):
raise ValueError("`from_run_id` and `from_step` must be used together.")
if resume and from_run_id is not None:
raise ValueError("`resume` and `from_run_id` cannot be used together.")
if resume and from_step is not None:
raise ValueError("`resume` and `from_step` cannot be used together.")

verify_non_empty("api_token", api_token)
verify_non_empty("family", family)
verify_non_empty("run_id", run_id)
if as_experiment is not None:
verify_non_empty("as_experiment", as_experiment)
if from_run_id is not None:
verify_non_empty("from_run_id", from_run_id)

verify_project_qualified_name("project", project)

verify_max_length("family", family, MAX_FAMILY_LENGTH)
verify_max_length("run_id", run_id, MAX_RUN_ID_LENGTH)

self._project: str = project
self._api_token: str = api_token
self._family: str = family
self._run_id: str = run_id

self._lock = threading.RLock()
self._operations_queue: OperationsQueue = OperationsQueue(
lock=self._lock, max_size=max_queue_size, max_size_exceeded_callback=max_queue_size_exceeded_callback
)
self._backend: ApiClient = ApiClient(api_token=api_token)

if not resume:
self._create_run(
creation_time=datetime.now() if creation_time is None else creation_time,
as_experiment=as_experiment,
from_run_id=from_run_id,
from_step=from_step,
)

def __enter__(self) -> Run:
    """Enter the context manager; returns this run so it can be used as `with Run(...) as run:`."""
    return self

@property
def resources(self) -> tuple[Resource, ...]:
    """Resources whose lifecycle this run manages.

    Returned resources are closed/cleaned up by the `WithResources` machinery
    when the run is closed. The diff view duplicated the old and new return
    statements; this keeps only the post-change version, which also includes
    the API backend so its HTTP client is released on close.
    """
    return self._operations_queue, self._backend

def close(self) -> None:
    """
    Stops the connection to Neptune and synchronizes all data.

    Delegates to the superclass (presumably `WithResources.close`), which is
    expected to close every resource exposed by `resources` — TODO confirm.
    """
    super().close()

def _create_run(
    self,
    creation_time: datetime | None,
    as_experiment: str | None,
    from_run_id: str | None,
    from_step: int | float | None,
) -> None:
    """Submit a run-creation operation to the backend.

    Args:
        creation_time: Custom creation time of the run, or None to let the
            backend assign one. (The annotation is widened to ``datetime | None``
            to match the ``None`` guard below; the caller currently always
            passes a concrete datetime.)
        as_experiment: Experiment ID to associate the run with, or None.
        from_run_id: ID of the run to fork from, or None when not forking.
        from_step: Step number to fork from; used together with `from_run_id`.
    """
    # A fork point is only attached when both the parent run ID and the
    # fork step are present; __init__ validates that they come in pairs.
    fork_point: ForkPoint | None = None
    if from_run_id is not None and from_step is not None:
        fork_point = ForkPoint(
            parent_project=self._project, parent_run_id=from_run_id, step=make_step(number=from_step)
        )

    operation = RunOperation(
        project=self._project,
        run_id=self._run_id,
        create=CreateRun(
            family=self._family,
            fork_point=fork_point,
            experiment_id=as_experiment,
            creation_time=None if creation_time is None else datetime_to_proto(creation_time),
        ),
    )
    # Submitted synchronously for now; queueing is planned.
    self._backend.submit(operation=operation)
    # TODO: Enqueue on the operations queue
    # self._operations_queue.enqueue(operation=operation)

def log(
self,
step: float | int | None = None,
Expand Down Expand Up @@ -161,4 +235,6 @@ def log(
add_tags=add_tags,
remove_tags=remove_tags,
):
self._operations_queue.enqueue(operation=operation)
self._backend.submit(operation=operation)
# TODO: Enqueue on the operations queue
# self._operations_queue.enqueue(operation=operation)
Empty file.
87 changes: 87 additions & 0 deletions src/neptune_scale/api/api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#
# Copyright (c) 2024, Neptune Labs Sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

__all__ = ["ApiClient"]


from dataclasses import dataclass

from neptune_api import (
AuthenticatedClient,
Client,
)
from neptune_api.api.backend import get_client_config
from neptune_api.api.data_ingestion import submit_operation
from neptune_api.auth_helpers import exchange_api_key
from neptune_api.credentials import Credentials
from neptune_api.models import (
ClientConfig,
Error,
)
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.core.components.abstract import Resource


class ApiClient(Resource):
    """Thin wrapper around the authenticated Neptune API client.

    On construction it exchanges the API token for server configuration and
    OAuth endpoints, then builds a self-refreshing authenticated client.
    """

    def __init__(self, api_token: str) -> None:
        # Resolve the server's client config and token-refresh URLs once,
        # then create a client that handles token refresh automatically.
        credentials = Credentials.from_api_key(api_key=api_token)
        config, token_urls = get_config_and_token_urls(credentials=credentials)
        self._backend = create_auth_api_client(credentials=credentials, config=config, token_refreshing_urls=token_urls)

    def submit(self, operation: RunOperation) -> None:
        # NOTE(review): the response is discarded, so backend-reported errors
        # are silently ignored here — confirm this is intentional.
        _ = submit_operation.sync(client=self._backend, body=operation)

    def cleanup(self) -> None:
        # Nothing to clean up between operations.
        pass

    def close(self) -> None:
        # Releases the underlying HTTP client. Calls __exit__ with no
        # arguments — presumably the generated client's __exit__ accepts
        # this; TODO confirm against neptune_api.AuthenticatedClient.
        self._backend.__exit__()


@dataclass
class TokenRefreshingURLs:
    """OAuth endpoints taken from the server's OpenID discovery document."""

    # URL used to start an authorization flow.
    authorization_endpoint: str
    # URL used to refresh/exchange tokens.
    token_endpoint: str

    @classmethod
    def from_dict(cls, data: dict) -> TokenRefreshingURLs:
        """Build from an OpenID discovery document.

        Raises:
            KeyError: if either required endpoint is missing from `data`.
        """
        # Use `cls` rather than the hard-coded class name so that subclasses
        # construct instances of themselves.
        return cls(
            authorization_endpoint=data["authorization_endpoint"], token_endpoint=data["token_endpoint"]
        )


def get_config_and_token_urls(*, credentials: Credentials) -> tuple[ClientConfig, TokenRefreshingURLs]:
    """Fetch the backend client configuration and the OAuth token-refresh URLs.

    Opens a temporary unauthenticated client, reads the client config, then
    follows its OpenID discovery URL to obtain the token endpoints.

    Raises:
        RuntimeError: if the server does not return a usable client config.
    """
    with Client(base_url=credentials.base_url) as plain_client:
        client_config = get_client_config.sync(client=plain_client)
        if client_config is None or isinstance(client_config, Error):
            raise RuntimeError(f"Failed to get client config: {client_config}")
        discovery_response = plain_client.get_httpx_client().get(client_config.security.open_id_discovery)
        refresh_urls = TokenRefreshingURLs.from_dict(discovery_response.json())
    return client_config, refresh_urls


def create_auth_api_client(
    *, credentials: Credentials, config: ClientConfig, token_refreshing_urls: TokenRefreshingURLs
) -> AuthenticatedClient:
    """Build an authenticated API client that refreshes its own token.

    The client exchanges the API key via `exchange_api_key` and refreshes
    tokens against the token endpoint discovered from the server.
    """
    return AuthenticatedClient(
        base_url=credentials.base_url,
        credentials=credentials,
        client_id=config.security.client_id,
        token_refreshing_endpoint=token_refreshing_urls.token_endpoint,
        api_key_exchange_callback=exchange_api_key,
    )
34 changes: 5 additions & 29 deletions src/neptune_scale/core/message_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@

from datetime import datetime

from google.protobuf.timestamp_pb2 import Timestamp
from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import (
SET_OPERATION,
ModifySet,
Step,
StringSet,
UpdateRunSnapshot,
Value,
)
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation

from neptune_scale.core.proto_utils import (
datetime_to_proto,
make_step,
)


class MessageBuilder:
def __init__(
Expand Down Expand Up @@ -91,30 +94,3 @@ def make_value(value: Value | float | str | int | bool | datetime | list[str] |
return fv
else:
raise ValueError(f"Unsupported ingest field value type: {type(value)}")


def datetime_to_proto(dt: datetime) -> Timestamp:
dt_ts = dt.timestamp()
return Timestamp(seconds=int(dt_ts), nanos=int((dt_ts % 1) * 1e9))


def make_step(number: float | int, raise_on_step_precision_loss: bool = False) -> Step:
"""
Converts a number to protobuf Step value. Example:
>>> assert make_step(7.654321, True) == Step(whole=7, micro=654321)
Args:
number: step expressed as number
raise_on_step_precision_loss: inform converter whether it should silently drop precision and
round down to 6 decimal places or raise an error.

Returns: Step protobuf used in Neptune API.
"""
m = int(1e6)
micro: int = int(number * m)
if raise_on_step_precision_loss and number * m - micro != 0:
raise ValueError(f"step must not use more than 6-decimal points, got: {number}")

whole = micro // m
micro = micro % m

return Step(whole=whole, micro=micro)
35 changes: 35 additions & 0 deletions src/neptune_scale/core/proto_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

__all__ = ("datetime_to_proto", "make_step")

from datetime import datetime

from google.protobuf.timestamp_pb2 import Timestamp
from neptune_api.proto.neptune_pb.ingest.v1.common_pb2 import Step


def datetime_to_proto(dt: datetime) -> Timestamp:
    """Convert a datetime to a protobuf Timestamp.

    Uses divmod so `seconds` is the floor of the POSIX timestamp and the
    fractional part is always in [0, 1). The previous `int(dt_ts)` truncated
    toward zero, which for pre-epoch datetimes (negative timestamps with a
    fractional part) produced a value one second too large while `nanos`
    stayed non-negative — off by one second overall. Protobuf's Timestamp
    requires `nanos` in [0, 999999999], which both forms satisfy, but only
    flooring reconstructs the original instant.
    """
    seconds, fraction = divmod(dt.timestamp(), 1.0)
    return Timestamp(seconds=int(seconds), nanos=int(fraction * 1e9))


def make_step(number: float | int, raise_on_step_precision_loss: bool = False) -> Step:
    """
    Converts a number to protobuf Step value. Example:
    >>> assert make_step(7.654321, True) == Step(whole=7, micro=654321)
    Args:
        number: step expressed as number
        raise_on_step_precision_loss: inform converter whether it should silently drop precision and
            round down to 6 decimal places or raise an error.

    Returns: Step protobuf used in Neptune API.
    """
    micro_per_unit = int(1e6)
    total_micro: int = int(number * micro_per_unit)
    # Any sub-microsecond remainder was dropped by the truncation above;
    # optionally reject such inputs instead of silently rounding down.
    if raise_on_step_precision_loss and number * micro_per_unit - total_micro != 0:
        raise ValueError(f"step must not use more than 6-decimal points, got: {number}")

    whole_part, micro_part = divmod(total_micro, micro_per_unit)
    return Step(whole=whole_part, micro=micro_part)
Loading
Loading