Add script to efficiently delete a large number of nested empty folders in an HNS bucket. #13497

35 changes: 35 additions & 0 deletions storage/hierarchical-namespace/README.md
@@ -0,0 +1,35 @@
# GCS HNS Folder Management Scripts

This directory contains scripts for managing folders in Google Cloud Storage (GCS) Hierarchical Namespace (HNS) enabled buckets.

## Scripts

### `delete-empty-folders.py`

This script recursively deletes empty folders within a specified GCS bucket, optionally scoped to a folder prefix.

**Features:**

* **Recursive Deletion:** Traverses and deletes nested empty folders.
* **Depth-First Deletion:** Deletes the deepest folders first to ensure parent folders are empty before deletion attempts.
* **Parallel Execution:** Uses a thread pool to delete folders concurrently for improved performance.
* **Configurable:** Allows setting the target bucket, folder prefix, and number of workers.
* **Error Handling:** Retries on transient errors and logs failures.
* **Progress Reporting:** Periodically logs deletion statistics.
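
A folder's depth is simply the number of `/` separators in its bucket-relative path, so grouping folders by depth and processing the groups from deepest to shallowest guarantees children are removed before their parents. A minimal illustration of that ordering (standalone sketch, not part of the script):

```python
paths = ["archive/", "archive/2023/", "archive/2023/logs/"]

# Group paths by depth (number of '/' separators in the bucket-relative path).
by_depth = {}
for p in paths:
    by_depth.setdefault(p.count("/"), []).append(p)

# Process deepest folders first: "archive/2023/logs/" before "archive/2023/"
# before "archive/".
for depth in sorted(by_depth, reverse=True):
    print(depth, by_depth[depth])
```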

**Usage:**

1. **Authenticate:**
```bash
gcloud auth application-default login
```
2. **Configure:** Update the variables at the top of the script (a sample configuration follows this list):
* `BUCKET_NAME`: The name of your GCS HNS bucket.
* `FOLDER_PREFIX`: (Optional) The prefix to limit deletion scope (e.g., `archive/`). Leave empty to scan the whole bucket. Must end with `/` if specified.
* `MAX_WORKERS`: Number of concurrent deletion threads.
3. **Run:**
```bash
python3 delete-empty-folders.py
```
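
For reference, the configuration block at the top of the script looks like the following (these are the script's placeholder defaults; adjust them for your bucket):

```python
BUCKET_NAME = "your-gcs-bucket-name"  # Target HNS-enabled bucket.
FOLDER_PREFIX = ""  # Optional scope, e.g. "archive/"; must end with '/' if set.
MAX_WORKERS = 100  # Number of concurrent deletion threads.
```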

**Note:** This script *only* deletes folders. Folders containing any objects will not be deleted, and a "Failed Precondition" warning will be logged.
360 changes: 360 additions & 0 deletions storage/hierarchical-namespace/delete-empty-folders.py
@@ -0,0 +1,360 @@
import concurrent.futures
import logging
import threading
import time

from google.api_core import exceptions as google_exceptions
from google.cloud import storage_control_v2

ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor

# This script may be used to recursively delete a large number of nested empty
# folders in a GCS HNS bucket. Overview of the algorithm:
# 1. Folder Discovery:
# - Lists all folders under the BUCKET_NAME and FOLDER_PREFIX (if set).
# - Partitions all discovered folders into a map, keyed by depth
# (e.g. {1: [foo1/, foo2/], 2: [foo1/bar1/, foo1/bar2/, foo2/bar3/], ...}).
# 2. Folder Deletion:
# - Processes depths in reverse order (from deepest to shallowest).
# - For each depth level, submits all folders at that level to a thread pool
# for parallel deletion.
# - Only moves to the next depth level once all folders at the current depth
# have been processed. This ensures that child folders are removed before
# their parents, respecting hierarchical constraints.
#
# Note: This script only deletes folders, not objects; any folders with child
# objects (immediate or nested) will fail to be deleted.
#
# Usage:
# 1. Authenticate with Google Cloud CLI:
# `gcloud auth application-default login`
# 2. Update configuration variables as needed to specify the target bucket,
# prefix, number of concurrent threads, etc.
# 3. Run the script: `python3 delete-empty-folders.py`

# --- Configuration ---
BUCKET_NAME = "your-gcs-bucket-name"

# e.g. "archive/old_data/" or "" to delete all folders in the bucket.
# If specified, must end with '/'.
FOLDER_PREFIX = ""

# Max number of concurrent threads to use for deleting folders.
MAX_WORKERS = 100

# How often to log statistics during deletion, in seconds.
STATS_REPORT_INTERVAL = 5

# Maximum number of retries for transient errors
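# (each folder deletion gets one initial attempt plus up to MAX_RETRIES retries).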
MAX_RETRIES = 5

# --- Data Structures & Globals ---
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(threadName)s - %(message)s"
)

# Global map to store folders by their depth
# { depth (int) -> set of full_resource_names (str) }
folders_by_depth = {}

# Stats for monitoring progress
stats = {
"found_total": 0,
"successful_deletes": 0,
"failed_deletes_precondition": 0,
"failed_deletes_internal": 0,
}
stats_lock = threading.Lock()

# Initialize the Storage Control API client
storage_control_client = storage_control_v2.StorageControlClient()


def _get_simple_path_and_depth(full_resource_name: str) -> tuple[str, int]:
"""Extracts bucket-relative path and depth from a GCS folder resource name.

The "simple path" is relative to the bucket (e.g., 'archive/logs/' for
'projects/_/buckets/my-bucket/folders/archive/logs/').

The "depth" is the number of '/' in the simple path (e.g. 'archive/logs/' is
depth 2).

Args:
full_resource_name: The full resource name of the GCS folder, e.g.,
'projects/_/buckets/your-bucket-name/folders/path/to/folder/'.

Returns:
A tuple (simple_path: str, depth: int).

Raises:
ValueError: If the resource name does not match the expected format
(i.e. it starts with 'projects/_/buckets/BUCKET_NAME/folders/FOLDER_PREFIX'
and ends with a trailing slash).
"""
base_folders_prefix = f"projects/_/buckets/{BUCKET_NAME}/folders/"
# The full prefix to validate against, including the global FOLDER_PREFIX.
# If FOLDER_PREFIX is "", this is equivalent to base_folders_prefix.
expected_validation_prefix = base_folders_prefix + FOLDER_PREFIX

if not full_resource_name.startswith(
expected_validation_prefix
) or not full_resource_name.endswith("/"):
raise ValueError(
f"Folder resource name '{full_resource_name}' does not match expected"
f" prefix '{expected_validation_prefix}' or missing trailing slash."
)

simple_path = full_resource_name[len(base_folders_prefix) :]
depth = simple_path.count("/")
if depth < 1:
raise ValueError(
f"Folder resource name '{full_resource_name}' has invalid depth"
f" {depth} (expected at least 1)."
)
return simple_path, depth


def discover_and_partition_folders():
"""Discovers all folders in the bucket and partitions them by depth.

Result is stored in the global folders_by_depth dictionary.
"""
parent_resource = f"projects/_/buckets/{BUCKET_NAME}"

logging.info(
"Starting folder discovery and partitioning for bucket '%s'."
" Using prefix filter: '%s'.",
BUCKET_NAME,
FOLDER_PREFIX if FOLDER_PREFIX else "NONE (all folders)",
)

list_folders_request = storage_control_v2.ListFoldersRequest(
parent=parent_resource, prefix=FOLDER_PREFIX
)

num_folders_found = 0
try:
for folder in storage_control_client.list_folders(request=list_folders_request):
full_resource_name = folder.name
_, depth = _get_simple_path_and_depth(full_resource_name)

if depth not in folders_by_depth:
folders_by_depth[depth] = set()
folders_by_depth[depth].add(full_resource_name)

num_folders_found += 1
with stats_lock:
stats["found_total"] = num_folders_found

except Exception as e:
logging.error("Failed to list folders: %s", e, exc_info=True)
return

logging.info("Finished discovery. Total folders found: %s.", num_folders_found)
if not folders_by_depth:
logging.info("No folders found in the bucket.")
else:
logging.info("Folders partitioned by depth:")
for depth_val in sorted(folders_by_depth.keys()):
logging.info(
" Depth %s: %s folders", depth_val, len(folders_by_depth[depth_val])
)


def delete_folder(folder_full_resource_name: str):
"""Attempts to delete a single GCS HNS folder.

Includes retry logic for transient errors.

Stores stats in the global stats dictionary.

Args:
folder_full_resource_name: The full resource name of the GCS folder to
delete, e.g.,
'projects/_/buckets/your-bucket-name/folders/path/to/folder/'.
"""
simple_path, _ = _get_simple_path_and_depth(folder_full_resource_name)

attempts = 0
while attempts <= MAX_RETRIES:
attempts += 1
try:
request = storage_control_v2.DeleteFolderRequest(
name=folder_full_resource_name
)
storage_control_client.delete_folder(request=request)

with stats_lock:
stats["successful_deletes"] += 1
return # Success, exit the retry loop

except google_exceptions.NotFound:
# This can happen if the folder was deleted by another process.
logging.warning(
"Folder not found for deletion (already gone?): %s", simple_path
)
return # Not a retriable error
except google_exceptions.FailedPrecondition as e:
# This typically means the folder contains objects.
logging.warning("Deletion failed for '%s': %s.", simple_path, e.message)
with stats_lock:
stats["failed_deletes_precondition"] += 1
return # Not a retriable error
except Exception as e:
if attempts <= MAX_RETRIES:
logging.warning(
"Transient error deleting '%s': %s. Retrying (attempt %s/%s).",
simple_path,
e,
attempts,
MAX_RETRIES,
)
else:
logging.error(
"Failed to delete '%s' after %s retries: %s",
simple_path,
MAX_RETRIES,
e,
exc_info=True,
)
with stats_lock:
stats["failed_deletes_internal"] += 1
return # All retries exhausted


# --- STATS REPORTER THREAD ---
def stats_reporter_thread_logic(stop_event: threading.Event, start_time: float):
"""Logs current statistics periodically."""
logging.info("Stats Reporter: Started.")
while not stop_event.wait(STATS_REPORT_INTERVAL):
with stats_lock:
elapsed = time.time() - start_time
rate = stats["successful_deletes"] / elapsed if elapsed > 0 else 0
logging.info(
"[STATS] Total Folders Found: %s | Successful Deletes: %s | Failed"
" Deletes (precondition): %s | Failed Deletes (internal): %s | Rate:"
" %.2f folders/sec",
stats["found_total"],
stats["successful_deletes"],
stats["failed_deletes_precondition"],
stats["failed_deletes_internal"],
rate,
)
logging.info("Stats Reporter: Shutting down.")


# --- MAIN EXECUTION BLOCK ---
if __name__ == "__main__":
if BUCKET_NAME == "your-gcs-bucket-name":
print(
"\nERROR: Please update the BUCKET_NAME variable in the script before"
" running."
)
exit(0)

start_time = time.time()

logging.info("Starting GCS HNS folder deletion for bucket: %s", BUCKET_NAME)

# Event to signal threads to stop gracefully.
stop_event = threading.Event()

# Start the stats reporter thread.
stats_thread = threading.Thread(
target=stats_reporter_thread_logic,
args=(stop_event, start_time),
name="StatsReporter",
daemon=True,
)
stats_thread.start()

# Step 1: Discover and Partition Folders.
discover_and_partition_folders()

if not folders_by_depth:
logging.info("No folders found to delete. Exiting.")
exit(0)

# Prepare for multi-threaded deletion within each depth level.
deletion_executor = ThreadPoolExecutor(
max_workers=MAX_WORKERS, thread_name_prefix="DeleteFolderWorker"
)

try:
# Step 2: Iterate and delete by depth (from max to min).
sorted_depths = sorted(folders_by_depth.keys(), reverse=True)
for current_depth in sorted_depths:
folders_at_current_depth = list(folders_by_depth.get(current_depth, set()))

if not folders_at_current_depth:
logging.info(
"Skipping depth %s: No folders found at this depth.", current_depth
)
continue

logging.info(
"\nProcessing depth %s: Submitting %s folders for deletion...",
current_depth,
len(folders_at_current_depth),
)

# Submit deletion tasks to the executor.
futures = [
deletion_executor.submit(delete_folder, folder_path)
for folder_path in folders_at_current_depth
]

# Wait for all tasks at the current depth to complete.
# This is critical: we must ensure all nested folders are gone before
# tackling their parents.
concurrent.futures.wait(futures)

logging.info("Finished processing all folders at depth %s.", current_depth)

except KeyboardInterrupt:
logging.info(
"Main: Keyboard interrupt received. Attempting graceful shutdown..."
)
except Exception as e:
logging.error(
"An unexpected error occurred in the main loop: %s", e, exc_info=True
)
finally:
# Signal all threads to stop.
stop_event.set()

# Shut down deletion executor and wait for any pending tasks to complete.
logging.info(
"Main: Shutting down deletion workers. Waiting for any final tasks..."
)
deletion_executor.shutdown(wait=True)

# Wait for the stats reporter to finish.
if stats_thread.is_alive():
stats_thread.join(
timeout=STATS_REPORT_INTERVAL + 2
) # Give it a bit more time.

# Log final statistics.
final_elapsed_time = time.time() - start_time
logging.info("\n--- FINAL SUMMARY ---")
with stats_lock:
final_rate = (
stats["successful_deletes"] / final_elapsed_time
if final_elapsed_time > 0
else 0
)
logging.info(
" - Total Folders Found (Initial Scan): %s\n - Successful Folder"
" Deletes: %s\n - Failed Folder Deletes (Precondition): %s\n -"
" Failed Folder Deletes (Internal): %s\n - Total Runtime: %.2f"
" seconds\n - Average Deletion Rate: %.2f folders/sec",
stats["found_total"],
stats["successful_deletes"],
stats["failed_deletes_precondition"],
stats["failed_deletes_internal"],
final_elapsed_time,
final_rate,
)
logging.info("Script execution finished.")