42
42
logger = logging .getLogger (__name__ )
43
43
44
44
WAITING_TIME_IF_NO_TASKS = 10 # seconds
45
- MAX_NB_REGULAR_FILES_PER_COMMIT = 75
46
- MAX_NB_LFS_FILES_PER_COMMIT = 150
45
+ MAX_NB_FILES_FETCH_UPLOAD_MODE = 100
47
46
COMMIT_SIZE_SCALE : List [int ] = [20 , 50 , 75 , 100 , 125 , 200 , 250 , 400 , 600 , 1000 ]
48
47
49
48
@@ -404,19 +403,19 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
404
403
):
405
404
status .nb_workers_commit += 1
406
405
logger .debug ("Job: commit (more than 5 minutes since last commit attempt)" )
407
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
406
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
408
407
409
408
# 2. Commit if at least 150 files are ready to commit and no worker is already committing
410
409
elif status .nb_workers_commit == 0 and status .queue_commit .qsize () >= 150 :
411
410
status .nb_workers_commit += 1
412
411
logger .debug ("Job: commit (>100 files ready)" )
413
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
412
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
414
413
415
- # 3. Get upload mode if at least 10 files
416
- elif status .queue_get_upload_mode .qsize () >= 10 :
414
+ # 3. Get upload mode if at least 100 files
415
+ elif status .queue_get_upload_mode .qsize () >= MAX_NB_FILES_FETCH_UPLOAD_MODE :
417
416
status .nb_workers_get_upload_mode += 1
418
- logger .debug ("Job: get upload mode (>10 files ready)" )
419
- return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , status . target_chunk () ))
417
+ logger .debug (f "Job: get upload mode (>{ MAX_NB_FILES_FETCH_UPLOAD_MODE } files ready)" )
418
+ return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
420
419
421
420
# 4. Preupload LFS file if at least 1 file and no worker is preuploading LFS
422
421
elif status .queue_preupload_lfs .qsize () > 0 and status .nb_workers_preupload_lfs == 0 :
@@ -434,7 +433,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
434
433
elif status .queue_get_upload_mode .qsize () > 0 and status .nb_workers_get_upload_mode == 0 :
435
434
status .nb_workers_get_upload_mode += 1
436
435
logger .debug ("Job: get upload mode (no other worker getting upload mode)" )
437
- return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , status . target_chunk () ))
436
+ return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
438
437
439
438
# 7. Preupload LFS file if at least 1 file
440
439
# Skip if hf_transfer is enabled and there is already a worker preuploading LFS
@@ -455,7 +454,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
455
454
elif status .queue_get_upload_mode .qsize () > 0 :
456
455
status .nb_workers_get_upload_mode += 1
457
456
logger .debug ("Job: get upload mode" )
458
- return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , status . target_chunk () ))
457
+ return (WorkerJob .GET_UPLOAD_MODE , _get_n (status .queue_get_upload_mode , MAX_NB_FILES_FETCH_UPLOAD_MODE ))
459
458
460
459
# 10. Commit if at least 1 file and 1 min since last commit attempt
461
460
elif (
@@ -466,7 +465,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
466
465
):
467
466
status .nb_workers_commit += 1
468
467
logger .debug ("Job: commit (1 min since last commit attempt)" )
469
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
468
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
470
469
471
470
# 11. Commit if at least 1 file, all other queues are empty, and all workers are waiting
472
471
# e.g. when it's the last commit
@@ -482,7 +481,7 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob,
482
481
):
483
482
status .nb_workers_commit += 1
484
483
logger .debug ("Job: commit" )
485
- return (WorkerJob .COMMIT , _get_items_to_commit (status .queue_commit ))
484
+ return (WorkerJob .COMMIT , _get_n (status .queue_commit , status . target_chunk () ))
486
485
487
486
# 12. If all queues are empty, exit
488
487
elif all (metadata .is_committed or metadata .should_ignore for _ , metadata in status .items ):
@@ -528,6 +527,7 @@ def _get_upload_mode(items: List[JOB_ITEM_T], api: "HfApi", repo_id: str, repo_t
528
527
paths , metadata = item
529
528
metadata .upload_mode = addition ._upload_mode
530
529
metadata .should_ignore = addition ._should_ignore
530
+ metadata .remote_oid = addition ._remote_oid
531
531
metadata .save (paths )
532
532
533
533
@@ -580,6 +580,9 @@ def _build_hacky_operation(item: JOB_ITEM_T) -> HackyCommitOperationAdd:
580
580
if metadata .sha256 is None :
581
581
raise ValueError ("sha256 must have been computed by now!" )
582
582
operation .upload_info = UploadInfo (sha256 = bytes .fromhex (metadata .sha256 ), size = metadata .size , sample = sample )
583
+ operation ._upload_mode = metadata .upload_mode # type: ignore[assignment]
584
+ operation ._should_ignore = metadata .should_ignore
585
+ operation ._remote_oid = metadata .remote_oid
583
586
return operation
584
587
585
588
@@ -596,30 +599,6 @@ def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> List[JOB_ITEM_T]:
596
599
return [queue .get () for _ in range (min (queue .qsize (), n ))]
597
600
598
601
599
- def _get_items_to_commit (queue : "queue.Queue[JOB_ITEM_T]" ) -> List [JOB_ITEM_T ]:
600
- """Special case for commit job: the number of items to commit depends on the type of files."""
601
- # Can take at most 75 regular files and/or 150 LFS files in a single commit
602
- items : List [JOB_ITEM_T ] = []
603
- nb_lfs , nb_regular = 0 , 0
604
- while True :
605
- # If empty queue => commit everything
606
- if queue .qsize () == 0 :
607
- return items
608
-
609
- # If we have enough items => commit them
610
- if nb_lfs >= MAX_NB_LFS_FILES_PER_COMMIT or nb_regular >= MAX_NB_REGULAR_FILES_PER_COMMIT :
611
- return items
612
-
613
- # Else, get a new item and increase counter
614
- item = queue .get ()
615
- items .append (item )
616
- _ , metadata = item
617
- if metadata .upload_mode == "lfs" :
618
- nb_lfs += 1
619
- else :
620
- nb_regular += 1
621
-
622
-
623
602
def _print_overwrite (report : str ) -> None :
624
603
"""Print a report, overwriting the previous lines.
625
604
0 commit comments