Skip to content

fix: PLT-791: Fix OOM for update_tasks_counters_and_task_states job #7771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions label_studio/core/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,24 @@ def batch(iterable, n=1):
yield iterable[ndx : min(ndx + n, l)]


def batched_iterator(iterable, n):
    """
    Lazily split ``iterable`` into consecutive lists of at most ``n`` items.

    Accepts any iterable, including one-shot iterators and generators, and only
    materializes one batch at a time. The final batch may be shorter than ``n``;
    an empty iterable produces no batches at all.

    TODO: replace with itertools.batched when we drop support for Python < 3.12

    :param iterable: any iterable to be consumed in batches
    :param n: maximum number of items per batch, must be at least 1
    :return: generator yielding lists with 1..n items each
    :raises ValueError: if n is less than 1 (raised when iteration starts)
    """
    from itertools import islice

    # Without this guard a non-positive n would yield empty batches forever,
    # because no item would ever be pulled from the underlying iterator.
    if n < 1:
        raise ValueError('n must be at least one')

    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, n))
        if not chunk:
            # Source exhausted: never emit an empty trailing batch
            return
        yield chunk


def round_floats(o):
if isinstance(o, float):
return round(o, 2)
Expand Down
43 changes: 43 additions & 0 deletions label_studio/projects/functions/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from logging import getLogger
from typing import TYPE_CHECKING

from django.db.models import QuerySet
from tasks.models import AnnotationDraft, Task

logger = getLogger(__name__)
Expand All @@ -10,6 +11,48 @@
from projects.models import Project, ProjectSummary


def get_unique_ids_list(tasks_queryset):
    """
    Convert various input types to a list of unique IDs.

    :param tasks_queryset: Can be:
        - list or tuple of IDs (integers)
        - list or tuple of objects with 'id' attribute
        - set of IDs or objects
        - Django QuerySet
        Lists, tuples and sets may also mix integers and objects.
    :return: list of unique IDs (order is not guaranteed)
    :raises ValueError: if tasks_queryset is of an unsupported type
    """
    if isinstance(tasks_queryset, (list, tuple, set)):
        # Resolve each item individually instead of sniffing only the first one,
        # so mixed containers work; the set comprehension drops duplicate IDs
        # for every container type, including sets of distinct objects that
        # share the same id.
        unique_ids = {item if isinstance(item, int) else item.id for item in tasks_queryset}
        return list(unique_ids)

    if isinstance(tasks_queryset, QuerySet):
        # Fetch only the id column to keep the payload small. A queryset is not
        # guaranteed to return distinct rows, so de-duplicate in Python to
        # honour the "unique IDs" contract.
        return list(set(tasks_queryset.values_list('id', flat=True)))

    raise ValueError(f'Unsupported type for tasks_queryset: {type(tasks_queryset)}')

Check warning on line 53 in label_studio/projects/functions/utils.py

View check run for this annotation

Codecov / codecov/patch

label_studio/projects/functions/utils.py#L53

Added line #L53 was not covered by tests


def make_queryset_from_iterable(tasks_list):
"""
Make queryset from list/set of int/Tasks
Expand Down
13 changes: 5 additions & 8 deletions label_studio/projects/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from core.redis import start_job_async_or_sync
from django.db.models import QuerySet
from django.utils.functional import cached_property
from projects.functions.utils import get_unique_ids_list

if TYPE_CHECKING:
from users.models import User
Expand All @@ -22,11 +23,8 @@ def update_tasks_counters_and_is_labeled(self, tasks_queryset, from_scratch=True
:param from_scratch: Skip calculated tasks
"""
# get only id from queryset to decrease data size in job
if not (isinstance(tasks_queryset, set) or isinstance(tasks_queryset, list)):
tasks_queryset = set(tasks_queryset.values_list('id', flat=True))
start_job_async_or_sync(
self._update_tasks_counters_and_is_labeled, list(tasks_queryset), from_scratch=from_scratch
)
task_ids = get_unique_ids_list(tasks_queryset)
start_job_async_or_sync(self._update_tasks_counters_and_is_labeled, task_ids, from_scratch=from_scratch)

def update_tasks_counters_and_task_states(
self,
Expand All @@ -46,11 +44,10 @@ def update_tasks_counters_and_task_states(
:param from_scratch: Skip calculated tasks
"""
# get only id from queryset to decrease data size in job
if not (isinstance(tasks_queryset, set) or isinstance(tasks_queryset, list)):
tasks_queryset = set(tasks_queryset.values_list('id', flat=True))
task_ids = get_unique_ids_list(tasks_queryset)
start_job_async_or_sync(
self._update_tasks_counters_and_task_states,
tasks_queryset,
task_ids,
maximum_annotations_changed,
overlap_cohort_percentage_changed,
tasks_number_changed,
Expand Down
41 changes: 24 additions & 17 deletions label_studio/tasks/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
import shutil
import sys

from core.bulk_update_utils import bulk_update
from core.models import AsyncMigrationStatus
from core.redis import start_job_async_or_sync
from core.utils.common import batch
from core.utils.common import batch, batched_iterator
from data_export.mixins import ExportMixin
from data_export.models import DataExport
from data_export.serializers import ExportDataSerializer
from data_manager.managers import TaskQuerySet
from django.conf import settings
from django.db import transaction
from django.db.models import Count, Q
from organizations.models import Organization
from projects.models import Project
Expand Down Expand Up @@ -181,8 +179,6 @@ def update_tasks_counters(queryset, from_scratch=True):
:param from_scratch: Skip calculated tasks
:return: Count of updated tasks
"""
objs = []

total_annotations = Count('annotations', distinct=True, filter=Q(annotations__was_cancelled=False))
cancelled_annotations = Count('annotations', distinct=True, filter=Q(annotations__was_cancelled=True))
total_predictions = Count('predictions', distinct=True)
Expand Down Expand Up @@ -211,15 +207,26 @@ def update_tasks_counters(queryset, from_scratch=True):
new_total_predictions=total_predictions,
)

for task in queryset.only('id', 'total_annotations', 'cancelled_annotations', 'total_predictions'):
task.total_annotations = task.new_total_annotations
task.cancelled_annotations = task.new_cancelled_annotations
task.total_predictions = task.new_total_predictions
objs.append(task)
with transaction.atomic():
bulk_update(
objs,
update_fields=['total_annotations', 'cancelled_annotations', 'total_predictions'],
batch_size=settings.BATCH_SIZE,
)
return len(objs)
updated_count = 0

tasks_iterator = queryset.only('id', 'total_annotations', 'cancelled_annotations', 'total_predictions').iterator(
chunk_size=settings.BATCH_SIZE
)

for _batch in batched_iterator(tasks_iterator, settings.BATCH_SIZE):
batch_list = []
for task in _batch:
task.total_annotations = task.new_total_annotations
task.cancelled_annotations = task.new_cancelled_annotations
task.total_predictions = task.new_total_predictions
batch_list.append(task)

if batch_list:
Task.objects.bulk_update(
batch_list,
['total_annotations', 'cancelled_annotations', 'total_predictions'],
batch_size=settings.BATCH_SIZE,
)
updated_count += len(batch_list)

return updated_count
Loading