Skip to content

Commit 2769609

Browse files
feat(launchpad): create kafka producer (#94726)
<!-- Describe your PR here. --> <!-- Sentry employees and contractors can delete or ignore the following. --> ### Legal Boilerplate Look, I get it. The entity doing business as "Sentry" was incorporated in the State of Delaware in 2015 as Functional Software, Inc. and is gonna need some rights from me in order to utilize my contributions in this here PR. So here's the deal: I retain all rights, title and interest in and to my contributions, and by keeping this boilerplate intact I confirm that Sentry can use, modify, copy, and redistribute my contributions, under Sentry's choice of terms.
1 parent 7e57c8a commit 2769609

File tree

4 files changed

+86
-12
lines changed

4 files changed

+86
-12
lines changed

src/sentry/conf/server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3391,6 +3391,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
33913391
"buffered-segments": "default",
33923392
"buffered-segments-dlq": "default",
33933393
"snuba-ourlogs": "default",
3394+
"preprod-artifact-events": "default",
33943395
# Taskworker topics
33953396
"taskworker": "default",
33963397
"taskworker-dlq": "default",
@@ -3715,6 +3716,8 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
37153716
SENTRY_PROFILE_FUNCTIONS_FUTURES_MAX_LIMIT = 10000
37163717
SENTRY_PROFILE_CHUNKS_FUTURES_MAX_LIMIT = 10000
37173718

3719+
SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000
3720+
37183721
# How long we should wait for a gateway proxy request to return before giving up
37193722
GATEWAY_PROXY_TIMEOUT: int | None = None
37203723

src/sentry/conf/types/kafka_definition.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class Topic(Enum):
5555
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
5656
INGEST_OCCURRENCES = "ingest-occurrences"
5757
INGEST_MONITORS = "ingest-monitors"
58+
PREPROD_ARTIFACT_EVENTS = "preprod-artifact-events"
5859
MONITORS_CLOCK_TICK = "monitors-clock-tick"
5960
MONITORS_CLOCK_TASKS = "monitors-clock-tasks"
6061
MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences"

src/sentry/preprod/producer.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import Any
5+
6+
from arroyo import Topic as ArroyoTopic
7+
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
8+
from confluent_kafka import KafkaException
9+
from django.conf import settings
10+
11+
from sentry.conf.types.kafka_definition import Topic
12+
from sentry.utils import json
13+
from sentry.utils.arroyo_producer import SingletonProducer
14+
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
def _get_preprod_producer() -> KafkaProducer:
    """Construct the Kafka producer for preprod artifact events.

    Resolves the cluster backing the ``preprod-artifact-events`` topic and
    builds a producer from that cluster's options.
    """
    cluster = get_topic_definition(Topic.PREPROD_ARTIFACT_EVENTS)["cluster"]
    config = get_kafka_producer_cluster_options(cluster)
    # Drop cluster-level defaults this producer should not inherit.
    for unwanted in ("compression.type", "message.max.bytes"):
        config.pop(unwanted, None)
    return KafkaProducer(build_kafka_configuration(default_config=config))
25+
26+
27+
# Process-wide, lazily initialized producer. The number of in-flight publish
# futures is capped by the SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT
# setting (see src/sentry/conf/server.py).
_preprod_producer = SingletonProducer(
    _get_preprod_producer, max_futures=settings.SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT
)
30+
31+
32+
def produce_preprod_artifact_to_kafka(
    project_id: int,
    organization_id: int,
    artifact_id: int,
    **kwargs: Any,
) -> None:
    """Publish a preprod-artifact event to Kafka.

    Builds a JSON payload from the given identifiers plus any extra fields in
    ``kwargs`` (extra fields take precedence on key collision, matching dict
    ``**`` merge semantics) and produces it to the
    ``preprod-artifact-events`` topic. Kafka failures are logged, not raised.
    """
    # IDs are serialized as strings; kwargs applied last so callers may
    # override any of the base fields.
    message = {
        "artifact_id": str(artifact_id),
        "project_id": str(project_id),
        "organization_id": str(organization_id),
    }
    message.update(kwargs)

    # Key on project + artifact so events for one artifact stay ordered
    # within a partition.
    key = f"{project_id}-{artifact_id}".encode()
    kafka_payload = KafkaPayload(key, json.dumps(message).encode("utf-8"), [])

    try:
        topic_name = get_topic_definition(Topic.PREPROD_ARTIFACT_EVENTS)["real_topic_name"]
        _preprod_producer.produce(ArroyoTopic(topic_name), kafka_payload)
    except KafkaException:
        # Best-effort delivery: swallow producer errors after recording them.
        logger.exception(
            "Failed to send preprod artifact message to Kafka",
            extra={"artifact_id": artifact_id, "project_id": project_id},
        )

src/sentry/preprod/tasks.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
from collections.abc import Callable
77
from typing import Any
88

9+
import sentry_sdk
910
from django.db import router, transaction
1011

1112
from sentry.models.organization import Organization
1213
from sentry.models.project import Project
14+
from sentry.preprod.producer import produce_preprod_artifact_to_kafka
1315
from sentry.silo.base import SiloMode
1416
from sentry.tasks.assemble import (
1517
AssembleResult,
@@ -133,16 +135,6 @@ def assemble_preprod_artifact(
133135
},
134136
)
135137

136-
logger.info(
137-
"Finished preprod artifact assembly",
138-
extra={
139-
"timestamp": datetime.datetime.now().isoformat(),
140-
"project_id": project_id,
141-
"organization_id": org_id,
142-
"checksum": checksum,
143-
},
144-
)
145-
146138
# where next set of changes will happen
147139
# TODO: Trigger artifact processing (size analysis, etc.)
148140
# This is where you'd add logic to:
@@ -152,11 +144,13 @@ def assemble_preprod_artifact(
152144
# 4. Update state to PROCESSED when done (also update the date_built value to reflect when the artifact was built, among other fields)
153145

154146
except Exception as e:
147+
sentry_sdk.capture_exception(e)
155148
logger.exception(
156149
"Failed to assemble preprod artifact",
157150
extra={
158151
"project_id": project_id,
159152
"organization_id": org_id,
153+
"checksum": checksum,
160154
},
161155
)
162156
set_assemble_status(
@@ -166,8 +160,29 @@ def assemble_preprod_artifact(
166160
ChunkFileState.ERROR,
167161
detail=str(e),
168162
)
169-
else:
170-
set_assemble_status(AssembleTask.PREPROD_ARTIFACT, project_id, checksum, ChunkFileState.OK)
163+
return
164+
165+
# Mark assembly as successful since the artifact was created successfully
166+
set_assemble_status(AssembleTask.PREPROD_ARTIFACT, project_id, checksum, ChunkFileState.OK)
167+
168+
produce_preprod_artifact_to_kafka(
169+
project_id=project_id,
170+
organization_id=org_id,
171+
artifact_id=preprod_artifact.id,
172+
checksum=checksum,
173+
git_sha=git_sha,
174+
build_configuration=build_configuration,
175+
)
176+
177+
logger.info(
178+
"Finished preprod artifact assembly and Kafka dispatch",
179+
extra={
180+
"preprod_artifact_id": preprod_artifact.id,
181+
"project_id": project_id,
182+
"organization_id": org_id,
183+
"checksum": checksum,
184+
},
185+
)
171186

172187

173188
def _assemble_preprod_artifact(

0 commit comments

Comments
 (0)