Commit 385bc56

ref(server): Introduces a new processing pipeline and implements it for logs (#4777)
Introduces a new processing pipeline and implements it for logs. All of the log processing logic has been moved to the new pipeline.

A `Processor` is designed to build its unit of work from multiple sources. For now it only supports a `ManagedEnvelope`, as this is what we do today; in the future, a custom transport between Relays could allow the processor to consume different input formats.

Similar to the input, the output is now also an additional layer which can be serialized into multiple formats. For now this again only supports `ManagedEnvelope`, but in the future an additional serialization will be introduced targeting the format which the `StoreService` (Kafka) expects. This finally means we no longer need to needlessly `serialize->deserialize->serialize` certain items for Kafka.

Rate limiting is no longer envelope based, which makes it much easier, but has the downside that envelope-based rate limiting is still necessary for the fast path. We may want to explore this direction further in the future.

The processor also enforces correct outcome handling through the type system (usage of `Rejected<>`), which is much easier to use, as well as runtime debug validations of outcome counts: every change in item counts must be accounted for.

The processing code itself is structured around more strongly typed containers, instead of an untyped envelope with a lot of implicit state. Processing is no longer conditionally compiled; ideally all of it should be hidden behind either normal `if` statements or a facade which conditionally does nothing or redirects elsewhere (like the new rate limiter). This means we can maybe finally reduce the cancer which is `cfg(feature = "processing")`, which is only really useful for the Kafka and Redis dependencies.

Refs: INGEST-395
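The input/output layering described above can be sketched roughly as follows. This is a hypothetical, self-contained illustration, not the actual relay-server code: the trait names (`FromSource`, `IntoOutput`), the `LogsUnit` container, and the stand-in `ManagedEnvelope` struct are all invented for this sketch.

```rust
/// Stand-in for relay's managed envelope; the real type carries much more state.
struct ManagedEnvelope {
    payloads: Vec<String>,
}

/// Strongly typed unit of work for log processing (illustrative).
#[derive(Debug)]
struct LogsUnit {
    logs: Vec<String>,
}

/// Input layer: a unit of work can be built from multiple sources.
trait FromSource: Sized {
    type Source;
    fn from_source(source: Self::Source) -> Self;
}

/// Output layer: a processed unit can be serialized into multiple formats.
trait IntoOutput<O> {
    fn into_output(self) -> O;
}

// Today the only source is an envelope; other transports could be added later.
impl FromSource for LogsUnit {
    type Source = ManagedEnvelope;
    fn from_source(source: ManagedEnvelope) -> Self {
        LogsUnit { logs: source.payloads }
    }
}

// Today the only output is an envelope; a Kafka/StoreService format could be
// another `IntoOutput` impl, avoiding the serialize->deserialize->serialize hop.
impl IntoOutput<ManagedEnvelope> for LogsUnit {
    fn into_output(self) -> ManagedEnvelope {
        ManagedEnvelope { payloads: self.logs }
    }
}

fn main() {
    let envelope = ManagedEnvelope { payloads: vec!["log line".to_owned()] };
    let unit = LogsUnit::from_source(envelope);
    // ... typed processing would happen here ...
    let out: ManagedEnvelope = unit.into_output();
    assert_eq!(out.payloads, vec!["log line".to_owned()]);
    println!("round-trip ok: {} item(s)", out.payloads.len());
}
```

The point of the two layers is that processing logic in the middle only ever sees the typed unit, never the untyped envelope.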
1 parent 8f250ce commit 385bc56

File tree

22 files changed: +1501 −334 lines


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -9,6 +9,7 @@
 
 **Internal**:
 
+- Introduces a new processing pipeline and implements it for logs. ([#4777](https://github.com/getsentry/relay/pull/4777))
 - Produce spans to the items topic. ([#4735](https://github.com/getsentry/relay/pull/4735))
 - Update opentelemetry-proto and sentry-protos dependencies. ([#4847](https://github.com/getsentry/relay/pull/4847))
 - Take into account more types of tokens when doing AI cost calculation. ([#4840](https://github.com/getsentry/relay/pull/4840))

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

relay-ourlogs/src/ourlog.rs

Lines changed: 1 addition & 1 deletion

@@ -109,7 +109,7 @@ pub fn otel_to_sentry_log(otel_log: OtelLog, received_at: DateTime<Utc>) -> Resu
     Ok(ourlog)
 }
 
-/// This fills attributes with OTel specific fields to be compatible with the otel schema.
+/// This fills attributes with OTel specific fields to be compatible with the OTel schema.
 pub fn ourlog_merge_otel(ourlog: &mut Annotated<OurLog>, received_at: DateTime<Utc>) {
     let Some(ourlog_value) = ourlog.value_mut() else {
         return;

relay-quotas/Cargo.toml

Lines changed: 2 additions & 1 deletion

@@ -17,15 +17,16 @@ redis = ["dep:thiserror", "dep:relay-log", "relay-redis/impl"]
 workspace = true
 
 [dependencies]
+async-trait = { workspace = true }
 hashbrown = { workspace = true }
+itertools = { workspace = true }
 relay-base-schema = { workspace = true }
 relay-common = { workspace = true }
 relay-log = { workspace = true, optional = true }
 relay-redis = { workspace = true, optional = true }
 serde = { workspace = true }
 smallvec = { workspace = true }
 thiserror = { workspace = true, optional = true }
-itertools = { workspace = true }
 tokio = { workspace = true, features = ["sync"] }
 
 [dev-dependencies]

relay-quotas/src/rate_limit.rs

Lines changed: 12 additions & 0 deletions

@@ -299,6 +299,14 @@ impl RateLimits {
         }
     }
 
+    /// Merges all limits from another [`RateLimits`] with this one.
+    ///
+    /// See also: [`Self::merge`].
+    pub fn merge_with(mut self, other: Self) -> Self {
+        self.merge(other);
+        self
+    }
+
     /// Returns `true` if this instance contains no active limits.
     ///
     /// This is the opposite of [`is_limited`](Self::is_limited).
@@ -468,6 +476,10 @@ impl CachedRateLimits {
     ///
     /// This is a thread-safe wrapper around [`RateLimits::merge`].
     pub fn merge(&self, limits: RateLimits) {
+        if limits.is_empty() {
+            return;
+        }
+
         let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner);
         let current = Arc::make_mut(&mut inner);
         for limit in limits {
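The new by-value `merge_with` enables a builder-style chain when folding many rate limit results together. A minimal standalone sketch of the pattern, using stand-in types (the fields and `reason` member here are invented, not the real relay-quotas definitions):

```rust
/// Stand-in for a single active rate limit.
#[derive(Debug, Clone, PartialEq)]
struct RateLimit {
    reason: String,
}

/// Stand-in for a collection of rate limits.
#[derive(Debug, Default)]
struct RateLimits {
    limits: Vec<RateLimit>,
}

impl RateLimits {
    /// Merges all limits from `other` into `self` in place.
    fn merge(&mut self, other: Self) {
        self.limits.extend(other.limits);
    }

    /// By-value variant of `merge`, convenient for chaining.
    fn merge_with(mut self, other: Self) -> Self {
        self.merge(other);
        self
    }

    fn is_empty(&self) -> bool {
        self.limits.is_empty()
    }
}

fn main() {
    let a = RateLimits { limits: vec![RateLimit { reason: "quota".into() }] };
    let b = RateLimits { limits: vec![RateLimit { reason: "abuse".into() }] };

    // Chain per-item results into one combined set of limits.
    let combined = RateLimits::default().merge_with(a).merge_with(b);
    assert_eq!(combined.limits.len(), 2);
    assert!(!combined.is_empty());
    println!("{} active limits", combined.limits.len());
}
```

The `is_empty` early return added to `CachedRateLimits::merge` in this commit avoids taking the lock and cloning the inner state when there is nothing to merge.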

relay-quotas/src/redis.rs

Lines changed: 1 addition & 0 deletions

@@ -225,6 +225,7 @@ impl std::ops::Deref for RedisQuota<'_> {
 /// attachments. For more information on quota parameters, see [`Quota`].
 ///
 /// Requires the `redis` feature.
+#[derive(Clone)]
 pub struct RedisRateLimiter<T> {
     client: AsyncRedisClient,
     script: &'static Script,

relay-server/src/envelope/mod.rs

Lines changed: 7 additions & 0 deletions

@@ -163,6 +163,13 @@ impl EnvelopeHeaders<PartialMeta> {
     }
 }
 
+impl<M> EnvelopeHeaders<M> {
+    /// Returns a reference to the contained meta.
+    pub fn meta(&self) -> &M {
+        &self.meta
+    }
+}
+
 #[doc(hidden)]
 #[derive(Clone, Debug)]
 pub struct Envelope {

relay-server/src/lib.rs

Lines changed: 1 addition & 0 deletions

@@ -261,6 +261,7 @@ mod http;
 mod metrics;
 mod metrics_extraction;
 mod middlewares;
+mod processing;
 mod service;
 mod services;
 mod statsd;
Lines changed: 98 additions & 0 deletions

@@ -0,0 +1,98 @@
+use relay_event_schema::protocol::OurLog;
+use relay_protocol::Annotated;
+use relay_quotas::DataCategory;
+use smallvec::SmallVec;
+
+use crate::Envelope;
+use crate::envelope::Item;
+use crate::utils::EnvelopeSummary;
+
+/// A list of data categories and amounts.
+pub type Quantities = SmallVec<[(DataCategory, usize); 1]>;
+
+/// A counted item.
+///
+/// An item may represent multiple categories with different counts at once.
+pub trait Counted {
+    /// Returns the contained item quantities.
+    ///
+    /// Implementations are expected to be pure.
+    fn quantities(&self) -> Quantities;
+}
+
+impl Counted for () {
+    fn quantities(&self) -> Quantities {
+        Quantities::new()
+    }
+}
+
+impl Counted for Item {
+    fn quantities(&self) -> Quantities {
+        self.quantities()
+    }
+}
+
+impl Counted for Box<Envelope> {
+    fn quantities(&self) -> Quantities {
+        let mut quantities = Quantities::new();
+
+        // This matches the implementation of `ManagedEnvelope::reject`.
+        let summary = EnvelopeSummary::compute(self);
+        if let Some(category) = summary.event_category {
+            quantities.push((category, 1));
+            if let Some(category) = category.index_category() {
+                quantities.push((category, 1));
+            }
+        }
+
+        let data = [
+            (DataCategory::Attachment, summary.attachment_quantity),
+            (DataCategory::Profile, summary.profile_quantity),
+            (DataCategory::ProfileIndexed, summary.profile_quantity),
+            (DataCategory::Span, summary.span_quantity),
+            (DataCategory::SpanIndexed, summary.span_quantity),
+            (
+                DataCategory::Transaction,
+                summary.secondary_transaction_quantity,
+            ),
+            (DataCategory::Span, summary.secondary_span_quantity),
+            (DataCategory::Replay, summary.replay_quantity),
+            (DataCategory::ProfileChunk, summary.profile_chunk_quantity),
+            (
+                DataCategory::ProfileChunkUi,
+                summary.profile_chunk_ui_quantity,
+            ),
+            (DataCategory::LogItem, summary.log_item_quantity),
+            (DataCategory::LogByte, summary.log_byte_quantity),
+        ];
+
+        for (category, quantity) in data {
+            if quantity > 0 {
+                quantities.push((category, quantity));
+            }
+        }
+
+        quantities
+    }
+}
+
+impl Counted for Annotated<OurLog> {
+    fn quantities(&self) -> Quantities {
+        smallvec::smallvec![
+            (DataCategory::LogItem, 1),
+            (
+                DataCategory::LogByte,
+                crate::processing::logs::DUMMY_LOG_SIZE
+            )
+        ]
+    }
+}
+
+impl<T> Counted for &T
+where
+    T: Counted,
+{
+    fn quantities(&self) -> Quantities {
+        (*self).quantities()
+    }
+}
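The `Counted` trait above is how the new pipeline tracks outcome quantities per item. A minimal standalone sketch of the same idea, using `Vec` instead of `SmallVec` and an invented `LogItem` type so it runs without the relay crates (the two-category `DataCategory` enum here is a cut-down stand-in for relay's real one):

```rust
/// Cut-down stand-in for relay's `DataCategory`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DataCategory {
    LogItem,
    LogByte,
}

/// A list of data categories and amounts (the real code uses `SmallVec`).
type Quantities = Vec<(DataCategory, usize)>;

/// A counted item; may represent multiple categories at once.
trait Counted {
    /// Returns the contained item quantities; expected to be pure.
    fn quantities(&self) -> Quantities;
}

/// Invented item type for this sketch.
struct LogItem {
    payload: String,
}

impl Counted for LogItem {
    fn quantities(&self) -> Quantities {
        // A log is counted both per item and per byte.
        vec![
            (DataCategory::LogItem, 1),
            (DataCategory::LogByte, self.payload.len()),
        ]
    }
}

// Blanket impl for references, mirroring the real module.
impl<T: Counted> Counted for &T {
    fn quantities(&self) -> Quantities {
        (*self).quantities()
    }
}

fn main() {
    let log = LogItem { payload: "hello".to_owned() };
    let q = log.quantities();
    assert_eq!(q, vec![(DataCategory::LogItem, 1), (DataCategory::LogByte, 5)]);
    println!("{q:?}");
}
```

Because `quantities` is pure, the pipeline can call it before and after each processing step and assert (in debug builds) that every change in item counts is accounted for by an outcome.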
