Skip to content

Commit 6c8c84d

Browse files
authored
Merge pull request containerd#76 from liubin/add-metadata-for-ttrpc-context
api: add metadata for server and client
2 parents 79da032 + d1ea7a2 commit 6c8c84d

File tree

16 files changed

+228
-29
lines changed

16 files changed

+228
-29
lines changed

compiler/src/codegen.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl<'a> MethodGen<'a> {
212212
// Method signatures
213213
fn unary(&self, method_name: &str) -> String {
214214
format!(
215-
"{}(&self, req: &{}, timeout_nano: i64) -> {}<{}>",
215+
"{}(&self, ctx: ttrpc::context::Context, req: &{}) -> {}<{}>",
216216
method_name,
217217
self.input(),
218218
fq_grpc("Result"),
@@ -222,7 +222,7 @@ impl<'a> MethodGen<'a> {
222222

223223
fn unary_async(&self, method_name: &str) -> String {
224224
format!(
225-
"{}(&mut self, req: &{}, timeout_nano: i64) -> {}<{}>",
225+
"{}(&mut self, ctx: ttrpc::context::Context, req: &{}) -> {}<{}>",
226226
method_name,
227227
self.input(),
228228
fq_grpc("Result"),
@@ -314,7 +314,7 @@ impl<'a> MethodGen<'a> {
314314
self.output()
315315
));
316316
w.write_line(&format!(
317-
"::ttrpc::client_request!(self, req, timeout_nano, \"{}.{}\", \"{}\", cres);",
317+
"::ttrpc::client_request!(self, ctx, req, \"{}.{}\", \"{}\", cres);",
318318
self.package_name,
319319
self.service_name,
320320
&self.proto.get_name(),
@@ -335,7 +335,7 @@ impl<'a> MethodGen<'a> {
335335
pub_async_fn(w, &self.unary_async(&method_name), |w| {
336336
w.write_line(&format!("let mut cres = {}::new();", self.output()));
337337
w.write_line(&format!(
338-
"::ttrpc::async_client_request!(self, req, timeout_nano, \"{}.{}\", \"{}\", cres);",
338+
"::ttrpc::async_client_request!(self, ctx, req, \"{}.{}\", \"{}\", cres);",
339339
self.package_name,
340340
self.service_name,
341341
&self.proto.get_name(),

example/Cargo.lock

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,6 @@ path = "./async-client.rs"
4343
[build-dependencies]
4444
ttrpc-codegen = { path = "../ttrpc-codegen"}
4545

46+
[patch.crates-io]
47+
ttrpc-compiler = { path = "../compiler"}
48+

example/async-client.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod protocols;
77

88
use nix::sys::socket::*;
99
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
10+
use ttrpc::context::{self, Context};
1011
use ttrpc::r#async::Client;
1112

1213
#[tokio::main(flavor = "current_thread")]
@@ -44,7 +45,7 @@ async fn main() {
4445
println!(
4546
"Green Thread 1 - {} -> {:?} ended: {:?}",
4647
"health.check()",
47-
thc.check(&req, 0).await,
48+
thc.check(default_ctx(), &req).await,
4849
now.elapsed(),
4950
);
5051
});
@@ -57,7 +58,7 @@ async fn main() {
5758
);
5859

5960
let show = match tac
60-
.list_interfaces(&agent::ListInterfacesRequest::new(), 0)
61+
.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new())
6162
.await
6263
{
6364
Err(e) => format!("{:?}", e),
@@ -80,7 +81,7 @@ async fn main() {
8081
);
8182

8283
let show = match ac
83-
.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), 0)
84+
.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new())
8485
.await
8586
{
8687
Err(e) => format!("{:?}", e),
@@ -101,10 +102,20 @@ async fn main() {
101102
println!(
102103
"Green Thread 3 - {} -> {:?} ended: {:?}",
103104
"health.version()",
104-
hc.version(&health::CheckRequest::new(), 0).await,
105+
hc.version(default_ctx(), &health::CheckRequest::new())
106+
.await,
105107
now.elapsed()
106108
);
107109
});
108110

109111
let _ = tokio::join!(t1, t2, t3);
110112
}
113+
114+
fn default_ctx() -> Context {
115+
let mut ctx = context::with_timeout(0);
116+
ctx.add("key-1".to_string(), "value-1-1".to_string());
117+
ctx.add("key-1".to_string(), "value-1-2".to_string());
118+
ctx.set("key-2".to_string(), vec!["value-2".to_string()]);
119+
120+
ctx
121+
}

example/async-server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ impl health_ttrpc::Health for HealthService {
3939

4040
Err(Error::RpcStatus(status))
4141
}
42+
4243
async fn version(
4344
&self,
44-
_ctx: &::ttrpc::r#async::TtrpcContext,
45+
ctx: &::ttrpc::r#async::TtrpcContext,
4546
req: health::CheckRequest,
4647
) -> Result<health::VersionCheckResponse> {
4748
info!("version {:?}", req);
49+
info!("ctx {:?}", ctx);
4850
let mut rep = health::VersionCheckResponse::new();
4951
rep.agent_version = "mock.0.1".to_string();
5052
rep.grpc_version = "0.0.1".to_string();

example/build.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@ fn main() {
1717
"protocols/protos/oci.proto",
1818
];
1919

20-
// Tell Cargo that if the .proto files changed, to rerun this build script.
21-
protos
22-
.iter()
23-
.for_each(|p| println!("cargo:rerun-if-changed={}", &p));
24-
2520
Codegen::new()
2621
.out_dir("protocols/sync")
2722
.inputs(&protos)

example/client.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use nix::sys::socket::*;
1818
use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
1919
use std::thread;
2020
use ttrpc::client::Client;
21+
use ttrpc::context::{self, Context};
2122

2223
fn main() {
2324
let path = "/tmp/1";
@@ -55,7 +56,7 @@ fn main() {
5556
"OS Thread {:?} - {} -> {:?} ended: {:?}",
5657
std::thread::current().id(),
5758
"health.check()",
58-
thc.check(&req, 0),
59+
thc.check(default_ctx(), &req),
5960
now.elapsed(),
6061
);
6162
});
@@ -68,7 +69,7 @@ fn main() {
6869
now.elapsed(),
6970
);
7071

71-
let show = match tac.list_interfaces(&agent::ListInterfacesRequest::new(), 0) {
72+
let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
7273
Err(e) => format!("{:?}", e),
7374
Ok(s) => format!("{:?}", s),
7475
};
@@ -87,7 +88,7 @@ fn main() {
8788
"agent.online_cpu_mem()",
8889
now.elapsed()
8990
);
90-
let show = match ac.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), 0) {
91+
let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
9192
Err(e) => format!("{:?}", e),
9293
Ok(s) => format!("{:?}", s),
9394
};
@@ -108,10 +109,19 @@ fn main() {
108109
println!(
109110
"Main OS Thread - {} -> {:?} ended: {:?}",
110111
"health.version()",
111-
hc.version(&health::CheckRequest::new(), 0),
112+
hc.version(default_ctx(), &health::CheckRequest::new()),
112113
now.elapsed()
113114
);
114115

115116
t.join().unwrap();
116117
t2.join().unwrap();
117118
}
119+
120+
fn default_ctx() -> Context {
121+
let mut ctx = context::with_timeout(0);
122+
ctx.add("key-1".to_string(), "value-1-1".to_string());
123+
ctx.add("key-1".to_string(), "value-1-2".to_string());
124+
ctx.set("key-2".to_string(), vec!["value-2".to_string()]);
125+
126+
ctx
127+
}

example/server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ impl health_ttrpc::Health for HealthService {
3838
status.set_message("Just for fun".to_string());
3939
Err(Error::RpcStatus(status))
4040
}
41+
4142
fn version(
4243
&self,
43-
_ctx: &::ttrpc::TtrpcContext,
44+
ctx: &::ttrpc::TtrpcContext,
4445
req: health::CheckRequest,
4546
) -> Result<health::VersionCheckResponse> {
4647
info!("version {:?}", req);
48+
info!("ctx {:?}", ctx);
4749
let mut rep = health::VersionCheckResponse::new();
4850
rep.agent_version = "mock.0.1".to_string();
4951
rep.grpc_version = "0.0.1".to_string();
@@ -73,7 +75,6 @@ impl agent_ttrpc::AgentService for AgentService {
7375
i.set_Interfaces(rp);
7476

7577
Ok(i)
76-
//Err(::ttrpc::Error::RpcStatus(::ttrpc::get_Status(::ttrpc::Code::NOT_FOUND, "".to_string())))
7778
}
7879
}
7980

src/asynchronous/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::sync::Arc;
1212
use crate::asynchronous::stream::{receive, respond, respond_with_status};
1313
use crate::asynchronous::unix_incoming::UnixIncoming;
1414
use crate::common::{self, Domain, MESSAGE_TYPE_REQUEST};
15+
use crate::context;
1516
use crate::error::{get_status, Error, Result};
1617
use crate::r#async::{MethodHandler, TtrpcContext};
1718
use crate::ttrpc::{Code, Request};
@@ -310,7 +311,7 @@ async fn handle_request(
310311
let path = format!("/{}/{}", req.service, req.method);
311312
if let Some(x) = methods.get(&path) {
312313
let method = x;
313-
let ctx = TtrpcContext { fd, mh: header };
314+
let ctx = TtrpcContext { fd, mh: header, metadata: context::from_pb(&req.metadata) };
314315

315316
match method.handler(ctx, req).await {
316317
Ok((stream_id, body)) => {

src/asynchronous/utils.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::error::{Error, Result};
88
use crate::ttrpc::{Request, Response};
99
use async_trait::async_trait;
1010
use protobuf::Message;
11+
use std::collections::HashMap;
1112
use std::os::unix::io::{FromRawFd, RawFd};
1213
use tokio::net::UnixStream;
1314

@@ -53,11 +54,13 @@ macro_rules! async_request_handler {
5354
/// Send request through async client.
5455
#[macro_export]
5556
macro_rules! async_client_request {
56-
($self: ident, $req: ident, $timeout_nano: ident, $server: expr, $method: expr, $cres: ident) => {
57+
($self: ident, $ctx: ident, $req: ident, $server: expr, $method: expr, $cres: ident) => {
5758
let mut creq = ::ttrpc::Request::new();
5859
creq.set_service($server.to_string());
5960
creq.set_method($method.to_string());
60-
creq.set_timeout_nano($timeout_nano);
61+
creq.set_timeout_nano($ctx.timeout_nano);
62+
let md = ::ttrpc::context::to_pb($ctx.metadata);
63+
creq.set_metadata(md);
6164
creq.payload.reserve($req.compute_size() as usize);
6265
{
6366
let mut s = CodedOutputStream::vec(&mut creq.payload);
@@ -87,6 +90,7 @@ pub trait MethodHandler {
8790
pub struct TtrpcContext {
8891
pub fd: std::os::unix::io::RawFd,
8992
pub mh: MessageHeader,
93+
pub metadata: HashMap<String, Vec<String>>,
9094
}
9195

9296
pub fn convert_response_to_buf(res: Response) -> Result<Vec<u8>> {

0 commit comments

Comments
 (0)