Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,43 @@ Response on success looks like this:
}
```


## PUT /backup/{stream_id}/preserve

Create a request for updating the preservation status of a backup. Request body must be like this:

```
{
"preserve_until": "2023-09-01T00:00:0000",
"wait_for_applied_preservation": 3.0,
}
```

**stream_id**

Identifier of this backup.

**preserve_until**

Optional datetime value in ISO format for keeping the backup from being deleted.
If a valid value is provided, the backup will be preserved until the specified datetime.
If not provided or has a null value, the backup can be deleted due to old age.

**wait_for_applied_preservation**

Optional amount of time to the wait for the preservation request to be effectively executed. The
operation will block for up to as many seconds as specified by this parameter.


Response on success looks like this:

```
{
"success": true
}
```


## PUT /replication_state

This call can be used to inform MyHoard of the executed GTIDs on other servers
Expand Down
29 changes: 29 additions & 0 deletions myhoard/backup_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class State(TypedDict):
normalized_backup_time: Optional[str]
pending_binlogs: List[BinlogInfo]
prepare_details: Dict
preserved_info: Dict
# Set of GTIDs that have been stored persistently to file storage.
remote_gtid_executed: GtidExecuted
remote_read_errors: int
Expand Down Expand Up @@ -238,6 +239,7 @@ def __init__(
"normalized_backup_time": normalized_backup_time,
"pending_binlogs": list(binlogs) if binlogs else [],
"prepare_details": {},
"preserved_info": {},
# Set of GTIDs that have been stored persistently to file storage.
"remote_gtid_executed": {},
"remote_read_errors": 0,
Expand Down Expand Up @@ -443,6 +445,33 @@ def mark_as_closed(self) -> None:
)
self.wakeup_event.set()

def mark_preservation(self, preserve_until: Optional[datetime]) -> None:
if preserve_until:
self.log.info("Marking stream %s preservation to %s.", self.stream_id, preserve_until)
else:
self.log.info("Removing preservation for stream %s.", self.stream_id)

preserved_info = {
"preserve_until": preserve_until.isoformat() if preserve_until else None,
"server_id": self.server_id,
}

try:
key = self._build_full_name("preserved.json")
data = json.dumps(preserved_info)
metadata = make_fs_metadata(preserved_info)

file_storage = self.file_storage_setup_fn()
file_storage.store_file_from_memory(key, data.encode("utf-8"), metadata=metadata)
self.state_manager.update_state(preserved_info=preserved_info)
except Exception as ex: # pylint: disable=broad-except
self.log.exception("Failed to update basebackup preservation")
self.stats.unexpected_exception(ex=ex, where="BackupStream._update_basebackup_preservation")
self.state_manager.update_state(remote_write_errors=self.state["remote_write_errors"] + 1)
self.stats.increase("myhoard.remote_write_errors")
# raise exception since update on preservation did not succeed
raise

def _handle_pending_mark_as_closed(self) -> None:
assert self.file_storage is not None
self.log.info("Handling pending mark as closed request for stream %r", self.stream_id)
Expand Down
93 changes: 81 additions & 12 deletions myhoard/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Backup(TypedDict):
closed_at: Optional[float]
completed_at: Optional[float]
broken_at: Optional[float]
preserve_until: Optional[str]
recovery_site: bool
stream_id: str
resumable: bool
Expand Down Expand Up @@ -127,10 +128,12 @@ class State(TypedDict):
last_could_have_purged: float
mode: "Controller.Mode"
owned_stream_ids: List[int]
pending_preservation_requests: Dict[str, Optional[str]]
promote_details: Dict[str, Any]
promote_on_restore_completion: bool
replication_state: Dict[str, GtidExecuted]
restore_options: dict
stream_to_be_purged: Optional[str]
server_uuid: Optional[str]
uploaded_binlogs: list

Expand Down Expand Up @@ -210,10 +213,12 @@ def __init__(
"last_could_have_purged": time.time(),
"mode": self.Mode.idle,
"owned_stream_ids": [],
"pending_preservation_requests": {},
"promote_details": {},
"promote_on_restore_completion": False,
"replication_state": {},
"restore_options": {},
"stream_to_be_purged": None,
"server_uuid": None,
"uploaded_binlogs": [],
}
Expand Down Expand Up @@ -265,6 +270,16 @@ def mark_backup_requested(
return
self.state_manager.update_state(backup_request=new_request)

def mark_backup_preservation(self, stream_id: str, preserve_until: Optional[datetime.datetime]) -> None:
backup_to_preserve = self.get_backup_by_stream_id(stream_id)
if not backup_to_preserve:
raise Exception(f"Stream {stream_id} was not found in completed backups.")

with self.lock:
current_requests = dict(self.state["pending_preservation_requests"])
current_requests[stream_id] = preserve_until.isoformat() if preserve_until else None
self.state_manager.update_state(pending_preservation_requests=current_requests)

@property
def mode(self) -> Mode:
return self.state["mode"]
Expand Down Expand Up @@ -567,6 +582,7 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_
broken_info = {}
closed_info = {}
completed_info = {}
preserved_info = {}
for info in file_storage.list_iter(site_and_stream_id):
file_name = info["name"].rsplit("/", 1)[-1]
if file_name == "basebackup.xbstream":
Expand All @@ -585,18 +601,23 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_
closed_info = parse_fs_metadata(info["metadata"])
elif file_name == "completed.json":
completed_info = parse_fs_metadata(info["metadata"])
elif file_name == "preserved.json":
preserved_info = parse_fs_metadata(info["metadata"])

if basebackup_info and basebackup_compressed_size:
basebackup_info = dict(basebackup_info, compressed_size=basebackup_compressed_size)
resumable = basebackup_info and basebackup_compressed_size
completed = resumable and completed_info
closed = completed and closed_info

preserve_until = preserved_info.get("preserve_until")
backups.append(
{
"basebackup_info": basebackup_info,
"broken_at": broken_info.get("broken_at"),
"closed_at": closed_info["closed_at"] if closed else None,
"completed_at": completed_info["completed_at"] if completed else None,
"preserve_until": preserve_until,
"recovery_site": site_config.get("recovery_only", False),
"stream_id": site_and_stream_id.rsplit("/", 1)[-1],
"resumable": bool(resumable),
Expand Down Expand Up @@ -1086,11 +1107,18 @@ def _get_upload_backup_site(self):
raise Exception("Defined upload site not present in list of non-recovery backup sites")
return site_id, non_recovery_sites[site_id]

def _get_site_for_stream_id(self, stream_id):
def _get_site_for_stream_id(self, stream_id: str):
backup = self.get_backup_by_stream_id(stream_id)
if not backup:
KeyError(f"Stream {stream_id} not found in backups")
return backup["site"]

def get_backup_by_stream_id(self, stream_id: str):
for backup in self.state["backups"]:
if backup["stream_id"] == stream_id:
return backup["site"]
raise KeyError(f"Stream {stream_id} not found in backups")
return backup
Comment on lines 1117 to +1119
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we often call that? If we always want to access backups by their stream_id, it might be worth considering changing self.state["backups"] from a list[Backup] to a dict[stream_id, Backup].

I don't think this should be in the scope of the ticket though. But depending on your opinion and knowledge, it might be worth creating a ticket for this (if it makes sense), good for new-developer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do filter backups by stream_id lots of times... but that sounds as a major change and multiple places on the code will be affected, not sure if it will be worth it at the end of the day.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, out of scope then. 👍
At the end of the review, I'll create a few tickets about all the points mentioned, to keep track of them and they can be prioritized by Amichai at a later date.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#175 handles this


return None

def _get_restore_coordinator_state_file_and_remove_old(self):
state_file_name = os.path.join(self.state_dir, "restore_coordinator_state.json")
Expand All @@ -1111,6 +1139,7 @@ def _get_restore_coordinator_pending_state_file_and_remove_old(self):
def _handle_mode_active(self):
self._cache_server_uuid_if_missing()
self._set_uploaded_binlog_references()
self._handle_pending_preservation_requests()
self._refresh_backups_list_and_streams()
self._mark_periodic_backup_requested_if_interval_exceeded()
self._create_new_backup_stream_if_requested_and_max_streams_not_exceeded()
Expand Down Expand Up @@ -1188,16 +1217,13 @@ def _handle_mode_restore(self):
)

def _mark_failed_restore_backup_as_broken(self) -> None:
broken_backup = None
failed_stream_id = self.state["restore_options"]["stream_id"]
backups = self.state["backups"]
for backup in backups:
if backup["stream_id"] == failed_stream_id:
broken_backup = backup
break
broken_backup = self.get_backup_by_stream_id(stream_id=failed_stream_id)

if not broken_backup:
raise Exception(f"Stream {failed_stream_id} to be marked as broken not found in completed backups: {backups}")
raise Exception(
f'Stream {failed_stream_id} to be marked as broken not found in completed backups: {self.state["backups"]}'
)

self._build_backup_stream(broken_backup).mark_as_broken()

Expand Down Expand Up @@ -1290,6 +1316,11 @@ def _purge_old_backups(self):
if not backup["closed_at"]:
return

# do not purge backup if its preserved
preserve_until = backup["preserve_until"]
if preserve_until and datetime.datetime.now(datetime.timezone.utc) < datetime.datetime.fromisoformat(preserve_until):
return

if time.time() > backup["closed_at"] + self.backup_settings["backup_age_days_max"] * 24 * 60 * 60:
self.log.info("Backup %r is older than max backup age, dropping it", backup["stream_id"])
elif non_broken_backups_count > self.backup_settings["backup_count_max"]:
Expand All @@ -1304,8 +1335,19 @@ def _purge_old_backups(self):
self.log.warning("Backup %r to drop is one of active streams, not dropping", backup["stream_id"])
return

with self.lock:
self.state_manager.update_state(stream_to_be_purged=backup["stream_id"])

self._build_backup_stream(backup).remove()
# lock the controller, this way other requests do not access backups till backup is purged
with self.lock:
self.state_manager.update_state(stream_to_be_purged=None)
current_backups = [
current_backup
for current_backup in self.state["backups"]
if current_backup["stream_id"] != backup["stream_id"]
]
self.state_manager.update_state(backups=current_backups)
owned_stream_ids = [sid for sid in self.state["owned_stream_ids"] if sid != backup["stream_id"]]
self.state_manager.update_state(owned_stream_ids=owned_stream_ids)

Expand Down Expand Up @@ -1408,11 +1450,12 @@ def _purge_old_binlogs(self, *, mysql_maybe_not_running=False):
self.stats.gauge_float("myhoard.binlog.time_since_any_purged", current_time - last_purge)
self.stats.gauge_float("myhoard.binlog.time_since_could_have_purged", current_time - last_could_have_purged)

def _refresh_backups_list(self):
def _refresh_backups_list(self, force_refresh: bool = False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _refresh_backups_list(self, force_refresh: bool = False):
def _refresh_backups_list(self, force_refresh: bool = False) -> List[Backup]:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this requires a lot of changes since Controller.get_backup_list is not returning List[Backup], is returning List[Dict[.....]]. I rather keep this PR small and not involve any refactoring work that is not required. I'll create an issue for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #175 for this

interval = self.backup_refresh_interval_base
if self.mode == self.Mode.active:
interval *= self.BACKUP_REFRESH_ACTIVE_MULTIPLIER
if time.time() - self.state["backups_fetched_at"] < interval:

if force_refresh is False and time.time() - self.state["backups_fetched_at"] < interval:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if force_refresh is False and time.time() - self.state["backups_fetched_at"] < interval:
if not force_refresh and time.time() - self.state["backups_fetched_at"] < interval:

return None

backups = self.get_backup_list(
Expand All @@ -1432,6 +1475,32 @@ def _stream_for_backup_initiated_by_old_master(self, stream: BackupStream) -> bo
for backups initiated by an old master while the current node was getting promoted."""
return self.mode == self.Mode.active and stream.mode == BackupStream.Mode.observe

def _handle_pending_preservation_requests(self) -> None:
if not self.state["pending_preservation_requests"]:
return None

while self.state["pending_preservation_requests"]:
# acquire lock since another thread might update the state
with self.lock:
stream_id, preserve_until = list(self.state["pending_preservation_requests"].items())[0]

backup_to_preserve = self.get_backup_by_stream_id(stream_id)
if backup_to_preserve:
self._build_backup_stream(backup_to_preserve).mark_preservation(
preserve_until=datetime.datetime.fromisoformat(preserve_until) if preserve_until else None
)

with self.lock:
pending_preservation_requests = {
sid: ts
for sid, ts in self.state["pending_preservation_requests"].items()
if sid != stream_id and ts != preserve_until
}
self.state_manager.update_state(pending_preservation_requests=pending_preservation_requests)

# force refresh, this way we guarantee preserve_until is updated and backup does not get deleted
self._refresh_backups_list(force_refresh=True)

def _refresh_backups_list_and_streams(self):
basebackup_streams = {
stream.stream_id: stream
Expand Down
59 changes: 59 additions & 0 deletions myhoard/web_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/
from aiohttp import web
from aiohttp.web_response import json_response
from datetime import datetime, timezone
from myhoard.backup_stream import BackupStream
from myhoard.controller import Controller
from myhoard.errors import BadRequest
Expand Down Expand Up @@ -78,6 +79,63 @@ async def backup_list(self, _request):
response["backups"] = self.controller.state["backups"]
return json_response(response)

async def backup_preserve(self, request):
with self._handle_request(name="backup_preserve"):
stream_id = request.match_info["stream_id"]
body = await self._get_request_json(request)
preserve_until = body.get("preserve_until")

if preserve_until is not None:
try:
preserve_until = datetime.fromisoformat(preserve_until)
if preserve_until.tzinfo != timezone.utc:
raise BadRequest("`preserve_until` must be in UTC timezone.")

now = datetime.now(timezone.utc)
if preserve_until < now:
raise BadRequest("`preserve_until` must be a date in the future.")
except ValueError:
raise BadRequest("`preserve_until` must be a valid isoformat datetime string.")

self.controller.mark_backup_preservation(stream_id=stream_id, preserve_until=preserve_until)
wait_for_applied_preservation = body.get("wait_for_applied_preservation")
if wait_for_applied_preservation:
self.log.info(
"Waiting up to %.1f seconds for preservation of backup %s to be applied.",
wait_for_applied_preservation,
stream_id,
)
start = time.monotonic()
while True:
backup = self.controller.get_backup_by_stream_id(stream_id)
# the backup was or will be removed before preservation could be applied
if not backup or stream_id == self.controller.state["stream_to_be_purged"]:
if preserve_until:
return json_response({"success": False})
# preservation was removed on time
return json_response({"success": True})

if (backup["preserve_until"] is None and preserve_until is None) or (
backup["preserve_until"] == preserve_until.isoformat()
):
self.log.info("Preservation for backup %s was applied.", stream_id)
break

elapsed = time.monotonic() - start
if elapsed > wait_for_applied_preservation:
self.log.info(
"Preservation for backup %s was not applied up in %.1f seconds",
stream_id,
elapsed,
)
# waiting time was exceeded
return json_response({"success": False, "preservation_is_still_pending": True})

wait_time = min(wait_for_applied_preservation - elapsed, 0.1)
await asyncio.sleep(wait_time)

return json_response({"success": True})

async def replication_state_set(self, request):
with self._handle_request(name="replication_state_set"):
state = await self._get_request_json(request)
Expand Down Expand Up @@ -201,6 +259,7 @@ def _add_routes(self):
[
web.get("/backup", self.backup_list),
web.post("/backup", self.backup_create),
web.put("/backup/{stream_id}/preserve", self.backup_preserve),
web.put("/replication_state", self.replication_state_set),
web.get("/status", self.status_show),
web.put("/status", self.status_update),
Expand Down
Loading