Skip to content

feature: Implement saving operations to / reading from the on-disk storage #149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 7, 2025
12 changes: 7 additions & 5 deletions src/neptune_scale/api/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@
)
from neptune_scale.exceptions import (
NeptuneApiTokenNotProvided,
NeptuneConflictingDataInLocalStorage,
NeptuneProjectNotProvided,
NeptuneRunConflicting,
NeptuneRunDuplicate,
)
from neptune_scale.net.serialization import (
datetime_to_proto,
Expand Down Expand Up @@ -280,12 +279,15 @@ def _check_for_run_conflicts(
) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method should become something like _validate_existing_db() and within it there should be:

  1. Check if the version matches. If not -> NeptuneLocalStorageInUnsupportedVersion
  2. Check if all other metadata matches. If not -> NeptuneConflictingDataInLocalStorage; if it does -> warning.

if existing_metadata.project != self._project or existing_metadata.run_id != self._run_id:
# should never happen because we use project and run_id to create the repository path
raise NeptuneRunConflicting()
raise NeptuneConflictingDataInLocalStorage()
if existing_metadata.parent_run_id == fork_run_id and existing_metadata.fork_step == fork_step:
raise NeptuneRunDuplicate()
logger.warning(
"Run already exists in local storage with the same parent run and fork point. Resuming the run."
)
return
else:
# Same run_id but different fork points
raise NeptuneRunConflicting()
raise NeptuneConflictingDataInLocalStorage()

def _on_child_link_closed(self, _: ProcessLink) -> None:
with self._lock:
Expand Down
12 changes: 12 additions & 0 deletions src/neptune_scale/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
"NeptuneApiTokenNotProvided",
"NeptuneTooManyRequestsResponseError",
"NeptunePreviewStepNotAfterLastCommittedStep",
"NeptuneConflictingDataInLocalStorage",
"NeptuneLocalStorageInUnsupportedVersion",
)

from typing import Any
Expand Down Expand Up @@ -493,3 +495,13 @@ class NeptunePreviewStepNotAfterLastCommittedStep(NeptuneScaleError):
the last fully committed (complete) update. Once a complete value is recorded, any preview updates
must only be added for later steps. Please adjust the order of your updates and try again.
"""


# Raised when the version stored in the local storage metadata differs from
# the DB_VERSION this build of the library expects.
class NeptuneLocalStorageInUnsupportedVersion(NeptuneScaleError):
# User-facing text: names the likely cause (database created by a newer
# library version) and the two remedies (upgrade, or create a new database).
message = """The local storage database is in an unsupported version.
This may happen when you try to use a database created with a newer version of Neptune Scale with an older version of the library.
Please either upgrade Neptune Scale to the latest version or create a new local storage database."""


# Raised when metadata already present in local storage does not match the run
# being created: a different project or run_id, or the same run_id with a
# different fork point (parent run id / fork step).
class NeptuneConflictingDataInLocalStorage(NeptuneScaleError):
message = """NeptuneConflictingDataInLocalStorage: Conflicting data found in local storage."""
4 changes: 4 additions & 0 deletions src/neptune_scale/sync/operations_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pathlib import Path

from neptune_scale.exceptions import NeptuneLocalStorageInUnsupportedVersion
from neptune_scale.sync.parameters import MAX_SINGLE_OPERATION_SIZE_BYTES

__all__ = ("OperationsRepository", "OperationType", "Operation", "Metadata", "SequenceId")
Expand Down Expand Up @@ -258,6 +259,9 @@ def get_metadata(self) -> Optional[Metadata]:

version, project, run_id, parent_run_id, fork_step = row

if version != DB_VERSION:
raise NeptuneLocalStorageInUnsupportedVersion()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should move the validation to the constructor or a dedicated method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a comment elsewhere - let's remove it from here.


return Metadata(
version=version, project=project, run_id=run_id, parent_run_id=parent_run_id, fork_step=fork_step
)
Expand Down
2 changes: 0 additions & 2 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import logging
import os
import uuid

os.environ["NEPTUNE_DEBUG_MODE"] = "1"
from datetime import (
datetime,
timezone,
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/sync/test_operations_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from neptune_scale.sync.operations_repository import (
Metadata,
NeptuneLocalStorageInUnsupportedVersion,
OperationsRepository,
OperationType,
)
Expand Down Expand Up @@ -278,6 +279,23 @@ def test_save_update_run_snapshots_too_large(operations_repo):
)


def test_unsupported_version_error(operations_repo, temp_db_path):
    """Reopening a database whose stored version is unsupported must fail.

    The database is first created through the repository, then its
    ``metadata.version`` value is overwritten directly via sqlite3 so that it
    no longer matches the version the library expects.
    """
    # Given - create a database with an unsupported version
    operations_repo.init_db()
    operations_repo.save_metadata(project="test", run_id="test")

    conn = sqlite3.connect(temp_db_path)
    try:
        conn.execute("UPDATE metadata SET version = 'unsupported_version'")
        conn.commit()
    finally:
        # Always release the handle, even if the UPDATE fails, so a broken
        # setup step does not leave the temporary database file open.
        conn.close()

    # When/Then - opening the existing DB with the wrong version and reading
    # its metadata should raise an error. The whole sequence stays inside the
    # block so the test does not depend on which step performs the check.
    with pytest.raises(NeptuneLocalStorageInUnsupportedVersion):
        # Use a distinct name instead of rebinding the fixture argument.
        reopened_repo = OperationsRepository(db_path=Path(temp_db_path))
        reopened_repo.init_db()
        reopened_repo.get_metadata()


def get_operation_count(db_path: str) -> int:
conn = sqlite3.connect(db_path)
try:
Expand Down
17 changes: 8 additions & 9 deletions tests/unit/test_run_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
import pytest

from neptune_scale import Run
from neptune_scale.exceptions import (
NeptuneRunConflicting,
NeptuneRunDuplicate,
)
from neptune_scale.exceptions import NeptuneConflictingDataInLocalStorage


def test_resume_false_with_matching_fork_point(api_token):
def test_resume_false_with_matching_fork_point(api_token, caplog):
project = "workspace/project"
run_id = str(uuid.uuid4())
fork_run_id = "parent-run"
Expand All @@ -27,16 +24,18 @@ def test_resume_false_with_matching_fork_point(api_token):
pass

# Then try to create the same run again without resume
with pytest.raises(NeptuneRunDuplicate):
Run(
with caplog.at_level("WARNING"):
with Run(
project=project,
api_token=api_token,
run_id=run_id,
resume=False,
mode="disabled",
fork_run_id=fork_run_id,
fork_step=fork_step,
)
):
pass
assert "Run already exists in local storage" in caplog.text

# Then try to use the same run_id with a different project
with Run(
Expand Down Expand Up @@ -64,7 +63,7 @@ def test_resume_false_with_conflicting_fork_point(
pass

# Then try to create the same run but with a different fork point
with pytest.raises(NeptuneRunConflicting):
with pytest.raises(NeptuneConflictingDataInLocalStorage):
Run(
project=project,
api_token=api_token,
Expand Down
Loading