Skip to content

Commit d1ea7a2

Browse files
committed
client: use a new Context struct wrapping metadata and timeout info
This will reduce parameters count in rpc functions. Signed-off-by: bin <bin@hyper.sh>
1 parent de58a49 commit d1ea7a2

File tree

10 files changed

+200
-120
lines changed

10 files changed

+200
-120
lines changed

compiler/src/codegen.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl<'a> MethodGen<'a> {
212212
// Method signatures
213213
fn unary(&self, method_name: &str) -> String {
214214
format!(
215-
"{}(&self, req: &{}, metadata: Option<HashMap<String, Vec<String>>>, timeout_nano: i64) -> {}<{}>",
215+
"{}(&self, ctx: ttrpc::context::Context, req: &{}) -> {}<{}>",
216216
method_name,
217217
self.input(),
218218
fq_grpc("Result"),
@@ -222,7 +222,7 @@ impl<'a> MethodGen<'a> {
222222

223223
fn unary_async(&self, method_name: &str) -> String {
224224
format!(
225-
"{}(&mut self, req: &{}, metadata: Option<HashMap<String, Vec<String>>>, timeout_nano: i64) -> {}<{}>",
225+
"{}(&mut self, ctx: ttrpc::context::Context, req: &{}) -> {}<{}>",
226226
method_name,
227227
self.input(),
228228
fq_grpc("Result"),
@@ -314,7 +314,7 @@ impl<'a> MethodGen<'a> {
314314
self.output()
315315
));
316316
w.write_line(&format!(
317-
"::ttrpc::client_request!(self, req, metadata, timeout_nano, \"{}.{}\", \"{}\", cres);",
317+
"::ttrpc::client_request!(self, ctx, req, \"{}.{}\", \"{}\", cres);",
318318
self.package_name,
319319
self.service_name,
320320
&self.proto.get_name(),
@@ -335,7 +335,7 @@ impl<'a> MethodGen<'a> {
335335
pub_async_fn(w, &self.unary_async(&method_name), |w| {
336336
w.write_line(&format!("let mut cres = {}::new();", self.output()));
337337
w.write_line(&format!(
338-
"::ttrpc::async_client_request!(self, req, metadata, timeout_nano, \"{}.{}\", \"{}\", cres);",
338+
"::ttrpc::async_client_request!(self, ctx, req, \"{}.{}\", \"{}\", cres);",
339339
self.package_name,
340340
self.service_name,
341341
&self.proto.get_name(),

example/async-client.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod protocols;
77

88
use nix::sys::socket::*;
99
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
10-
use std::collections::HashMap;
10+
use ttrpc::context::{self, Context};
1111
use ttrpc::r#async::Client;
1212

1313
#[tokio::main(flavor = "current_thread")]
@@ -45,7 +45,7 @@ async fn main() {
4545
println!(
4646
"Green Thread 1 - {} -> {:?} ended: {:?}",
4747
"health.check()",
48-
thc.check(&req, default_metadata(), 0).await,
48+
thc.check(default_ctx(), &req).await,
4949
now.elapsed(),
5050
);
5151
});
@@ -58,7 +58,7 @@ async fn main() {
5858
);
5959

6060
let show = match tac
61-
.list_interfaces(&agent::ListInterfacesRequest::new(), default_metadata(), 0)
61+
.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new())
6262
.await
6363
{
6464
Err(e) => format!("{:?}", e),
@@ -81,7 +81,7 @@ async fn main() {
8181
);
8282

8383
let show = match ac
84-
.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), None, 0)
84+
.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new())
8585
.await
8686
{
8787
Err(e) => format!("{:?}", e),
@@ -102,7 +102,7 @@ async fn main() {
102102
println!(
103103
"Green Thread 3 - {} -> {:?} ended: {:?}",
104104
"health.version()",
105-
hc.version(&health::CheckRequest::new(), default_metadata(), 0)
105+
hc.version(default_ctx(), &health::CheckRequest::new())
106106
.await,
107107
now.elapsed()
108108
);
@@ -111,8 +111,11 @@ async fn main() {
111111
let _ = tokio::join!(t1, t2, t3);
112112
}
113113

114-
fn default_metadata() -> Option<HashMap<String, Vec<String>>> {
115-
let mut md: HashMap<String, Vec<String>> = HashMap::new();
116-
md.insert("key".to_string(), vec!["v1".to_string(), "v2".to_string()]);
117-
Some(md)
114+
fn default_ctx() -> Context {
115+
let mut ctx = context::with_timeout(0);
116+
ctx.add("key-1".to_string(), "value-1-1".to_string());
117+
ctx.add("key-1".to_string(), "value-1-2".to_string());
118+
ctx.set("key-2".to_string(), vec!["value-2".to_string()]);
119+
120+
ctx
118121
}

example/client.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ mod protocols;
1616

1717
use nix::sys::socket::*;
1818
use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
19-
use std::collections::HashMap;
2019
use std::thread;
2120
use ttrpc::client::Client;
21+
use ttrpc::context::{self, Context};
2222

2323
fn main() {
2424
let path = "/tmp/1";
@@ -56,7 +56,7 @@ fn main() {
5656
"OS Thread {:?} - {} -> {:?} ended: {:?}",
5757
std::thread::current().id(),
5858
"health.check()",
59-
thc.check(&req, default_metadata(), 0),
59+
thc.check(default_ctx(), &req),
6060
now.elapsed(),
6161
);
6262
});
@@ -69,11 +69,7 @@ fn main() {
6969
now.elapsed(),
7070
);
7171

72-
let show = match tac.list_interfaces(
73-
&agent::ListInterfacesRequest::new(),
74-
default_metadata(),
75-
0,
76-
) {
72+
let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
7773
Err(e) => format!("{:?}", e),
7874
Ok(s) => format!("{:?}", s),
7975
};
@@ -92,7 +88,7 @@ fn main() {
9288
"agent.online_cpu_mem()",
9389
now.elapsed()
9490
);
95-
let show = match ac.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), None, 0) {
91+
let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
9692
Err(e) => format!("{:?}", e),
9793
Ok(s) => format!("{:?}", s),
9894
};
@@ -113,16 +109,19 @@ fn main() {
113109
println!(
114110
"Main OS Thread - {} -> {:?} ended: {:?}",
115111
"health.version()",
116-
hc.version(&health::CheckRequest::new(), default_metadata(), 0),
112+
hc.version(default_ctx(), &health::CheckRequest::new()),
117113
now.elapsed()
118114
);
119115

120116
t.join().unwrap();
121117
t2.join().unwrap();
122118
}
123119

124-
fn default_metadata() -> Option<HashMap<String, Vec<String>>> {
125-
let mut md: HashMap<String, Vec<String>> = HashMap::new();
126-
md.insert("key".to_string(), vec!["v1".to_string(), "v2".to_string()]);
127-
Some(md)
120+
fn default_ctx() -> Context {
121+
let mut ctx = context::with_timeout(0);
122+
ctx.add("key-1".to_string(), "value-1-1".to_string());
123+
ctx.add("key-1".to_string(), "value-1-2".to_string());
124+
ctx.set("key-2".to_string(), vec!["value-2".to_string()]);
125+
126+
ctx
128127
}

src/asynchronous/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::sync::Arc;
1212
use crate::asynchronous::stream::{receive, respond, respond_with_status};
1313
use crate::asynchronous::unix_incoming::UnixIncoming;
1414
use crate::common::{self, Domain, MESSAGE_TYPE_REQUEST};
15+
use crate::context;
1516
use crate::error::{get_status, Error, Result};
1617
use crate::r#async::{MethodHandler, TtrpcContext};
1718
use crate::ttrpc::{Code, Request};
@@ -310,7 +311,7 @@ async fn handle_request(
310311
let path = format!("/{}/{}", req.service, req.method);
311312
if let Some(x) = methods.get(&path) {
312313
let method = x;
313-
let ctx = TtrpcContext { fd, mh: header, metadata: common::parse_metadata(&req.metadata) };
314+
let ctx = TtrpcContext { fd, mh: header, metadata: context::from_pb(&req.metadata) };
314315

315316
match method.handler(ctx, req).await {
316317
Ok((stream_id, body)) => {

src/asynchronous/utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ macro_rules! async_request_handler {
5454
/// Send request through async client.
5555
#[macro_export]
5656
macro_rules! async_client_request {
57-
($self: ident, $req: ident, $metadata: ident, $timeout_nano: ident, $server: expr, $method: expr, $cres: ident) => {
57+
($self: ident, $ctx: ident, $req: ident, $server: expr, $method: expr, $cres: ident) => {
5858
let mut creq = ::ttrpc::Request::new();
5959
creq.set_service($server.to_string());
6060
creq.set_method($method.to_string());
61-
creq.set_timeout_nano($timeout_nano);
62-
let md = ::ttrpc::common::convert_metadata(&$metadata);
61+
creq.set_timeout_nano($ctx.timeout_nano);
62+
let md = ::ttrpc::context::to_pb($ctx.metadata);
6363
creq.set_metadata(md);
6464
creq.payload.reserve($req.compute_size() as usize);
6565
{

src/common.rs

Lines changed: 0 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@
88
#![allow(unused_macros)]
99

1010
use crate::error::{Error, Result};
11-
use crate::ttrpc::KeyValue;
1211
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
1312
use nix::sys::socket::*;
14-
use std::collections::HashMap;
1513
use std::os::unix::io::RawFd;
1614
use std::str::FromStr;
1715

@@ -141,87 +139,3 @@ pub const MESSAGE_LENGTH_MAX: usize = 4 << 20;
141139

142140
pub const MESSAGE_TYPE_REQUEST: u8 = 0x1;
143141
pub const MESSAGE_TYPE_RESPONSE: u8 = 0x2;
144-
145-
pub fn parse_metadata(kvs: &protobuf::RepeatedField<KeyValue>) -> HashMap<String, Vec<String>> {
146-
let mut meta: HashMap<String, Vec<String>> = HashMap::new();
147-
for kv in kvs {
148-
if let Some(ref mut vl) = meta.get_mut(&kv.key) {
149-
vl.push(kv.value.clone());
150-
} else {
151-
meta.insert(kv.key.clone(), vec![kv.value.clone()]);
152-
}
153-
}
154-
meta
155-
}
156-
157-
pub fn convert_metadata(
158-
kvs: &Option<HashMap<String, Vec<String>>>,
159-
) -> protobuf::RepeatedField<KeyValue> {
160-
let mut meta: protobuf::RepeatedField<KeyValue> = protobuf::RepeatedField::default();
161-
match kvs {
162-
Some(kvs) => {
163-
for (k, vl) in kvs {
164-
for v in vl {
165-
let key = KeyValue {
166-
key: k.clone(),
167-
value: v.clone(),
168-
..Default::default()
169-
};
170-
meta.push(key);
171-
}
172-
}
173-
}
174-
None => {}
175-
}
176-
meta
177-
}
178-
179-
#[cfg(test)]
180-
mod tests {
181-
use super::{convert_metadata, parse_metadata};
182-
use crate::ttrpc::KeyValue;
183-
184-
#[test]
185-
fn test_metadata() {
186-
// RepeatedField -> HashMap, test parse_metadata()
187-
let mut src: protobuf::RepeatedField<KeyValue> = protobuf::RepeatedField::default();
188-
for i in &[
189-
("key1", "value1-1"),
190-
("key1", "value1-2"),
191-
("key2", "value2"),
192-
] {
193-
let key = KeyValue {
194-
key: i.0.to_string(),
195-
value: i.1.to_string(),
196-
..Default::default()
197-
};
198-
src.push(key);
199-
}
200-
201-
let dst = parse_metadata(&src);
202-
assert_eq!(dst.len(), 2);
203-
204-
assert_eq!(
205-
dst.get("key1"),
206-
Some(&vec!["value1-1".to_string(), "value1-2".to_string()])
207-
);
208-
assert_eq!(dst.get("key2"), Some(&vec!["value2".to_string()]));
209-
assert_eq!(dst.get("key3"), None);
210-
211-
// HashMap -> RepeatedField , test convert_metadata()
212-
let src = convert_metadata(&Some(dst));
213-
let mut kvs = src.into_vec();
214-
kvs.sort_by(|a, b| a.key.partial_cmp(&b.key).unwrap());
215-
216-
assert_eq!(kvs.len(), 3);
217-
218-
assert_eq!(kvs[0].key, "key1");
219-
assert_eq!(kvs[0].value, "value1-1");
220-
221-
assert_eq!(kvs[1].key, "key1");
222-
assert_eq!(kvs[1].value, "value1-2");
223-
224-
assert_eq!(kvs[2].key, "key2");
225-
assert_eq!(kvs[2].value, "value2");
226-
}
227-
}

0 commit comments

Comments
 (0)