Skip to content

Commit 9d25831

Browse files
assafvayner and Wauplin authored
upload large folder operations uses batches of files for preupload-lfs jobs for xet-enabled repositories (#3228)
* modify funcs to accept lists * batch uploads post testing * make style * is_xet_enabled * change batch size constructor param to have default size of 1 * test batch size set to greater than one * style + quality * apply comments from Lucain * get n files or at least totalling 64MB * Revert "get n files or at least totalling 64MB" This reverts commit 04d31d9. --------- Co-authored-by: Lucain <lucain@huggingface.co>
1 parent 5094658 commit 9d25831

File tree

2 files changed

+79
-23
lines changed

2 files changed

+79
-23
lines changed

src/huggingface_hub/_upload_large_folder.py

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from .constants import DEFAULT_REVISION, REPO_TYPES
3434
from .utils import DEFAULT_IGNORE_PATTERNS, filter_repo_objects, tqdm
3535
from .utils._cache_manager import _format_size
36+
from .utils._runtime import is_xet_available
3637
from .utils.sha import sha_fileobj
3738

3839

@@ -45,6 +46,9 @@
4546
MAX_NB_FILES_FETCH_UPLOAD_MODE = 100
4647
COMMIT_SIZE_SCALE: List[int] = [20, 50, 75, 100, 125, 200, 250, 400, 600, 1000]
4748

49+
UPLOAD_BATCH_SIZE_XET = 256 # Max 256 files per upload batch for XET-enabled repos
50+
UPLOAD_BATCH_SIZE_LFS = 1 # Otherwise, batches of 1 for regular LFS upload
51+
4852

4953
def upload_large_folder_internal(
5054
api: "HfApi",
@@ -93,6 +97,17 @@ def upload_large_folder_internal(
9397
repo_url = api.create_repo(repo_id=repo_id, repo_type=repo_type, private=private, exist_ok=True)
9498
logger.info(f"Repo created: {repo_url}")
9599
repo_id = repo_url.repo_id
100+
# 2.1 Check if xet is enabled to set batch file upload size
101+
is_xet_enabled = (
102+
is_xet_available()
103+
and api.repo_info(
104+
repo_id=repo_id,
105+
repo_type=repo_type,
106+
revision=revision,
107+
expand="xetEnabled",
108+
).xet_enabled
109+
)
110+
upload_batch_size = UPLOAD_BATCH_SIZE_XET if is_xet_enabled else UPLOAD_BATCH_SIZE_LFS
96111

97112
# 3. List files to upload
98113
filtered_paths_list = filter_repo_objects(
@@ -110,7 +125,7 @@ def upload_large_folder_internal(
110125
]
111126

112127
# 4. Start workers
113-
status = LargeUploadStatus(items)
128+
status = LargeUploadStatus(items, upload_batch_size)
114129
threads = [
115130
threading.Thread(
116131
target=_worker_job,
@@ -168,7 +183,7 @@ class WorkerJob(enum.Enum):
168183
class LargeUploadStatus:
169184
"""Contains information, queues and tasks for a large upload process."""
170185

171-
def __init__(self, items: List[JOB_ITEM_T]):
186+
def __init__(self, items: List[JOB_ITEM_T], upload_batch_size: int = 1):
172187
self.items = items
173188
self.queue_sha256: "queue.Queue[JOB_ITEM_T]" = queue.Queue()
174189
self.queue_get_upload_mode: "queue.Queue[JOB_ITEM_T]" = queue.Queue()
@@ -179,6 +194,7 @@ def __init__(self, items: List[JOB_ITEM_T]):
179194
self.nb_workers_sha256: int = 0
180195
self.nb_workers_get_upload_mode: int = 0
181196
self.nb_workers_preupload_lfs: int = 0
197+
self.upload_batch_size: int = upload_batch_size
182198
self.nb_workers_commit: int = 0
183199
self.nb_workers_waiting: int = 0
184200
self.last_commit_attempt: Optional[float] = None
@@ -353,16 +369,17 @@ def _worker_job(
353369
status.nb_workers_get_upload_mode -= 1
354370

355371
elif job == WorkerJob.PREUPLOAD_LFS:
356-
item = items[0] # single item
357372
try:
358-
_preupload_lfs(item, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision)
359-
status.queue_commit.put(item)
373+
_preupload_lfs(items, api=api, repo_id=repo_id, repo_type=repo_type, revision=revision)
374+
for item in items:
375+
status.queue_commit.put(item)
360376
except KeyboardInterrupt:
361377
raise
362378
except Exception as e:
363379
logger.error(f"Failed to preupload LFS: {e}")
364380
traceback.format_exc()
365-
status.queue_preupload_lfs.put(item)
381+
for item in items:
382+
status.queue_preupload_lfs.put(item)
366383

367384
with status.lock:
368385
status.nb_workers_preupload_lfs -= 1
@@ -417,11 +434,11 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
417434
logger.debug(f"Job: get upload mode (>{MAX_NB_FILES_FETCH_UPLOAD_MODE} files ready)")
418435
return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, MAX_NB_FILES_FETCH_UPLOAD_MODE))
419436

420-
# 4. Preupload LFS file if at least 1 file and no worker is preuploading LFS
421-
elif status.queue_preupload_lfs.qsize() > 0 and status.nb_workers_preupload_lfs == 0:
437+
# 4. Preupload LFS file if at least `status.upload_batch_size` files and no worker is preuploading LFS
438+
elif status.queue_preupload_lfs.qsize() >= status.upload_batch_size and status.nb_workers_preupload_lfs == 0:
422439
status.nb_workers_preupload_lfs += 1
423440
logger.debug("Job: preupload LFS (no other worker preuploading LFS)")
424-
return (WorkerJob.PREUPLOAD_LFS, _get_one(status.queue_preupload_lfs))
441+
return (WorkerJob.PREUPLOAD_LFS, _get_n(status.queue_preupload_lfs, status.upload_batch_size))
425442

426443
# 5. Compute sha256 if at least 1 file and no worker is computing sha256
427444
elif status.queue_sha256.qsize() > 0 and status.nb_workers_sha256 == 0:
@@ -435,14 +452,14 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
435452
logger.debug("Job: get upload mode (no other worker getting upload mode)")
436453
return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, MAX_NB_FILES_FETCH_UPLOAD_MODE))
437454

438-
# 7. Preupload LFS file if at least 1 file
455+
# 7. Preupload LFS file if at least `status.upload_batch_size` files
439456
# Skip if hf_transfer is enabled and there is already a worker preuploading LFS
440-
elif status.queue_preupload_lfs.qsize() > 0 and (
457+
elif status.queue_preupload_lfs.qsize() >= status.upload_batch_size and (
441458
status.nb_workers_preupload_lfs == 0 or not constants.HF_HUB_ENABLE_HF_TRANSFER
442459
):
443460
status.nb_workers_preupload_lfs += 1
444461
logger.debug("Job: preupload LFS")
445-
return (WorkerJob.PREUPLOAD_LFS, _get_one(status.queue_preupload_lfs))
462+
return (WorkerJob.PREUPLOAD_LFS, _get_n(status.queue_preupload_lfs, status.upload_batch_size))
446463

447464
# 8. Compute sha256 if at least 1 file
448465
elif status.queue_sha256.qsize() > 0:
@@ -456,7 +473,13 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
456473
logger.debug("Job: get upload mode")
457474
return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, MAX_NB_FILES_FETCH_UPLOAD_MODE))
458475

459-
# 10. Commit if at least 1 file and 1 min since last commit attempt
476+
# 10. Preupload LFS file if at least 1 file
477+
elif status.queue_preupload_lfs.qsize() > 0:
478+
status.nb_workers_preupload_lfs += 1
479+
logger.debug("Job: preupload LFS")
480+
return (WorkerJob.PREUPLOAD_LFS, _get_n(status.queue_preupload_lfs, status.upload_batch_size))
481+
482+
# 11. Commit if at least 1 file and 1 min since last commit attempt
460483
elif (
461484
status.nb_workers_commit == 0
462485
and status.queue_commit.qsize() > 0
@@ -467,7 +490,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
467490
logger.debug("Job: commit (1 min since last commit attempt)")
468491
return (WorkerJob.COMMIT, _get_n(status.queue_commit, status.target_chunk()))
469492

470-
# 11. Commit if at least 1 file all other queues are empty and all workers are waiting
493+
# 12. Commit if at least 1 file all other queues are empty and all workers are waiting
471494
# e.g. when it's the last commit
472495
elif (
473496
status.nb_workers_commit == 0
@@ -483,12 +506,12 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
483506
logger.debug("Job: commit")
484507
return (WorkerJob.COMMIT, _get_n(status.queue_commit, status.target_chunk()))
485508

486-
# 12. If all queues are empty, exit
509+
# 13. If all queues are empty, exit
487510
elif all(metadata.is_committed or metadata.should_ignore for _, metadata in status.items):
488511
logger.info("All files have been processed! Exiting worker.")
489512
return None
490513

491-
# 13. If no task is available, wait
514+
# 14. If no task is available, wait
492515
else:
493516
status.nb_workers_waiting += 1
494517
logger.debug(f"No task available, waiting... ({WAITING_TIME_IF_NO_TASKS}s)")
@@ -531,19 +554,19 @@ def _get_upload_mode(items: List[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_t
531554
metadata.save(paths)
532555

533556

534-
def _preupload_lfs(item: JOB_ITEM_T, api: "HfApi", repo_id: str, repo_type: str, revision: str) -> None:
535-
"""Preupload LFS file and update metadata."""
536-
paths, metadata = item
537-
addition = _build_hacky_operation(item)
557+
def _preupload_lfs(items: List[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_type: str, revision: str) -> None:
558+
"""Preupload LFS files and update metadata."""
559+
additions = [_build_hacky_operation(item) for item in items]
538560
api.preupload_lfs_files(
539561
repo_id=repo_id,
540562
repo_type=repo_type,
541563
revision=revision,
542-
additions=[addition],
564+
additions=additions,
543565
)
544566

545-
metadata.is_uploaded = True
546-
metadata.save(paths)
567+
for paths, metadata in items:
568+
metadata.is_uploaded = True
569+
metadata.save(paths)
547570

548571

549572
def _commit(items: List[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_type: str, revision: str) -> None:

tests/test_xet_upload.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,39 @@ def test_upload_large_folder(self, api, tmp_path, repo_url: RepoUrl) -> None:
279279
regular_file = local_dir / f"subfolder_{i}/file_regular_{i}_{j}.txt"
280280
assert regular_file.read_bytes() == f"content_regular_{i}_{j}".encode()
281281

282+
def test_upload_large_folder_batch_size_greater_than_one(self, api, tmp_path, repo_url: RepoUrl) -> None:
283+
from hf_xet import upload_files as real_upload_files
284+
285+
N_FILES = 500
286+
repo_id = repo_url.repo_id
287+
288+
folder = Path(tmp_path) / "large_folder"
289+
folder.mkdir()
290+
for i in range(N_FILES):
291+
(folder / f"file_xet_{i}.bin").write_bytes(f"content_lfs_{i}".encode())
292+
293+
# capture the number of files passed in per call to hf_xet.upload_files
294+
# to ensure that the batch size is respected.
295+
num_files_per_call = []
296+
297+
def spy_upload_files(*args, **kwargs):
298+
num_files = len(args[0])
299+
num_files_per_call.append(num_files)
300+
return real_upload_files(*args, **kwargs)
301+
302+
with assert_upload_mode("xet"):
303+
with patch("hf_xet.upload_files", side_effect=spy_upload_files):
304+
api.upload_large_folder(repo_id=repo_id, repo_type="model", folder_path=folder, num_workers=4)
305+
306+
# the batch size is set to 256 however due to speed of hashing and get_upload_mode calls it's not always guaranteed
307+
# that the files will be uploaded in batches of 256. They may be uploaded in smaller batches if no other jobs
308+
# are available to run; even as small as 1 file per call.
309+
#
310+
# However, it would be unlikely that all files are uploaded in batches of 1 if batching was correctly implemented.
311+
# So we assert that not all files were uploaded in batches of 1, although it is possible even with batching.
312+
313+
assert any(n > 1 for n in num_files_per_call)
314+
282315

283316
@requires("hf_xet")
284317
class TestXetE2E(TestXetUpload):

0 commit comments

Comments (0)