Skip to content

Commit 3423d8b

Browse files
authored
ref(processor): Remove project id state field (#4388)
1 parent 06f956a commit 3423d8b

File tree

4 files changed

+67
-85
lines changed

4 files changed

+67
-85
lines changed

relay-server/src/services/processor.rs

Lines changed: 60 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -776,13 +776,6 @@ struct ProcessEnvelopeState<Group> {
776776
/// This is the config used for trace-based dynamic sampling.
777777
sampling_project_info: Option<Arc<ProjectInfo>>,
778778

779-
/// The id of the project that this envelope is ingested into.
780-
///
781-
/// This identifier can differ from the one stated in the Envelope's DSN if the key was moved to
782-
/// a new project or on the legacy endpoint. In that case, normalization will update the project
783-
/// ID.
784-
project_id: ProjectId,
785-
786779
/// The managed envelope before processing.
787780
managed_envelope: TypedEnvelope<Group>,
788781
}
@@ -1230,13 +1223,17 @@ impl EnvelopeProcessorService {
12301223

12311224
/// Normalize monitor check-ins and remove invalid ones.
12321225
#[cfg(feature = "processing")]
1233-
fn process_check_ins(&self, state: &mut ProcessEnvelopeState<CheckInGroup>) {
1226+
fn process_check_ins(
1227+
&self,
1228+
state: &mut ProcessEnvelopeState<CheckInGroup>,
1229+
project_id: ProjectId,
1230+
) {
12341231
state.managed_envelope.retain_items(|item| {
12351232
if item.ty() != &ItemType::CheckIn {
12361233
return ItemAction::Keep;
12371234
}
12381235

1239-
match relay_monitors::process_check_in(&item.payload(), state.project_id) {
1236+
match relay_monitors::process_check_in(&item.payload(), project_id) {
12401237
Ok(result) => {
12411238
item.set_routing_hint(result.routing_hint);
12421239
item.set_payload(ContentType::Json, result.payload);
@@ -1254,50 +1251,6 @@ impl EnvelopeProcessorService {
12541251
})
12551252
}
12561253

1257-
/// Creates and initializes the processing state.
1258-
///
1259-
/// This applies defaults to the envelope and initializes empty rate limits.
1260-
#[allow(clippy::too_many_arguments)]
1261-
fn prepare_state<G>(
1262-
&self,
1263-
config: Arc<Config>,
1264-
mut managed_envelope: TypedEnvelope<G>,
1265-
project_id: ProjectId,
1266-
project_info: Arc<ProjectInfo>,
1267-
rate_limits: Arc<RateLimits>,
1268-
sampling_project_info: Option<Arc<ProjectInfo>>,
1269-
) -> ProcessEnvelopeState<G> {
1270-
let envelope = managed_envelope.envelope_mut();
1271-
1272-
// Set the event retention. Effectively, this value will only be available in processing
1273-
// mode when the full project config is queried from the upstream.
1274-
if let Some(retention) = project_info.config.event_retention {
1275-
envelope.set_retention(retention);
1276-
}
1277-
1278-
// Ensure the project ID is updated to the stored instance for this project cache. This can
1279-
// differ in two cases:
1280-
// 1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
1281-
// 2. The DSN was moved and the envelope sent to the old project ID.
1282-
envelope.meta_mut().set_project_id(project_id);
1283-
1284-
let extracted_metrics = ProcessingExtractedMetrics::new();
1285-
1286-
ProcessEnvelopeState {
1287-
event: Annotated::empty(),
1288-
event_metrics_extracted: false,
1289-
spans_extracted: false,
1290-
metrics: Metrics::default(),
1291-
extracted_metrics,
1292-
project_info,
1293-
rate_limits,
1294-
config,
1295-
sampling_project_info,
1296-
project_id,
1297-
managed_envelope,
1298-
}
1299-
}
1300-
13011254
#[cfg(feature = "processing")]
13021255
fn enforce_quotas<G>(
13031256
&self,
@@ -1332,6 +1285,7 @@ impl EnvelopeProcessorService {
13321285
fn extract_transaction_metrics(
13331286
&self,
13341287
state: &mut ProcessEnvelopeState<TransactionGroup>,
1288+
project_id: ProjectId,
13351289
sampling_decision: SamplingDecision,
13361290
) -> Result<(), ProcessingError> {
13371291
if state.event_metrics_extracted {
@@ -1406,7 +1360,7 @@ impl EnvelopeProcessorService {
14061360
event,
14071361
combined_config,
14081362
sampling_decision,
1409-
state.project_id,
1363+
project_id,
14101364
self.inner
14111365
.config
14121366
.aggregator_config_for(MetricNamespace::Spans)
@@ -1431,7 +1385,7 @@ impl EnvelopeProcessorService {
14311385
generic_config: Some(combined_config),
14321386
transaction_from_dsc,
14331387
sampling_decision,
1434-
target_project_id: state.project_id,
1388+
target_project_id: project_id,
14351389
};
14361390

14371391
state
@@ -1447,6 +1401,7 @@ impl EnvelopeProcessorService {
14471401
fn normalize_event<G: EventProcessing>(
14481402
&self,
14491403
state: &mut ProcessEnvelopeState<G>,
1404+
project_id: ProjectId,
14501405
mut event_fully_normalized: EventFullyNormalized,
14511406
) -> Result<Option<EventFullyNormalized>, ProcessingError> {
14521407
if !state.has_event() {
@@ -1510,7 +1465,7 @@ impl EnvelopeProcessorService {
15101465
}
15111466

15121467
let normalization_config = NormalizationConfig {
1513-
project_id: Some(state.project_id.value()),
1468+
project_id: Some(project_id.value()),
15141469
client: request_meta.client().map(str::to_owned),
15151470
key_id,
15161471
protocol_version: Some(request_meta.version().to_string()),
@@ -1593,6 +1548,7 @@ impl EnvelopeProcessorService {
15931548
fn process_errors(
15941549
&self,
15951550
state: &mut ProcessEnvelopeState<ErrorGroup>,
1551+
project_id: ProjectId,
15961552
) -> Result<(), ProcessingError> {
15971553
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());
15981554

@@ -1616,7 +1572,7 @@ impl EnvelopeProcessorService {
16161572

16171573
event::finalize(state, &self.inner.config)?;
16181574
if let Some(inner_event_fully_normalized) =
1619-
self.normalize_event(state, event_fully_normalized)?
1575+
self.normalize_event(state, project_id, event_fully_normalized)?
16201576
{
16211577
event_fully_normalized = inner_event_fully_normalized;
16221578
};
@@ -1640,7 +1596,7 @@ impl EnvelopeProcessorService {
16401596

16411597
if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
16421598
relay_log::error!(
1643-
tags.project = %state.project_id,
1599+
tags.project = %project_id,
16441600
tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()),
16451601
"ingested event without normalizing"
16461602
);
@@ -1653,6 +1609,7 @@ impl EnvelopeProcessorService {
16531609
fn process_transactions(
16541610
&self,
16551611
state: &mut ProcessEnvelopeState<TransactionGroup>,
1612+
project_id: ProjectId,
16561613
reservoir_counters: ReservoirCounters,
16571614
) -> Result<(), ProcessingError> {
16581615
let mut event_fully_normalized = EventFullyNormalized::new(state.envelope());
@@ -1661,12 +1618,12 @@ impl EnvelopeProcessorService {
16611618

16621619
event::extract(state, event_fully_normalized, &self.inner.config)?;
16631620

1664-
let profile_id = profile::filter(state);
1621+
let profile_id = profile::filter(state, project_id);
16651622
profile::transfer_id(state, profile_id);
16661623

16671624
event::finalize(state, &self.inner.config)?;
16681625
if let Some(inner_event_fully_normalized) =
1669-
self.normalize_event(state, event_fully_normalized)?
1626+
self.normalize_event(state, project_id, event_fully_normalized)?
16701627
{
16711628
event_fully_normalized = inner_event_fully_normalized;
16721629
}
@@ -1701,7 +1658,7 @@ impl EnvelopeProcessorService {
17011658
// Before metric extraction to make sure the profile count is reflected correctly.
17021659
profile::process(state, &global_config);
17031660
// Extract metrics here, we're about to drop the event/transaction.
1704-
self.extract_transaction_metrics(state, SamplingDecision::Drop)?;
1661+
self.extract_transaction_metrics(state, project_id, SamplingDecision::Drop)?;
17051662

17061663
dynamic_sampling::drop_unsampled_items(state, outcome);
17071664

@@ -1728,7 +1685,7 @@ impl EnvelopeProcessorService {
17281685
profile::transfer_id(state, profile_id);
17291686

17301687
// Always extract metrics in processing Relays for sampled items.
1731-
self.extract_transaction_metrics(state, SamplingDecision::Keep)?;
1688+
self.extract_transaction_metrics(state, project_id, SamplingDecision::Keep)?;
17321689

17331690
if state
17341691
.project_info
@@ -1749,7 +1706,7 @@ impl EnvelopeProcessorService {
17491706

17501707
if self.inner.config.processing_enabled() && !event_fully_normalized.0 {
17511708
relay_log::error!(
1752-
tags.project = %state.project_id,
1709+
tags.project = %project_id,
17531710
tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()),
17541711
"ingested event without normalizing"
17551712
);
@@ -1777,8 +1734,9 @@ impl EnvelopeProcessorService {
17771734
fn process_standalone(
17781735
&self,
17791736
state: &mut ProcessEnvelopeState<StandaloneGroup>,
1737+
project_id: ProjectId,
17801738
) -> Result<(), ProcessingError> {
1781-
profile::filter(state);
1739+
profile::filter(state, project_id);
17821740

17831741
if_processing!(self.inner.config, {
17841742
self.enforce_quotas(state)?;
@@ -1834,11 +1792,12 @@ impl EnvelopeProcessorService {
18341792
/// Processes cron check-ins.
18351793
fn process_checkins(
18361794
&self,
1837-
_state: &mut ProcessEnvelopeState<CheckInGroup>,
1795+
#[allow(unused_variables)] state: &mut ProcessEnvelopeState<CheckInGroup>,
1796+
#[allow(unused_variables)] project_id: ProjectId,
18381797
) -> Result<(), ProcessingError> {
18391798
if_processing!(self.inner.config, {
1840-
self.enforce_quotas(_state)?;
1841-
self.process_check_ins(_state);
1799+
self.enforce_quotas(state)?;
1800+
self.process_check_ins(state, project_id);
18421801
});
18431802
Ok(())
18441803
}
@@ -1849,6 +1808,7 @@ impl EnvelopeProcessorService {
18491808
fn process_standalone_spans(
18501809
&self,
18511810
state: &mut ProcessEnvelopeState<SpanGroup>,
1811+
#[allow(unused_variables)] project_id: ProjectId,
18521812
#[allow(unused_variables)] reservoir_counters: ReservoirCounters,
18531813
) -> Result<(), ProcessingError> {
18541814
span::filter(state);
@@ -1863,6 +1823,7 @@ impl EnvelopeProcessorService {
18631823

18641824
span::process(
18651825
state,
1826+
project_id,
18661827
&global_config,
18671828
self.inner.geoip_lookup.as_ref(),
18681829
&reservoir,
@@ -1895,17 +1856,37 @@ impl EnvelopeProcessorService {
18951856
.parametrize_dsc_transaction(&sampling_state.config.tx_name_rules);
18961857
}
18971858

1859+
// Set the event retention. Effectively, this value will only be available in processing
1860+
// mode when the full project config is queried from the upstream.
1861+
if let Some(retention) = project_info.config.event_retention {
1862+
managed_envelope.envelope_mut().set_retention(retention);
1863+
}
1864+
1865+
// Ensure the project ID is updated to the stored instance for this project cache. This can
1866+
// differ in two cases:
1867+
// 1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
1868+
// 2. The DSN was moved and the envelope sent to the old project ID.
1869+
managed_envelope
1870+
.envelope_mut()
1871+
.meta_mut()
1872+
.set_project_id(project_id);
1873+
18981874
macro_rules! run {
18991875
($fn_name:ident $(, $args:expr)*) => {{
19001876
let managed_envelope = managed_envelope.try_into()?;
1901-
let mut state = self.prepare_state(
1902-
self.inner.config.clone(),
1903-
managed_envelope,
1904-
project_id,
1877+
let mut state = ProcessEnvelopeState {
1878+
event: Annotated::empty(),
1879+
event_metrics_extracted: false,
1880+
spans_extracted: false,
1881+
metrics: Metrics::default(),
1882+
extracted_metrics: ProcessingExtractedMetrics::new(),
1883+
config: self.inner.config.clone(),
19051884
project_info,
19061885
rate_limits,
19071886
sampling_project_info,
1908-
);
1887+
managed_envelope,
1888+
};
1889+
19091890
// The state is temporarily supplied, until it will be removed.
19101891
match self.$fn_name(&mut state, $($args),*) {
19111892
Ok(()) => Ok(ProcessingStateResult {
@@ -1926,14 +1907,16 @@ impl EnvelopeProcessorService {
19261907
relay_log::trace!("Processing {group} group", group = group.variant());
19271908

19281909
match group {
1929-
ProcessingGroup::Error => run!(process_errors),
1930-
ProcessingGroup::Transaction => run!(process_transactions, reservoir_counters),
1910+
ProcessingGroup::Error => run!(process_errors, project_id),
1911+
ProcessingGroup::Transaction => {
1912+
run!(process_transactions, project_id, reservoir_counters)
1913+
}
19311914
ProcessingGroup::Session => run!(process_sessions),
1932-
ProcessingGroup::Standalone => run!(process_standalone),
1915+
ProcessingGroup::Standalone => run!(process_standalone, project_id),
19331916
ProcessingGroup::ClientReport => run!(process_client_reports),
19341917
ProcessingGroup::Replay => run!(process_replays),
1935-
ProcessingGroup::CheckIn => run!(process_checkins),
1936-
ProcessingGroup::Span => run!(process_standalone_spans, reservoir_counters),
1918+
ProcessingGroup::CheckIn => run!(process_checkins, project_id),
1919+
ProcessingGroup::Span => run!(process_standalone_spans, project_id, reservoir_counters),
19371920
ProcessingGroup::ProfileChunk => run!(process_profile_chunks),
19381921
// Currently is not used.
19391922
ProcessingGroup::Metrics => {

relay-server/src/services/processor/dynamic_sampling.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ mod tests {
236236

237237
use bytes::Bytes;
238238
use relay_base_schema::events::EventType;
239-
use relay_base_schema::project::{ProjectId, ProjectKey};
239+
use relay_base_schema::project::ProjectKey;
240240
use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig};
241241
use relay_event_schema::protocol::{EventId, LenientString};
242242
use relay_protocol::RuleCondition;
@@ -441,7 +441,6 @@ mod tests {
441441
project_info,
442442
rate_limits: Default::default(),
443443
sampling_project_info: None,
444-
project_id: ProjectId::new(42),
445444
managed_envelope: ManagedEnvelope::new(
446445
envelope,
447446
outcome_aggregator.clone(),
@@ -745,7 +744,6 @@ mod tests {
745744
}));
746745
Some(Arc::new(state))
747746
},
748-
project_id: ProjectId::new(1),
749747
managed_envelope: ManagedEnvelope::new(
750748
envelope,
751749
Addr::dummy(),

relay-server/src/services/processor/profile.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::net::IpAddr;
44
use relay_dynamic_config::{Feature, GlobalConfig};
55

66
use relay_base_schema::events::EventType;
7+
use relay_base_schema::project::ProjectId;
78
use relay_config::Config;
89
use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
910
use relay_filter::ProjectFiltersConfig;
@@ -18,7 +19,7 @@ use crate::utils::ItemAction;
1819
/// Filters out invalid and duplicate profiles.
1920
///
2021
/// Returns the profile id of the single remaining profile, if there is one.
21-
pub fn filter<G>(state: &mut ProcessEnvelopeState<G>) -> Option<ProfileId> {
22+
pub fn filter<G>(state: &mut ProcessEnvelopeState<G>, project_id: ProjectId) -> Option<ProfileId> {
2223
let profiling_disabled = state.should_filter(Feature::Profiling);
2324
let has_transaction = state.event_type() == Some(EventType::Transaction);
2425
let keep_unsampled_profiles = true;
@@ -38,7 +39,7 @@ pub fn filter<G>(state: &mut ProcessEnvelopeState<G>) -> Option<ProfileId> {
3839
return ItemAction::DropSilently;
3940
}
4041

41-
match relay_profiling::parse_metadata(&item.payload(), state.project_id) {
42+
match relay_profiling::parse_metadata(&item.payload(), project_id) {
4243
Ok(id) => {
4344
profile_id = Some(id);
4445
ItemAction::Keep

0 commit comments

Comments
 (0)