Skip to content

Commit eadbd7b

Browse files
committed
fix: fix the rebase caused the error
fix rebase error. Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
1 parent 23256d3 commit eadbd7b

File tree

7 files changed

+18
-73
lines changed

7 files changed

+18
-73
lines changed

.github/workflows/bvt.yml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ jobs:
1515
shell: bash
1616
- name: Check
1717
run: |
18-
make deps
19-
make check
20-
make -C compiler check
21-
make -C ttrpc-codegen check
22-
make -C codegen check
18+
make check-all
2319
2420
make:
2521
name: Build

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ prost-build = { version = "0.11", optional = true }
3939

4040
[features]
4141
default = ["sync","rustprotobuf"]
42-
async = ["dep:async-trait", "dep:tokio", "dep:futures", "dep:tokio-vsock"]
42+
async = ["async-trait", "async-stream", "tokio", "futures", "tokio-vsock"]
4343
sync = []
4444
prost = ["dep:prost", "dep:prost-build"]
4545
rustprotobuf = ["dep:protobuf"]

src/proto.rs

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -241,23 +241,9 @@ impl GenMessage {
241241
.await
242242
.map_err(|e| Error::Socket(e.to_string()))?;
243243

244-
if header.length > MESSAGE_LENGTH_MAX as u32 {
245-
#[cfg(not(feature = "prost"))]
246-
return Err(get_rpc_status(
247-
Code::INVALID_ARGUMENT,
248-
format!(
249-
"message length {} exceed maximum message size of {}",
250-
header.length, MESSAGE_LENGTH_MAX
251-
),
252-
));
253-
#[cfg(feature = "prost")]
254-
return Err(get_rpc_status(
255-
Code::InvalidArgument,
256-
format!(
257-
"message length {} exceed maximum message size of {}",
258-
header.length, MESSAGE_LENGTH_MAX
259-
),
260-
));
244+
if let Err(e) = check_oversize(header.length as usize, true) {
245+
discard_message_body(reader, &header).await?;
246+
return Err(GenMessageError::ReturnError(header, e));
261247
}
262248

263249
let mut content = vec![0; header.length as usize];
@@ -407,23 +393,12 @@ where
407393
.await
408394
.map_err(|e| Error::Socket(e.to_string()))?;
409395

410-
if header.length > MESSAGE_LENGTH_MAX as u32 {
411-
#[cfg(not(feature = "prost"))]
412-
return Err(get_rpc_status(
413-
Code::INVALID_ARGUMENT,
414-
format!(
415-
"message length {} exceed maximum message size of {}",
416-
header.length, MESSAGE_LENGTH_MAX
417-
),
418-
));
419-
#[cfg(feature = "prost")]
420-
return Err(get_rpc_status(
421-
Code::InvalidArgument,
422-
format!(
423-
"message length {} exceed maximum message size of {}",
424-
header.length, MESSAGE_LENGTH_MAX
425-
),
426-
));
396+
if check_oversize(header.length as usize, true).is_err() {
397+
discard_message_body(reader, &header).await?;
398+
return Ok(Self {
399+
header,
400+
payload: C::decode("").map_err(err_to_others_err!(e, "Decode payload failed."))?,
401+
});
427402
}
428403

429404
let mut content = vec![0; header.length as usize];

src/sync/channel.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,10 @@ pub fn read_message(conn: &PipeConnection) -> Result<(MessageHeader, Result<Vec<
9797
let mh = read_message_header(conn)?;
9898
trace!("Got Message header {:?}", mh);
9999

100-
if mh.length > MESSAGE_LENGTH_MAX as u32 {
101-
return Err(get_rpc_status(
102-
#[cfg(not(feature = "prost"))]
103-
Code::INVALID_ARGUMENT,
104-
#[cfg(feature = "prost")]
105-
Code::InvalidArgument,
106-
format!(
107-
"message length {} exceed maximum message size of {}",
108-
mh.length, MESSAGE_LENGTH_MAX
109-
),
110-
));
100+
let mh_len = mh.length as usize;
101+
if let Err(e) = check_oversize(mh_len, true) {
102+
discard_count(conn, mh_len)?;
103+
return Ok((mh, Err(e)));
111104
}
112105

113106
let buf = read_count(conn, mh.length as usize)?;

src/sync/server.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,20 +173,13 @@ fn start_method_handler_thread(
173173
{
174174
let mut s = CodedInputStream::from_bytes(&buf);
175175
req = Request::new();
176-
req = Request::new();
177176
if let Err(x) = req.merge_from(&mut s) {
178177
let status = get_status(Code::INVALID_ARGUMENT, x.to_string());
179178
let mut res = Response::new();
180179
res.set_status(status);
181180
if let Err(x) = response_to_channel(mh.stream_id, res, res_tx.clone()) {
182181
debug!("response_to_channel get error {:?}", x);
183-
quit.store(true, Ordering::SeqCst);
184-
// the client connection would be closed and
185-
// the connection dealing main thread would have
186-
// exited.
187-
control_tx
188-
.send(())
189-
.unwrap_or_else(|err| trace!("Failed to send {:?}", err));
182+
quit_connection(quit, control_tx);
190183
break;
191184
}
192185
continue;

src/sync/utils.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,10 @@ pub fn response_to_channel(
6161

6262
pub fn response_error_to_channel(
6363
stream_id: u32,
64-
res: Response,
64+
e: Error,
6565
tx: std::sync::mpsc::Sender<(MessageHeader, Vec<u8>)>,
6666
) -> Result<()> {
67-
let buf = res.encode_to_vec();
68-
69-
let mh = MessageHeader {
70-
length: buf.len() as u32,
71-
stream_id,
72-
type_: MESSAGE_TYPE_RESPONSE,
73-
flags: 0,
74-
};
75-
tx.send((mh, buf)).map_err(err_to_others_err!(e, ""))?;
76-
77-
Ok(())
67+
response_to_channel(stream_id, e.into(), tx)
7868
}
7969

8070
/// Handle request in sync mode.

tests/run-examples.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,7 @@ fn run_examples() -> Result<(), Box<dyn std::error::Error>> {
9090
{
9191
println!("Running examples with rustprotobuf feature");
9292
run_example("server", "client", "example")?;
93-
#[cfg(unix)]
9493
run_example("async-server", "async-client", "example")?;
95-
#[cfg(unix)]
9694
run_example("async-stream-server", "async-stream-client", "example")?;
9795
}
9896

0 commit comments

Comments
 (0)