Skip to content

Commit faf7b6b

Browse files
committed
api: add metadata for ttrpc call signature
This is a broken change, users should pass additional parameter metadata to ttrpc call for passing context between server and client. The type of metadata is Option<HashMap<String, Vec<String>>>. The metadata acts as context in Golang. See also: https://github.com/containerd/ttrpc/blob/07203d0ded4300d2dc3749aa82ec32e7f9fd4369/example/cmd/main.go#L116 Signed-off-by: bin <bin@hyper.sh>
1 parent f7d7708 commit faf7b6b

File tree

12 files changed

+94
-31
lines changed

12 files changed

+94
-31
lines changed

compiler/src/codegen.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl<'a> MethodGen<'a> {
212212
// Method signatures
213213
fn unary(&self, method_name: &str) -> String {
214214
format!(
215-
"{}(&self, req: &{}, timeout_nano: i64) -> {}<{}>",
215+
"{}(&self, req: &{}, metadata: Option<HashMap<String, Vec<String>>>, timeout_nano: i64) -> {}<{}>",
216216
method_name,
217217
self.input(),
218218
fq_grpc("Result"),
@@ -222,7 +222,7 @@ impl<'a> MethodGen<'a> {
222222

223223
fn unary_async(&self, method_name: &str) -> String {
224224
format!(
225-
"{}(&mut self, req: &{}, timeout_nano: i64) -> {}<{}>",
225+
"{}(&mut self, req: &{}, metadata: Option<HashMap<String, Vec<String>>>, timeout_nano: i64) -> {}<{}>",
226226
method_name,
227227
self.input(),
228228
fq_grpc("Result"),
@@ -314,7 +314,7 @@ impl<'a> MethodGen<'a> {
314314
self.output()
315315
));
316316
w.write_line(&format!(
317-
"::ttrpc::client_request!(self, req, timeout_nano, \"{}.{}\", \"{}\", cres);",
317+
"::ttrpc::client_request!(self, req, metadata, timeout_nano, \"{}.{}\", \"{}\", cres);",
318318
self.package_name,
319319
self.service_name,
320320
&self.proto.get_name(),
@@ -335,7 +335,7 @@ impl<'a> MethodGen<'a> {
335335
pub_async_fn(w, &self.unary_async(&method_name), |w| {
336336
w.write_line(&format!("let mut cres = {}::new();", self.output()));
337337
w.write_line(&format!(
338-
"::ttrpc::async_client_request!(self, req, timeout_nano, \"{}.{}\", \"{}\", cres);",
338+
"::ttrpc::async_client_request!(self, req, metadata, timeout_nano, \"{}.{}\", \"{}\", cres);",
339339
self.package_name,
340340
self.service_name,
341341
&self.proto.get_name(),

example/Cargo.lock

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,6 @@ path = "./async-client.rs"
4343
[build-dependencies]
4444
ttrpc-codegen = { path = "../ttrpc-codegen"}
4545

46+
[patch.crates-io]
47+
ttrpc-compiler = { path = "../compiler"}
48+

example/async-client.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod protocols;
77

88
use nix::sys::socket::*;
99
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
10+
use std::collections::HashMap;
1011
use ttrpc::r#async::Client;
1112

1213
#[tokio::main(flavor = "current_thread")]
@@ -44,7 +45,7 @@ async fn main() {
4445
println!(
4546
"Green Thread 1 - {} -> {:?} ended: {:?}",
4647
"health.check()",
47-
thc.check(&req, 0).await,
48+
thc.check(&req, default_metadata(), 0).await,
4849
now.elapsed(),
4950
);
5051
});
@@ -57,7 +58,7 @@ async fn main() {
5758
);
5859

5960
let show = match tac
60-
.list_interfaces(&agent::ListInterfacesRequest::new(), 0)
61+
.list_interfaces(&agent::ListInterfacesRequest::new(), default_metadata(), 0)
6162
.await
6263
{
6364
Err(e) => format!("{:?}", e),
@@ -80,7 +81,7 @@ async fn main() {
8081
);
8182

8283
let show = match ac
83-
.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), 0)
84+
.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), None, 0)
8485
.await
8586
{
8687
Err(e) => format!("{:?}", e),
@@ -101,10 +102,17 @@ async fn main() {
101102
println!(
102103
"Green Thread 3 - {} -> {:?} ended: {:?}",
103104
"health.version()",
104-
hc.version(&health::CheckRequest::new(), 0).await,
105+
hc.version(&health::CheckRequest::new(), default_metadata(), 0)
106+
.await,
105107
now.elapsed()
106108
);
107109
});
108110

109111
let _ = tokio::join!(t1, t2, t3);
110112
}
113+
114+
fn default_metadata() -> Option<HashMap<String, Vec<String>>> {
115+
let mut md: HashMap<String, Vec<String>> = HashMap::new();
116+
md.insert("key".to_string(), vec!["v1".to_string(), "v2".to_string()]);
117+
Some(md)
118+
}

example/async-server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ impl health_ttrpc::Health for HealthService {
3939

4040
Err(Error::RpcStatus(status))
4141
}
42+
4243
async fn version(
4344
&self,
44-
_ctx: &::ttrpc::r#async::TtrpcContext,
45+
ctx: &::ttrpc::r#async::TtrpcContext,
4546
req: health::CheckRequest,
4647
) -> Result<health::VersionCheckResponse> {
4748
info!("version {:?}", req);
49+
info!("ctx {:?}", ctx);
4850
let mut rep = health::VersionCheckResponse::new();
4951
rep.agent_version = "mock.0.1".to_string();
5052
rep.grpc_version = "0.0.1".to_string();

example/build.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@ fn main() {
1717
"protocols/protos/oci.proto",
1818
];
1919

20-
// Tell Cargo that if the .proto files changed, to rerun this build script.
21-
protos
22-
.iter()
23-
.for_each(|p| println!("cargo:rerun-if-changed={}", &p));
24-
2520
Codegen::new()
2621
.out_dir("protocols/sync")
2722
.inputs(&protos)

example/client.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ mod protocols;
1616

1717
use nix::sys::socket::*;
1818
use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
19+
use std::collections::HashMap;
1920
use std::thread;
2021
use ttrpc::client::Client;
2122

@@ -55,7 +56,7 @@ fn main() {
5556
"OS Thread {:?} - {} -> {:?} ended: {:?}",
5657
std::thread::current().id(),
5758
"health.check()",
58-
thc.check(&req, 0),
59+
thc.check(&req, default_metadata(), 0),
5960
now.elapsed(),
6061
);
6162
});
@@ -68,7 +69,11 @@ fn main() {
6869
now.elapsed(),
6970
);
7071

71-
let show = match tac.list_interfaces(&agent::ListInterfacesRequest::new(), 0) {
72+
let show = match tac.list_interfaces(
73+
&agent::ListInterfacesRequest::new(),
74+
default_metadata(),
75+
0,
76+
) {
7277
Err(e) => format!("{:?}", e),
7378
Ok(s) => format!("{:?}", s),
7479
};
@@ -87,7 +92,7 @@ fn main() {
8792
"agent.online_cpu_mem()",
8893
now.elapsed()
8994
);
90-
let show = match ac.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), 0) {
95+
let show = match ac.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), None, 0) {
9196
Err(e) => format!("{:?}", e),
9297
Ok(s) => format!("{:?}", s),
9398
};
@@ -108,10 +113,16 @@ fn main() {
108113
println!(
109114
"Main OS Thread - {} -> {:?} ended: {:?}",
110115
"health.version()",
111-
hc.version(&health::CheckRequest::new(), 0),
116+
hc.version(&health::CheckRequest::new(), default_metadata(), 0),
112117
now.elapsed()
113118
);
114119

115120
t.join().unwrap();
116121
t2.join().unwrap();
117122
}
123+
124+
fn default_metadata() -> Option<HashMap<String, Vec<String>>> {
125+
let mut md: HashMap<String, Vec<String>> = HashMap::new();
126+
md.insert("key".to_string(), vec!["v1".to_string(), "v2".to_string()]);
127+
Some(md)
128+
}

example/server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@ impl health_ttrpc::Health for HealthService {
3838
status.set_message("Just for fun".to_string());
3939
Err(Error::RpcStatus(status))
4040
}
41+
4142
fn version(
4243
&self,
43-
_ctx: &::ttrpc::TtrpcContext,
44+
ctx: &::ttrpc::TtrpcContext,
4445
req: health::CheckRequest,
4546
) -> Result<health::VersionCheckResponse> {
4647
info!("version {:?}", req);
48+
info!("ctx {:?}", ctx);
4749
let mut rep = health::VersionCheckResponse::new();
4850
rep.agent_version = "mock.0.1".to_string();
4951
rep.grpc_version = "0.0.1".to_string();
@@ -73,7 +75,6 @@ impl agent_ttrpc::AgentService for AgentService {
7375
i.set_Interfaces(rp);
7476

7577
Ok(i)
76-
//Err(::ttrpc::Error::RpcStatus(::ttrpc::get_Status(::ttrpc::Code::NOT_FOUND, "".to_string())))
7778
}
7879
}
7980

src/asynchronous/utils.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,13 @@ macro_rules! async_request_handler {
5454
/// Send request through async client.
5555
#[macro_export]
5656
macro_rules! async_client_request {
57-
($self: ident, $req: ident, $timeout_nano: ident, $server: expr, $method: expr, $cres: ident) => {
57+
($self: ident, $req: ident, $metadata: ident, $timeout_nano: ident, $server: expr, $method: expr, $cres: ident) => {
5858
let mut creq = ::ttrpc::Request::new();
5959
creq.set_service($server.to_string());
6060
creq.set_method($method.to_string());
6161
creq.set_timeout_nano($timeout_nano);
62+
let md = ::ttrpc::common::convert_metadata(&$metadata);
63+
creq.set_metadata(md);
6264
creq.payload.reserve($req.compute_size() as usize);
6365
{
6466
let mut s = CodedOutputStream::vec(&mut creq.payload);

src/common.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,22 +154,47 @@ pub fn parse_metadata(kvs: &protobuf::RepeatedField<KeyValue>) -> HashMap<String
154154
meta
155155
}
156156

157+
pub fn convert_metadata(
158+
kvs: &Option<HashMap<String, Vec<String>>>,
159+
) -> protobuf::RepeatedField<KeyValue> {
160+
let mut meta: protobuf::RepeatedField<KeyValue> = protobuf::RepeatedField::default();
161+
match kvs {
162+
Some(kvs) => {
163+
for (k, vl) in kvs {
164+
for v in vl {
165+
let key = KeyValue {
166+
key: k.clone(),
167+
value: v.clone(),
168+
..Default::default()
169+
};
170+
meta.push(key);
171+
}
172+
}
173+
}
174+
None => {}
175+
}
176+
meta
177+
}
178+
157179
#[cfg(test)]
158180
mod tests {
159-
use super::parse_metadata;
181+
use super::{convert_metadata, parse_metadata};
160182
use crate::ttrpc::KeyValue;
161183

162184
#[test]
163-
fn test_parse_metadata() {
185+
fn test_metadata() {
186+
// RepeatedField -> HashMap, test parse_metadata()
164187
let mut src: protobuf::RepeatedField<KeyValue> = protobuf::RepeatedField::default();
165188
for i in &[
166189
("key1", "value1-1"),
167190
("key1", "value1-2"),
168191
("key2", "value2"),
169192
] {
170-
let mut key = KeyValue::default();
171-
key.key = i.0.to_string();
172-
key.value = i.1.to_string();
193+
let key = KeyValue {
194+
key: i.0.to_string(),
195+
value: i.1.to_string(),
196+
..Default::default()
197+
};
173198
src.push(key);
174199
}
175200

@@ -182,5 +207,21 @@ mod tests {
182207
);
183208
assert_eq!(dst.get("key2"), Some(&vec!["value2".to_string()]));
184209
assert_eq!(dst.get("key3"), None);
210+
211+
// HashMap -> RepeatedField , test convert_metadata()
212+
let src = convert_metadata(&Some(dst));
213+
let mut kvs = src.into_vec();
214+
kvs.sort_by(|a, b| a.key.partial_cmp(&b.key).unwrap());
215+
216+
assert_eq!(kvs.len(), 3);
217+
218+
assert_eq!(kvs[0].key, "key1");
219+
assert_eq!(kvs[0].value, "value1-1");
220+
221+
assert_eq!(kvs[1].key, "key1");
222+
assert_eq!(kvs[1].value, "value1-2");
223+
224+
assert_eq!(kvs[2].key, "key2");
225+
assert_eq!(kvs[2].value, "value2");
185226
}
186227
}

0 commit comments

Comments
 (0)