Skip to content

Commit 6ddbb1c

Browse files
feat(launchpad): ensure artifact assembly task retries in case of Kafka error (#94728)
The retry behavior:
- If the Kafka producer raises a KafkaException, the task will automatically retry.
- It will retry up to 3 times with exponential backoff.
- (Idempotency) Since the task checks for existing artifacts at the beginning, re-running won't create duplicates.
- The artifact creation happens in a transaction, so partial failures are rolled back.
1 parent 504ee28 commit 6ddbb1c

File tree

3 files changed

+70
-71
lines changed

3 files changed

+70
-71
lines changed

src/sentry/preprod/producer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,4 @@ def produce_preprod_artifact_to_kafka(
5353
"Failed to send preprod artifact message to Kafka",
5454
extra={"artifact_id": artifact_id, "project_id": project_id},
5555
)
56+
raise # Re-raise to trigger task retry

src/sentry/preprod/tasks.py

Lines changed: 68 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from sentry.tasks.base import instrumented_task
2424
from sentry.taskworker.config import TaskworkerConfig
2525
from sentry.taskworker.namespaces import attachments_tasks
26+
from sentry.taskworker.retry import Retry
2627
from sentry.utils.sdk import bind_organization_context
2728

2829
logger = logging.getLogger(__name__)
@@ -32,6 +33,7 @@
3233
name="sentry.preprod.tasks.assemble_preprod_artifact",
3334
queue="assemble",
3435
silo_mode=SiloMode.REGION,
36+
retry=Retry(times=3),
3537
taskworker_config=TaskworkerConfig(
3638
namespace=attachments_tasks,
3739
processing_deadline_duration=30,
@@ -62,6 +64,7 @@ def assemble_preprod_artifact(
6264
},
6365
)
6466

67+
preprod_artifact = None
6568
try:
6669
organization = Organization.objects.get_from_cache(pk=org_id)
6770
project = Project.objects.get(id=project_id, organization=organization)
@@ -81,67 +84,64 @@ def assemble_preprod_artifact(
8184

8285
if existing_artifact:
8386
logger.info(
84-
"PreprodArtifact already exists for this checksum, skipping assembly",
87+
"PreprodArtifact already exists for this checksum, using existing artifact",
8588
extra={
8689
"preprod_artifact_id": existing_artifact.id,
8790
"project_id": project_id,
8891
"organization_id": org_id,
8992
"checksum": checksum,
9093
},
9194
)
95+
preprod_artifact = existing_artifact
96+
else:
9297
set_assemble_status(
93-
AssembleTask.PREPROD_ARTIFACT, project_id, checksum, ChunkFileState.OK
94-
)
95-
return
96-
97-
set_assemble_status(
98-
AssembleTask.PREPROD_ARTIFACT, project_id, checksum, ChunkFileState.ASSEMBLING
99-
)
100-
101-
assemble_result = assemble_file(
102-
task=AssembleTask.PREPROD_ARTIFACT,
103-
org_or_project=project,
104-
name=f"preprod-artifact-{uuid.uuid4().hex}",
105-
checksum=checksum,
106-
chunks=chunks,
107-
file_type="preprod.artifact",
108-
)
109-
110-
if assemble_result is None:
111-
return
112-
113-
build_config = None
114-
if build_configuration:
115-
build_config, _ = PreprodBuildConfiguration.objects.get_or_create(
116-
project=project,
117-
name=build_configuration,
98+
AssembleTask.PREPROD_ARTIFACT, project_id, checksum, ChunkFileState.ASSEMBLING
11899
)
119100

120-
# Create PreprodArtifact record
121-
preprod_artifact = PreprodArtifact.objects.create(
122-
project=project,
123-
file_id=assemble_result.bundle.id,
124-
build_configuration=build_config,
125-
state=PreprodArtifact.ArtifactState.UPLOADED,
126-
)
101+
assemble_result = assemble_file(
102+
task=AssembleTask.PREPROD_ARTIFACT,
103+
org_or_project=project,
104+
name=f"preprod-artifact-{uuid.uuid4().hex}",
105+
checksum=checksum,
106+
chunks=chunks,
107+
file_type="preprod.artifact",
108+
)
127109

128-
logger.info(
129-
"Created preprod artifact",
130-
extra={
131-
"preprod_artifact_id": preprod_artifact.id,
132-
"project_id": project_id,
133-
"organization_id": org_id,
134-
"checksum": checksum,
135-
},
136-
)
110+
if assemble_result is None:
111+
logger.warning(
112+
"Assemble result is None, returning early",
113+
extra={
114+
"project_id": project_id,
115+
"organization_id": org_id,
116+
"checksum": checksum,
117+
},
118+
)
119+
return
120+
121+
build_config = None
122+
if build_configuration:
123+
build_config, _ = PreprodBuildConfiguration.objects.get_or_create(
124+
project=project,
125+
name=build_configuration,
126+
)
127+
128+
# Create PreprodArtifact record
129+
preprod_artifact, _ = PreprodArtifact.objects.get_or_create(
130+
project=project,
131+
file_id=assemble_result.bundle.id,
132+
build_configuration=build_config,
133+
state=PreprodArtifact.ArtifactState.UPLOADED,
134+
)
137135

138-
# where next set of changes will happen
139-
# TODO: Trigger artifact processing (size analysis, etc.)
140-
# This is where you'd add logic to:
141-
# 1. create_or_update a new row in the Commit table as well (once base_sha is added as a column to it)
142-
# 2. Detect artifact type (iOS/Android/etc.)
143-
# 3. Queue processing tasks
144-
# 4. Update state to PROCESSED when done (also update the date_built value to reflect when the artifact was built, among other fields)
136+
logger.info(
137+
"Created preprod artifact",
138+
extra={
139+
"preprod_artifact_id": preprod_artifact.id,
140+
"project_id": project_id,
141+
"organization_id": org_id,
142+
"checksum": checksum,
143+
},
144+
)
145145

146146
except Exception as e:
147147
sentry_sdk.capture_exception(e)
@@ -165,27 +165,25 @@ def assemble_preprod_artifact(
165165
# Mark assembly as successful since the artifact was created successfully
166166
set_assemble_status(AssembleTask.PREPROD_ARTIFACT, project_id, checksum, ChunkFileState.OK)
167167

168-
produce_preprod_artifact_to_kafka(
169-
project_id=project_id,
170-
organization_id=org_id,
171-
artifact_id=preprod_artifact.id,
172-
checksum=checksum,
173-
git_sha=git_sha,
174-
build_configuration=build_configuration,
175-
)
168+
if preprod_artifact:
169+
produce_preprod_artifact_to_kafka(
170+
project_id=project_id,
171+
organization_id=org_id,
172+
artifact_id=preprod_artifact.id,
173+
)
176174

177-
logger.info(
178-
"Finished preprod artifact assembly and Kafka dispatch",
179-
extra={
180-
"preprod_artifact_id": preprod_artifact.id,
181-
"project_id": project_id,
182-
"organization_id": org_id,
183-
"checksum": checksum,
184-
},
185-
)
175+
logger.info(
176+
"Finished preprod artifact assembly and Kafka dispatch",
177+
extra={
178+
"preprod_artifact_id": preprod_artifact.id,
179+
"project_id": project_id,
180+
"organization_id": org_id,
181+
"checksum": checksum,
182+
},
183+
)
186184

187185

188-
def _assemble_preprod_artifact(
186+
def _assemble_preprod_artifact_file(
189187
assemble_task: str,
190188
project_id: int,
191189
org_id: int,
@@ -194,7 +192,7 @@ def _assemble_preprod_artifact(
194192
callback: Callable[[AssembleResult, Any], None],
195193
):
196194
logger.info(
197-
"Starting preprod artifact assembly",
195+
"Starting preprod file assembly",
198196
extra={
199197
"timestamp": datetime.datetime.now().isoformat(),
200198
"project_id": project_id,
@@ -229,7 +227,7 @@ def _assemble_preprod_artifact(
229227
callback(assemble_result, project)
230228
except Exception as e:
231229
logger.exception(
232-
"Failed to assemble preprod artifact",
230+
"Failed to assemble preprod file",
233231
extra={
234232
"project_id": project_id,
235233
"organization_id": org_id,
@@ -322,7 +320,7 @@ def assemble_preprod_artifact_size_analysis(
322320
"""
323321
Creates a size analysis file for a preprod artifact from uploaded chunks.
324322
"""
325-
_assemble_preprod_artifact(
323+
_assemble_preprod_artifact_file(
326324
AssembleTask.PREPROD_ARTIFACT_SIZE_ANALYSIS,
327325
project_id,
328326
org_id,

tests/sentry/preprod/test_tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ def __init__(self):
262262
with (
263263
patch("sentry.preprod.tasks.assemble_file", return_value=MockAssembleResult()),
264264
patch.object(
265-
PreprodArtifact.objects, "create", side_effect=Exception("Simulated failure")
265+
PreprodArtifact.objects, "get_or_create", side_effect=Exception("Simulated failure")
266266
),
267267
):
268268

0 commit comments

Comments
 (0)