-
Notifications
You must be signed in to change notification settings - Fork 22
support backup preservation [BF-2219] #171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -55,6 +55,7 @@ class Backup(TypedDict): | |||||
closed_at: Optional[float] | ||||||
completed_at: Optional[float] | ||||||
broken_at: Optional[float] | ||||||
preserve_until: Optional[str] | ||||||
recovery_site: bool | ||||||
stream_id: str | ||||||
resumable: bool | ||||||
|
@@ -127,10 +128,12 @@ class State(TypedDict): | |||||
last_could_have_purged: float | ||||||
mode: "Controller.Mode" | ||||||
owned_stream_ids: List[int] | ||||||
pending_preservation_requests: Dict[str, Optional[str]] | ||||||
promote_details: Dict[str, Any] | ||||||
promote_on_restore_completion: bool | ||||||
replication_state: Dict[str, GtidExecuted] | ||||||
restore_options: dict | ||||||
stream_to_be_purged: Optional[str] | ||||||
server_uuid: Optional[str] | ||||||
uploaded_binlogs: list | ||||||
|
||||||
|
@@ -210,10 +213,12 @@ def __init__( | |||||
"last_could_have_purged": time.time(), | ||||||
"mode": self.Mode.idle, | ||||||
"owned_stream_ids": [], | ||||||
"pending_preservation_requests": {}, | ||||||
"promote_details": {}, | ||||||
"promote_on_restore_completion": False, | ||||||
"replication_state": {}, | ||||||
"restore_options": {}, | ||||||
"stream_to_be_purged": None, | ||||||
"server_uuid": None, | ||||||
"uploaded_binlogs": [], | ||||||
} | ||||||
|
@@ -265,6 +270,16 @@ def mark_backup_requested( | |||||
return | ||||||
self.state_manager.update_state(backup_request=new_request) | ||||||
|
||||||
def mark_backup_preservation(self, stream_id: str, preserve_until: Optional[datetime.datetime]) -> None: | ||||||
backup_to_preserve = self.get_backup_by_stream_id(stream_id) | ||||||
if not backup_to_preserve: | ||||||
raise Exception(f"Stream {stream_id} was not found in completed backups.") | ||||||
|
||||||
with self.lock: | ||||||
current_requests = dict(self.state["pending_preservation_requests"]) | ||||||
current_requests[stream_id] = preserve_until.isoformat() if preserve_until else None | ||||||
kathia-barahona marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
self.state_manager.update_state(pending_preservation_requests=current_requests) | ||||||
|
||||||
@property | ||||||
def mode(self) -> Mode: | ||||||
return self.state["mode"] | ||||||
|
@@ -567,6 +582,7 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_ | |||||
broken_info = {} | ||||||
closed_info = {} | ||||||
completed_info = {} | ||||||
preserved_info = {} | ||||||
for info in file_storage.list_iter(site_and_stream_id): | ||||||
file_name = info["name"].rsplit("/", 1)[-1] | ||||||
if file_name == "basebackup.xbstream": | ||||||
|
@@ -585,18 +601,23 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_ | |||||
closed_info = parse_fs_metadata(info["metadata"]) | ||||||
elif file_name == "completed.json": | ||||||
completed_info = parse_fs_metadata(info["metadata"]) | ||||||
elif file_name == "preserved.json": | ||||||
preserved_info = parse_fs_metadata(info["metadata"]) | ||||||
|
||||||
if basebackup_info and basebackup_compressed_size: | ||||||
basebackup_info = dict(basebackup_info, compressed_size=basebackup_compressed_size) | ||||||
resumable = basebackup_info and basebackup_compressed_size | ||||||
completed = resumable and completed_info | ||||||
closed = completed and closed_info | ||||||
|
||||||
preserve_until = preserved_info.get("preserve_until") | ||||||
backups.append( | ||||||
{ | ||||||
"basebackup_info": basebackup_info, | ||||||
"broken_at": broken_info.get("broken_at"), | ||||||
"closed_at": closed_info["closed_at"] if closed else None, | ||||||
"completed_at": completed_info["completed_at"] if completed else None, | ||||||
"preserve_until": preserve_until, | ||||||
"recovery_site": site_config.get("recovery_only", False), | ||||||
"stream_id": site_and_stream_id.rsplit("/", 1)[-1], | ||||||
"resumable": bool(resumable), | ||||||
|
@@ -1086,11 +1107,18 @@ def _get_upload_backup_site(self): | |||||
raise Exception("Defined upload site not present in list of non-recovery backup sites") | ||||||
return site_id, non_recovery_sites[site_id] | ||||||
|
||||||
def _get_site_for_stream_id(self, stream_id): | ||||||
def _get_site_for_stream_id(self, stream_id: str): | ||||||
backup = self.get_backup_by_stream_id(stream_id) | ||||||
if not backup: | ||||||
KeyError(f"Stream {stream_id} not found in backups") | ||||||
return backup["site"] | ||||||
|
||||||
def get_backup_by_stream_id(self, stream_id: str): | ||||||
for backup in self.state["backups"]: | ||||||
if backup["stream_id"] == stream_id: | ||||||
return backup["site"] | ||||||
raise KeyError(f"Stream {stream_id} not found in backups") | ||||||
return backup | ||||||
|
||||||
|
||||||
return None | ||||||
|
||||||
def _get_restore_coordinator_state_file_and_remove_old(self): | ||||||
state_file_name = os.path.join(self.state_dir, "restore_coordinator_state.json") | ||||||
|
@@ -1111,6 +1139,7 @@ def _get_restore_coordinator_pending_state_file_and_remove_old(self): | |||||
def _handle_mode_active(self): | ||||||
self._cache_server_uuid_if_missing() | ||||||
self._set_uploaded_binlog_references() | ||||||
self._handle_pending_preservation_requests() | ||||||
self._refresh_backups_list_and_streams() | ||||||
self._mark_periodic_backup_requested_if_interval_exceeded() | ||||||
self._create_new_backup_stream_if_requested_and_max_streams_not_exceeded() | ||||||
|
@@ -1188,16 +1217,13 @@ def _handle_mode_restore(self): | |||||
) | ||||||
|
||||||
def _mark_failed_restore_backup_as_broken(self) -> None: | ||||||
broken_backup = None | ||||||
failed_stream_id = self.state["restore_options"]["stream_id"] | ||||||
backups = self.state["backups"] | ||||||
for backup in backups: | ||||||
if backup["stream_id"] == failed_stream_id: | ||||||
broken_backup = backup | ||||||
break | ||||||
broken_backup = self.get_backup_by_stream_id(stream_id=failed_stream_id) | ||||||
|
||||||
if not broken_backup: | ||||||
raise Exception(f"Stream {failed_stream_id} to be marked as broken not found in completed backups: {backups}") | ||||||
raise Exception( | ||||||
f'Stream {failed_stream_id} to be marked as broken not found in completed backups: {self.state["backups"]}' | ||||||
) | ||||||
|
||||||
self._build_backup_stream(broken_backup).mark_as_broken() | ||||||
|
||||||
|
@@ -1290,6 +1316,11 @@ def _purge_old_backups(self): | |||||
if not backup["closed_at"]: | ||||||
return | ||||||
|
||||||
# do not purge backup if its preserved | ||||||
preserve_until = backup["preserve_until"] | ||||||
if preserve_until and datetime.datetime.now(datetime.timezone.utc) < datetime.datetime.fromisoformat(preserve_until): | ||||||
return | ||||||
|
||||||
if time.time() > backup["closed_at"] + self.backup_settings["backup_age_days_max"] * 24 * 60 * 60: | ||||||
self.log.info("Backup %r is older than max backup age, dropping it", backup["stream_id"]) | ||||||
elif non_broken_backups_count > self.backup_settings["backup_count_max"]: | ||||||
|
@@ -1304,8 +1335,19 @@ def _purge_old_backups(self): | |||||
self.log.warning("Backup %r to drop is one of active streams, not dropping", backup["stream_id"]) | ||||||
return | ||||||
|
||||||
with self.lock: | ||||||
self.state_manager.update_state(stream_to_be_purged=backup["stream_id"]) | ||||||
|
||||||
self._build_backup_stream(backup).remove() | ||||||
kathia-barahona marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
# lock the controller, this way other requests do not access backups till backup is purged | ||||||
with self.lock: | ||||||
self.state_manager.update_state(stream_to_be_purged=None) | ||||||
current_backups = [ | ||||||
current_backup | ||||||
for current_backup in self.state["backups"] | ||||||
if current_backup["stream_id"] != backup["stream_id"] | ||||||
] | ||||||
self.state_manager.update_state(backups=current_backups) | ||||||
owned_stream_ids = [sid for sid in self.state["owned_stream_ids"] if sid != backup["stream_id"]] | ||||||
self.state_manager.update_state(owned_stream_ids=owned_stream_ids) | ||||||
|
||||||
|
@@ -1408,11 +1450,12 @@ def _purge_old_binlogs(self, *, mysql_maybe_not_running=False): | |||||
self.stats.gauge_float("myhoard.binlog.time_since_any_purged", current_time - last_purge) | ||||||
self.stats.gauge_float("myhoard.binlog.time_since_could_have_purged", current_time - last_could_have_purged) | ||||||
|
||||||
def _refresh_backups_list(self): | ||||||
def _refresh_backups_list(self, force_refresh: bool = False): | ||||||
|
def _refresh_backups_list(self, force_refresh: bool = False): | |
def _refresh_backups_list(self, force_refresh: bool = False) -> List[Backup]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this requires a lot of changes since Controller.get_backup_list
is not returning List[Backup]
, is returning List[Dict[.....]]
. I rather keep this PR small and not involve any refactoring work that is not required. I'll create an issue for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened #175 for this
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if force_refresh is False and time.time() - self.state["backups_fetched_at"] < interval: | |
if not force_refresh and time.time() - self.state["backups_fetched_at"] < interval: |
Uh oh!
There was an error while loading. Please reload this page.