Skip to content

Commit efc3382

Browse files
committed
generated code
1 parent 3edb17c commit efc3382

File tree

12 files changed

+1058
-2
lines changed

12 files changed

+1058
-2
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ members = [
111111
"lib/k8s-e2e-tests",
112112
"lib/k8s-test-framework",
113113
"lib/loki-logproto",
114+
"lib/my-vector-proto",
114115
"lib/portpicker",
115116
"lib/prometheus-parser",
116117
"lib/opentelemetry-proto",
@@ -761,6 +762,7 @@ sinks-logs = [
761762
"sinks-socket",
762763
"sinks-splunk_hec",
763764
"sinks-vector",
765+
"sinks-my_vector",
764766
"sinks-webhdfs",
765767
"sinks-websocket",
766768
"sinks-websocket-server",
@@ -779,6 +781,7 @@ sinks-metrics = [
779781
"sinks-sematext",
780782
"sinks-statsd",
781783
"sinks-vector",
784+
"sinks-my_vector",
782785
"sinks-splunk_hec"
783786
]
784787

@@ -833,6 +836,7 @@ sinks-splunk_hec = []
833836
sinks-statsd = ["sinks-utils-udp", "tokio-util/net"]
834837
sinks-utils-udp = []
835838
sinks-vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build", "dep:prost"]
839+
sinks-my_vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build", "dep:prost"]
836840
sinks-websocket = ["dep:tokio-tungstenite"]
837841
sinks-websocket-server = ["dep:tokio-tungstenite", "sources-utils-http-auth", "sources-utils-http-error", "sources-utils-http-prelude"]
838842
sinks-webhdfs = ["dep:opendal"]

lib/my-vector-proto/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "my-vector-proto"
3+
version = "0.1.0"
4+
authors = ["Vector Contributors <vector@datadoghq.com>"]
5+
edition = "2021"
6+
publish = false
7+
8+
[build-dependencies]
9+
prost-build.workspace = true
10+
tonic-build.workspace = true
11+
12+
[dependencies]
13+
bytes = { version = "1.10.1", default-features = false, features = ["serde"] }
14+
chrono.workspace = true
15+
hex = { version = "0.4.3", default-features = false, features = ["std"] }
16+
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false }
17+
ordered-float = { version = "4.6.0", default-features = false }
18+
prost .workspace = true
19+
tonic.workspace = true
20+
vrl.workspace = true
21+
vector-core = { path = "../vector-core", default-features = false }

lib/my-vector-proto/build.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use std::io::Error;
2+
3+
fn main() -> Result<(), Error> {
4+
tonic_build::configure()
5+
.build_client(true)
6+
.build_server(true)
7+
.compile(
8+
&[
9+
"src/proto/kafkaproducerproxy/kafkaproducerproxy.proto",
10+
],
11+
&["src/proto/kafkaproducerproxy"],
12+
)?;
13+
14+
Ok(())
15+
}

lib/my-vector-proto/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub mod kafkaproducerproxy {
2+
tonic::include_proto!("kafkaproducerproxy");
3+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
syntax = "proto2";
2+
3+
package kafkaproducerproxy;
4+
5+
service KafkaProducerProxyService {
6+
7+
rpc produceMessage(KafkaMessage) returns (KafkaMessage.Response) {
8+
}
9+
10+
rpc produceMessages(KafkaMessages) returns (KafkaMessages.Response) {
11+
}
12+
13+
}
14+
15+
message KafkaMessage {
16+
optional string topic_name = 1;
17+
optional string key = 2;
18+
optional bytes data = 3;
19+
optional bytes log_entry = 4;
20+
21+
message Response {
22+
optional int32 error_code = 1;
23+
optional string error_msg = 2;
24+
}
25+
}
26+
27+
message KafkaMessages {
28+
repeated KafkaMessage messages = 1;
29+
30+
message Response {
31+
repeated KafkaMessage.Response responses = 1;
32+
optional bool all_succeed = 2;
33+
}
34+
}
35+

src/sinks/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ pub mod splunk_hec;
108108
pub mod statsd;
109109
#[cfg(feature = "sinks-vector")]
110110
pub mod vector;
111+
#[cfg(feature = "sinks-my_vector")]
112+
pub mod my_vector;
111113
#[cfg(feature = "sinks-webhdfs")]
112114
pub mod webhdfs;
113115
#[cfg(feature = "sinks-websocket")]
@@ -137,6 +139,14 @@ pub enum BuildError {
137139
/// Common healthcheck errors
138140
#[derive(Debug, Snafu)]
139141
pub enum HealthcheckError {
140-
#[snafu(display("Unexpected status: {}", status))]
141-
UnexpectedStatus { status: ::http::StatusCode },
142+
#[snafu(display("Unable to resolve DNS for {:?}", address))]
143+
DnsFailure { address: String },
144+
#[snafu(display("DNS errored {}", source))]
145+
DnsError { source: crate::dns::DnsError },
146+
#[snafu(display("Socket address problem: {}", source))]
147+
SocketAddressError { source: std::io::Error },
148+
#[snafu(display("URI parse error: {}", source))]
149+
UriParseError { source: ::http::uri::InvalidUri },
150+
#[snafu(display("HTTP request build error: {}", source))]
151+
HTTPRequestBuilderError { source: ::http::Error },
142152
}

src/sinks/my_vector/blocking_layer.rs

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use std::{
2+
sync::{Arc, Mutex},
3+
time::{Duration, Instant},
4+
};
5+
use tower::{Layer, Service};
6+
use vector_lib::stream::DriverResponse;
7+
8+
/// A layer that blocks all requests for 30 seconds when any clone receives a response code 555
9+
#[derive(Clone)]
10+
pub struct BlockingLayer {
11+
blocked_until: Arc<Mutex<Option<Instant>>>,
12+
}
13+
14+
impl BlockingLayer {
15+
pub fn new() -> Self {
16+
Self {
17+
blocked_until: Arc::new(Mutex::new(None)),
18+
}
19+
}
20+
}
21+
22+
impl<S> Layer<S> for BlockingLayer {
23+
type Service = BlockingService<S>;
24+
25+
fn layer(&self, service: S) -> Self::Service {
26+
BlockingService {
27+
service,
28+
blocked_until: self.blocked_until.clone(),
29+
}
30+
}
31+
}
32+
33+
#[derive(Clone)]
34+
pub struct BlockingService<S> {
35+
service: S,
36+
blocked_until: Arc<Mutex<Option<Instant>>>,
37+
}
38+
39+
impl<S> BlockingService<S> {
40+
pub fn new(service: S) -> Self {
41+
Self {
42+
service,
43+
blocked_until: Arc::new(Mutex::new(None)),
44+
}
45+
}
46+
}
47+
48+
impl<S, Request> Service<Request> for BlockingService<S>
49+
where
50+
S: Service<Request> + Clone + Send + 'static,
51+
S::Future: Send + 'static,
52+
S::Response: DriverResponse + Send + 'static,
53+
{
54+
type Response = S::Response;
55+
type Error = S::Error;
56+
type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
57+
58+
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
59+
// Check if we're currently blocked
60+
if let Some(blocked_until) = *self.blocked_until.lock().unwrap() {
61+
if blocked_until > Instant::now() {
62+
// Still blocked, return NotReady
63+
return std::task::Poll::Pending;
64+
} else {
65+
// Block has expired, clear it
66+
*self.blocked_until.lock().unwrap() = None;
67+
}
68+
}
69+
70+
// Not blocked, check if inner service is ready
71+
self.service.poll_ready(cx)
72+
}
73+
74+
fn call(&mut self, req: Request) -> Self::Future {
75+
let mut service = self.service.clone();
76+
let blocked_until = self.blocked_until.clone();
77+
78+
Box::pin(async move {
79+
// Check if we're blocked before making the request
80+
if let Some(blocked_until) = *blocked_until.lock().unwrap() {
81+
if blocked_until > Instant::now() {
82+
// Calculate remaining time
83+
let remaining = blocked_until.duration_since(Instant::now());
84+
tokio::time::sleep(remaining).await;
85+
} else {
86+
// Block has expired, clear it
87+
*blocked_until.lock().unwrap() = None;
88+
}
89+
}
90+
91+
// Make the request
92+
let response = service.call(req).await?;
93+
94+
// Check if we got a 555 response
95+
if response.event_status().is_rejected() {
96+
// Set the block for all clones
97+
*blocked_until.lock().unwrap() = Some(Instant::now() + Duration::from_secs(30));
98+
}
99+
100+
Ok(response)
101+
})
102+
}
103+
}
104+
105+
#[cfg(test)]
106+
mod tests {
107+
use super::*;
108+
use std::time::Duration;
109+
use tower::ServiceBuilder;
110+
use vector_lib::stream::EventStatus;
111+
112+
struct MockResponse;
113+
impl DriverResponse for MockResponse {
114+
fn event_status(&self) -> EventStatus {
115+
EventStatus::Rejected
116+
}
117+
}
118+
119+
struct MockService;
120+
impl Service<()> for MockService {
121+
type Response = MockResponse;
122+
type Error = ();
123+
type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
124+
125+
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
126+
std::task::Poll::Ready(Ok(()))
127+
}
128+
129+
fn call(&mut self, _: ()) -> Self::Future {
130+
Box::pin(async move { Ok(MockResponse) })
131+
}
132+
}
133+
134+
#[tokio::test]
135+
async fn test_blocking_layer() {
136+
let layer = BlockingLayer::new();
137+
let service = ServiceBuilder::new()
138+
.layer(layer)
139+
.service(MockService);
140+
141+
// First request should succeed but trigger blocking
142+
let response = service.clone().call(()).await.unwrap();
143+
assert!(response.event_status().is_rejected());
144+
145+
// Second request should be blocked
146+
let start = Instant::now();
147+
let response = service.clone().call(()).await.unwrap();
148+
let duration = start.elapsed();
149+
assert!(duration >= Duration::from_secs(30));
150+
assert!(response.event_status().is_rejected());
151+
}
152+
}

0 commit comments

Comments
 (0)