Add script to efficiently delete a large number of nested empty folders in an HNS bucket. #13497
Open: mabsaleh wants to merge 2 commits into GoogleCloudPlatform:main from mabsaleh:delete-empty-folders-script
+413 −0
@@ -0,0 +1,35 @@
# GCS HNS Folder Management Scripts

This directory contains scripts for managing folders in Google Cloud Storage (GCS) Hierarchical Namespace (HNS) enabled buckets.

## Scripts

### `delete-empty-folders.py`

This script recursively deletes empty folders within a specified GCS bucket and (optional) prefix.

**Features:**

* **Recursive Deletion:** Traverses and deletes nested empty folders.
* **Depth-First Deletion:** Deletes the deepest folders first to ensure parent folders are empty before deletion is attempted.
* **Parallel Execution:** Uses a thread pool to delete folders concurrently for improved performance.
* **Configurable:** Allows setting the target bucket, folder prefix, and number of workers.
* **Error Handling:** Retries transient errors and logs failures.
* **Progress Reporting:** Periodically logs deletion statistics.

**Usage:**

1. **Authenticate:**
   ```bash
   gcloud auth application-default login
   ```
2. **Configure:** Update the variables at the top of the script (see the example after this list):
   * `BUCKET_NAME`: The name of your GCS HNS bucket.
   * `FOLDER_PREFIX`: (Optional) The prefix that limits the deletion scope (e.g., `archive/`). Leave empty to scan the whole bucket. Must end with `/` if specified.
   * `MAX_WORKERS`: Number of concurrent deletion threads.
3. **Run:**
   ```bash
   python3 delete-empty-folders.py
   ```
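
For illustration, a filled-in version of the configuration block from step 2 might look like the following; the bucket name and prefix are placeholders rather than values from this PR:

```python
# Example configuration (placeholder values; substitute your own).
BUCKET_NAME = "my-hns-bucket"        # target HNS-enabled bucket
FOLDER_PREFIX = "archive/old_data/"  # must end with '/' if set; "" scans the whole bucket
MAX_WORKERS = 100                    # number of concurrent deletion threads
```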

**Note:** This script *only* deletes folders. Folders containing any objects will not be deleted, and a "Failed Precondition" warning will be logged.
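
To sanity-check the configured prefix before running (or to see which folders remain afterwards), the same Storage Control client the script uses can list folders without deleting anything. A minimal sketch, with placeholder bucket and prefix values:

```python
from google.cloud import storage_control_v2

# List the folders that would be considered for deletion, without deleting them.
client = storage_control_v2.StorageControlClient()
request = storage_control_v2.ListFoldersRequest(
    parent="projects/_/buckets/my-hns-bucket",  # placeholder bucket name
    prefix="archive/old_data/",                 # placeholder prefix; "" lists all folders
)
for folder in client.list_folders(request=request):
    print(folder.name)
```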
@@ -0,0 +1,360 @@
from concurrent import futures
import concurrent.futures
import logging
import threading
import time

from google.api_core import exceptions as google_exceptions
from google.cloud import storage_control_v2

ThreadPoolExecutor = futures.ThreadPoolExecutor

# This script may be used to recursively delete a large number of nested empty
# folders in a GCS HNS bucket. Overview of the algorithm:
# 1. Folder Discovery:
#    - Lists all folders under the BUCKET_NAME and FOLDER_PREFIX (if set).
#    - Partitions all discovered folders into a map, keyed by depth
#      (e.g. {1: [foo1/, foo2/], 2: [foo1/bar1/, foo1/bar2/, foo2/bar3/], ...}).
# 2. Folder Deletion:
#    - Processes depths in reverse order (from deepest to shallowest).
#    - For each depth level, submits all folders at that level to a thread pool
#      for parallel deletion.
#    - Only moves to the next depth level once all folders at the current depth
#      have been processed. This ensures that child folders are removed before
#      their parents, respecting hierarchical constraints.
#
# Note: This script only deletes folders, not objects; any folders with child
# objects (immediate or nested) will fail to be deleted.
#
# Usage:
# 1. Authenticate with the Google Cloud CLI:
#    `gcloud auth application-default login`
# 2. Update the configuration variables as needed to specify the target bucket,
#    prefix, number of concurrent threads, etc.
# 3. Run the script: `python3 delete-empty-folders.py`
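#
# Example (hypothetical folder names): given the folders a/, a/b/, a/b/c/, and
# a/d/, discovery partitions them as {1: {a/}, 2: {a/b/, a/d/}, 3: {a/b/c/}},
# and deletion then proceeds depth 3 -> 2 -> 1, so a/b/c/ is deleted before
# a/b/, and both a/b/ and a/d/ are deleted before a/.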

# --- Configuration ---
BUCKET_NAME = "your-gcs-bucket-name"

# e.g. "archive/old_data/" or "" to delete all folders in the bucket.
# If specified, must end with '/'.
FOLDER_PREFIX = ""

# Max number of concurrent threads to use for deleting folders.
MAX_WORKERS = 100

# How often to log statistics during deletion, in seconds.
STATS_REPORT_INTERVAL = 5

# Maximum number of retries for transient errors.
MAX_RETRIES = 5

# --- Data Structures & Globals ---
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(threadName)s - %(message)s"
)

# Global map to store folders by their depth:
# { depth (int) -> set of full_resource_names (str) }
folders_by_depth = {}

# Stats for monitoring progress.
stats = {
    "found_total": 0,
    "successful_deletes": 0,
    "failed_deletes_precondition": 0,
    "failed_deletes_internal": 0,
}
stats_lock = threading.Lock()

# Initialize the Storage Control API client.
storage_control_client = storage_control_v2.StorageControlClient()


def _get_simple_path_and_depth(full_resource_name: str) -> tuple[str, int]:
    """Extracts bucket-relative path and depth from a GCS folder resource name.

    The "simple path" is relative to the bucket (e.g., 'archive/logs/' for
    'projects/_/buckets/my-bucket/folders/archive/logs/').

    The "depth" is the number of '/' in the simple path (e.g. 'archive/logs/'
    is depth 2).

    Args:
        full_resource_name: The full resource name of the GCS folder, e.g.,
            'projects/_/buckets/your-bucket-name/folders/path/to/folder/'.

    Returns:
        A tuple (simple_path: str, depth: int).

    Raises:
        ValueError: If the resource name does not match the expected format
            (i.e. starts with
            'projects/_/buckets/BUCKET_NAME/folders/FOLDER_PREFIX' and ends
            with a trailing slash).
    """
    base_folders_prefix = f"projects/_/buckets/{BUCKET_NAME}/folders/"
    # The full prefix to validate against, including the global FOLDER_PREFIX.
    # If FOLDER_PREFIX is "", this is equivalent to base_folders_prefix.
    expected_validation_prefix = base_folders_prefix + FOLDER_PREFIX

    if not full_resource_name.startswith(
        expected_validation_prefix
    ) or not full_resource_name.endswith("/"):
        raise ValueError(
            f"Folder resource name '{full_resource_name}' does not match expected"
            f" prefix '{expected_validation_prefix}' or missing trailing slash."
        )

    simple_path = full_resource_name[len(base_folders_prefix):]
    depth = simple_path.count("/")
    if depth < 1:
        raise ValueError(
            f"Folder resource name '{full_resource_name}' has invalid depth"
            f" {depth} (expected at least 1)."
        )
    return simple_path, depth


def discover_and_partition_folders():
    """Discovers all folders in the bucket and partitions them by depth.

    The result is stored in the global folders_by_depth dictionary.
    """
    parent_resource = f"projects/_/buckets/{BUCKET_NAME}"

    logging.info(
        "Starting folder discovery and partitioning for bucket '%s'."
        " Using prefix filter: '%s'.",
        BUCKET_NAME,
        FOLDER_PREFIX if FOLDER_PREFIX else "NONE (all folders)",
    )

    list_folders_request = storage_control_v2.ListFoldersRequest(
        parent=parent_resource, prefix=FOLDER_PREFIX
    )

    num_folders_found = 0
    try:
        for folder in storage_control_client.list_folders(request=list_folders_request):
            full_resource_name = folder.name
            _, depth = _get_simple_path_and_depth(full_resource_name)

            if depth not in folders_by_depth:
                folders_by_depth[depth] = set()
            folders_by_depth[depth].add(full_resource_name)

            num_folders_found += 1
            with stats_lock:
                stats["found_total"] = num_folders_found

    except Exception as e:
        logging.error("Failed to list folders: %s", e, exc_info=True)
        return

    logging.info("Finished discovery. Total folders found: %s.", num_folders_found)
    if not folders_by_depth:
        logging.info("No folders found in the bucket.")
    else:
        logging.info("Folders partitioned by depth:")
        for depth_val in sorted(folders_by_depth.keys()):
            logging.info(
                "  Depth %s: %s folders", depth_val, len(folders_by_depth[depth_val])
            )


def delete_folder(folder_full_resource_name: str):
    """Attempts to delete a single GCS HNS folder.

    Includes retry logic for transient errors.

    Stores stats in the global stats dictionary.

    Args:
        folder_full_resource_name: The full resource name of the GCS folder to
            delete, e.g.,
            'projects/_/buckets/your-bucket-name/folders/path/to/folder/'.
    """
    simple_path, _ = _get_simple_path_and_depth(folder_full_resource_name)

    attempts = 0
    while attempts <= MAX_RETRIES:
        attempts += 1
        try:
            request = storage_control_v2.DeleteFolderRequest(
                name=folder_full_resource_name
            )
            storage_control_client.delete_folder(request=request)

            with stats_lock:
                stats["successful_deletes"] += 1
            return  # Success, exit the retry loop.

        except google_exceptions.NotFound:
            # This can happen if the folder was deleted by another process.
            logging.warning(
                "Folder not found for deletion (already gone?): %s", simple_path
            )
            return  # Not a retriable error.
        except google_exceptions.FailedPrecondition as e:
            # This typically means the folder contains objects.
            logging.warning("Deletion failed for '%s': %s.", simple_path, e.message)
            with stats_lock:
                stats["failed_deletes_precondition"] += 1
            return  # Not a retriable error.
        except Exception as e:
            if attempts <= MAX_RETRIES:
                logging.warning(
                    "Transient error deleting '%s': %s. Retrying (attempt %s/%s).",
                    simple_path,
                    e,
                    attempts,
                    MAX_RETRIES,
                )
            else:
                logging.error(
                    "Failed to delete '%s' after %s retries: %s",
                    simple_path,
                    MAX_RETRIES,
                    e,
                    exc_info=True,
                )
                with stats_lock:
                    stats["failed_deletes_internal"] += 1
                return  # All retries exhausted.


# --- STATS REPORTER THREAD ---
def stats_reporter_thread_logic(stop_event: threading.Event, start_time: float):
    """Logs current statistics periodically."""
    logging.info("Stats Reporter: Started.")
    while not stop_event.wait(STATS_REPORT_INTERVAL):
        with stats_lock:
            elapsed = time.time() - start_time
            rate = stats["successful_deletes"] / elapsed if elapsed > 0 else 0
            logging.info(
                "[STATS] Total Folders Found: %s | Successful Deletes: %s | Failed"
                " Deletes (precondition): %s | Failed Deletes (internal): %s | Rate:"
                " %.2f folders/sec",
                stats["found_total"],
                stats["successful_deletes"],
                stats["failed_deletes_precondition"],
                stats["failed_deletes_internal"],
                rate,
            )
    logging.info("Stats Reporter: Shutting down.")


# --- MAIN EXECUTION BLOCK ---
if __name__ == "__main__":
    if BUCKET_NAME == "your-gcs-bucket-name":
        print(
            "\nERROR: Please update the BUCKET_NAME variable in the script before"
            " running."
        )
        exit(1)

    start_time = time.time()

    logging.info("Starting GCS HNS folder deletion for bucket: %s", BUCKET_NAME)

    # Event to signal threads to stop gracefully.
    stop_event = threading.Event()

    # Start the stats reporter thread.
    stats_thread = threading.Thread(
        target=stats_reporter_thread_logic,
        args=(stop_event, start_time),
        name="StatsReporter",
        daemon=True,
    )
    stats_thread.start()

    # Step 1: Discover and Partition Folders.
    discover_and_partition_folders()

    if not folders_by_depth:
        logging.info("No folders found to delete. Exiting.")
        exit(0)

    # Prepare for multi-threaded deletion within each depth level.
    deletion_executor = ThreadPoolExecutor(
        max_workers=MAX_WORKERS, thread_name_prefix="DeleteFolderWorker"
    )

    try:
        # Step 2: Iterate and delete by depth (from max to min).
        sorted_depths = sorted(folders_by_depth.keys(), reverse=True)
        for current_depth in sorted_depths:
            folders_at_current_depth = list(folders_by_depth.get(current_depth, set()))

            if not folders_at_current_depth:
                logging.info(
                    "Skipping depth %s: No folders found at this depth.", current_depth
                )
                continue

            logging.info(
                "\nProcessing depth %s: Submitting %s folders for deletion...",
                current_depth,
                len(folders_at_current_depth),
            )

            # Submit deletion tasks to the executor.
            futures = [
                deletion_executor.submit(delete_folder, folder_path)
                for folder_path in folders_at_current_depth
            ]

            # Wait for all tasks at the current depth to complete.
            # This is critical: we must ensure all nested folders are gone before
            # tackling their parents.
            concurrent.futures.wait(futures)

            logging.info("Finished processing all folders at depth %s.", current_depth)

    except KeyboardInterrupt:
        logging.info(
            "Main: Keyboard interrupt received. Attempting graceful shutdown..."
        )
    except Exception as e:
        logging.error(
            "An unexpected error occurred in the main loop: %s", e, exc_info=True
        )
    finally:
        # Signal all threads to stop.
        stop_event.set()

        # Shut down deletion executor and wait for any pending tasks to complete.
        logging.info(
            "Main: Shutting down deletion workers. Waiting for any final tasks..."
        )
        deletion_executor.shutdown(wait=True)

        # Wait for the stats reporter to finish.
        if stats_thread.is_alive():
            stats_thread.join(timeout=STATS_REPORT_INTERVAL + 2)  # Give it a bit more time.

        # Log final statistics.
        final_elapsed_time = time.time() - start_time
        logging.info("\n--- FINAL SUMMARY ---")
        with stats_lock:
            final_rate = (
                stats["successful_deletes"] / final_elapsed_time
                if final_elapsed_time > 0
                else 0
            )
            logging.info(
                " - Total Folders Found (Initial Scan): %s\n - Successful Folder"
                " Deletes: %s\n - Failed Folder Deletes (Precondition): %s\n -"
                " Failed Folder Deletes (Internal): %s\n - Total Runtime: %.2f"
                " seconds\n - Average Deletion Rate: %.2f folders/sec",
                stats["found_total"],
                stats["successful_deletes"],
                stats["failed_deletes_precondition"],
                stats["failed_deletes_internal"],
                final_elapsed_time,
                final_rate,
            )
        logging.info("Script execution finished.")