Skip to content

Commit 96237f1

Browse files
authored
Merge branch 'main' into bump-opendal
2 parents 5d602c3 + b5ba4a9 commit 96237f1

File tree

11 files changed

+112
-75
lines changed

11 files changed

+112
-75
lines changed

src/meta/client/src/grpc_action.rs

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,6 @@ pub trait RequestFor {
4545
type Reply;
4646
}
4747

48-
// TODO: reduce this and MetaGrpcReadReq into one enum?
49-
// Action wrapper for do_action.
50-
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
51-
pub enum MetaGrpcWriteReq {
52-
UpsertKV(UpsertKVReq),
53-
}
54-
55-
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
56-
pub enum MetaGrpcReadReq {
57-
GetKV(GetKVReq),
58-
MGetKV(MGetKVReq),
59-
ListKV(ListKVReq), // since 2022-05-23
60-
}
61-
6248
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, derive_more::From)]
6349
pub enum MetaGrpcReq {
6450
UpsertKV(UpsertKVReq),
@@ -81,15 +67,6 @@ impl TryInto<MetaGrpcReq> for Request<RaftRequest> {
8167
}
8268
}
8369

84-
impl tonic::IntoRequest<RaftRequest> for MetaGrpcWriteReq {
85-
fn into_request(self) -> Request<RaftRequest> {
86-
let raft_request = RaftRequest {
87-
data: serde_json::to_string(&self).expect("fail to serialize"),
88-
};
89-
tonic::Request::new(raft_request)
90-
}
91-
}
92-
9370
impl TryInto<Request<RaftRequest>> for MetaGrpcReq {
9471
type Error = serde_json::Error;
9572

@@ -103,32 +80,6 @@ impl TryInto<Request<RaftRequest>> for MetaGrpcReq {
10380
}
10481
}
10582

106-
impl TryInto<Request<RaftRequest>> for MetaGrpcWriteReq {
107-
type Error = serde_json::Error;
108-
109-
fn try_into(self) -> Result<Request<RaftRequest>, Self::Error> {
110-
let raft_request = RaftRequest {
111-
data: serde_json::to_string(&self)?,
112-
};
113-
114-
let request = tonic::Request::new(raft_request);
115-
Ok(request)
116-
}
117-
}
118-
119-
impl TryInto<Request<RaftRequest>> for MetaGrpcReadReq {
120-
type Error = serde_json::Error;
121-
122-
fn try_into(self) -> Result<Request<RaftRequest>, Self::Error> {
123-
let get_req = RaftRequest {
124-
data: serde_json::to_string(&self)?,
125-
};
126-
127-
let request = tonic::Request::new(get_req);
128-
Ok(request)
129-
}
130-
}
131-
13283
impl RequestFor for GetKVReq {
13384
type Reply = GetKVReply;
13485
}

src/meta/client/src/grpc_client.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -715,21 +715,22 @@ impl MetaGrpcClient {
715715
let resp =
716716
res.map_err(|status| MetaHandshakeError::new("handshake is refused", &status))?;
717717

718-
// backward compatibility: no version in handshake.
719-
// TODO(xp): remove this when merged.
720-
if resp.protocol_version > 0 {
721-
let min_compatible = to_digit_ver(min_metasrv_ver);
722-
if resp.protocol_version < min_compatible {
723-
let invalid_err = AnyError::error(format!(
724-
"metasrv protocol_version({}) < meta-client min-compatible({})",
725-
from_digit_ver(resp.protocol_version),
726-
min_metasrv_ver,
727-
));
728-
return Err(MetaHandshakeError::new(
729-
"incompatible protocol version",
730-
&invalid_err,
731-
));
732-
}
718+
assert!(
719+
resp.protocol_version > 0,
720+
"talking to a very old databend-meta: upgrade databend-meta to at least 0.8"
721+
);
722+
723+
let min_compatible = to_digit_ver(min_metasrv_ver);
724+
if resp.protocol_version < min_compatible {
725+
let invalid_err = AnyError::error(format!(
726+
"metasrv protocol_version({}) < meta-client min-compatible({})",
727+
from_digit_ver(resp.protocol_version),
728+
min_metasrv_ver,
729+
));
730+
return Err(MetaHandshakeError::new(
731+
"incompatible protocol version",
732+
&invalid_err,
733+
));
733734
}
734735

735736
let token = resp.payload;

src/meta/client/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ mod grpc_client;
1717
mod kv_api_impl;
1818
mod message;
1919

20-
pub use grpc_action::MetaGrpcReadReq;
2120
pub use grpc_action::MetaGrpcReq;
22-
pub use grpc_action::MetaGrpcWriteReq;
2321
pub use grpc_action::RequestFor;
2422
pub use grpc_client::ClientHandle;
2523
pub use grpc_client::MetaGrpcClient;

src/meta/client/tests/it/grpc_server.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use std::thread::sleep;
1717
use std::time::Duration;
1818

1919
use common_base::base::tokio;
20+
use common_meta_client::to_digit_ver;
21+
use common_meta_client::MIN_METASRV_SEMVER;
2022
use common_meta_types::protobuf::meta_service_server::MetaService;
2123
use common_meta_types::protobuf::meta_service_server::MetaServiceServer;
2224
use common_meta_types::protobuf::ClientInfo;
@@ -51,7 +53,12 @@ impl MetaService for GrpcServiceForTestImpl {
5153
_request: Request<Streaming<common_meta_types::protobuf::HandshakeRequest>>,
5254
) -> Result<Response<Self::HandshakeStream>, Status> {
5355
tokio::time::sleep(Duration::from_secs(2)).await;
54-
let output = futures::stream::once(async { Ok(HandshakeResponse::default()) });
56+
let output = futures::stream::once(async {
57+
Ok(HandshakeResponse {
58+
protocol_version: to_digit_ver(&MIN_METASRV_SEMVER),
59+
payload: vec![],
60+
})
61+
});
5562
Ok(Response::new(Box::pin(output)))
5663
}
5764

src/query/service/src/pipelines/processors/transforms/hash_join/common.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ impl JoinHashTable {
154154
Ok(ConstColumn::new(inner, col.len()).arc())
155155
} else if column.is_nullable() {
156156
let col: &NullableColumn = Series::check_get(column)?;
157+
if col.is_empty() {
158+
let ty = col.data_type();
159+
return ty.create_constant_column(&DataValue::Null, validity.len());
160+
}
157161
// It's possible validity is longer than col.
158162
let diff_len = validity.len() - col.ensure_validity().len();
159163
let mut new_validity = MutableBitmap::with_capacity(validity.len());

tests/logictest/suites/mode/cluster/broadcast_join.test

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,5 +469,45 @@ drop table z0;
469469
statement ok
470470
drop table z1;
471471

472+
statement ok
473+
CREATE TABLE t0(c0BOOLEAN BOOLEAN NULL DEFAULT(false));
474+
475+
statement ok
476+
CREATE TABLE t1(c0BOOLEAN BOOL NULL, c1FLOAT FLOAT NOT NULL DEFAULT(0.4661566913127899));
477+
478+
statement ok
479+
CREATE TABLE t2(c0VARCHAR VARCHAR NULL, c1FLOAT DOUBLE NULL DEFAULT(0.954969048500061), c2VARCHAR VARCHAR NULL);
480+
481+
statement ok
482+
INSERT INTO t0(c0boolean) VALUES (false), (true);
483+
484+
statement ok
485+
INSERT INTO t0(c0boolean) VALUES (false), (false), (true);
486+
487+
statement ok
488+
INSERT INTO t1(c1float) VALUES (0.43919482827186584);
489+
490+
statement ok
491+
INSERT INTO t1(c1float) VALUES (0.2492278516292572);
492+
493+
statement ok
494+
INSERT INTO t2(c1float) VALUES (0.9702655076980591);
495+
496+
statement ok
497+
INSERT INTO t2(c1float, c2varchar) VALUES (0.5340723991394043, '02'), (0.4661566913127899, '1261837');
498+
499+
statement ok
500+
SELECT t0.c0boolean, t1.c0boolean, t1.c1float FROM t0, t1 RIGHT JOIN t2 ON t1.c0boolean;
501+
502+
503+
statement ok
504+
drop table t0;
505+
506+
statement ok
507+
drop table t1;
508+
509+
statement ok
510+
drop table t2;
511+
472512
statement ok
473513
set prefer_broadcast_join = 0;

tests/logictest/suites/query/join.test

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,3 +476,43 @@ drop table z0;
476476
statement ok
477477
drop table z1;
478478

479+
statement ok
480+
CREATE TABLE t0(c0BOOLEAN BOOLEAN NULL DEFAULT(false));
481+
482+
statement ok
483+
CREATE TABLE t1(c0BOOLEAN BOOL NULL, c1FLOAT FLOAT NOT NULL DEFAULT(0.4661566913127899));
484+
485+
statement ok
486+
CREATE TABLE t2(c0VARCHAR VARCHAR NULL, c1FLOAT DOUBLE NULL DEFAULT(0.954969048500061), c2VARCHAR VARCHAR NULL);
487+
488+
statement ok
489+
INSERT INTO t0(c0boolean) VALUES (false), (true);
490+
491+
statement ok
492+
INSERT INTO t0(c0boolean) VALUES (false), (false), (true);
493+
494+
statement ok
495+
INSERT INTO t1(c1float) VALUES (0.43919482827186584);
496+
497+
statement ok
498+
INSERT INTO t1(c1float) VALUES (0.2492278516292572);
499+
500+
statement ok
501+
INSERT INTO t2(c1float) VALUES (0.9702655076980591);
502+
503+
statement ok
504+
INSERT INTO t2(c1float, c2varchar) VALUES (0.5340723991394043, '02'), (0.4661566913127899, '1261837');
505+
506+
statement ok
507+
SELECT t0.c0boolean, t1.c0boolean, t1.c1float FROM t0, t1 RIGHT JOIN t2 ON t1.c0boolean;
508+
509+
510+
statement ok
511+
drop table t0;
512+
513+
statement ok
514+
drop table t1;
515+
516+
statement ok
517+
drop table t2;
518+

tests/suites/1_stateful/01_load_v2/01_0004_streaming_variant_load.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ if [ $? -ne 0 ]; then
3232
fi
3333

3434
# load csv
35-
# todo(ariesdevil): change to new syntax when format_quote landing
36-
curl -H "insert_sql:insert into variant_test format Csv" -H "format_skip_header:0" -H 'format_field_delimiter: ,' -H 'format_record_delimiter: \n' -H "format_quote: \'" -F "upload=@/tmp/json_sample1.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
35+
curl -H "insert_sql:insert into variant_test file_format = (type = 'CSV' skip_header = 0 field_delimiter = ',' record_delimiter = '\n' quote = '\'')" -F "upload=@/tmp/json_sample1.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
3736
curl -H "insert_sql:insert into variant_test format Csv" -H "format_skip_header:0" -H 'format_field_delimiter: |' -H 'format_record_delimiter: \n' -H "format_quote: \'" -F "upload=@/tmp/json_sample2.csv" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" > /dev/null 2>&1
3837
echo "select * from variant_test order by Id asc;" | $MYSQL_CLIENT_CONNECT
3938

tests/suites/1_stateful/05_formats/05_03_xml/05_03_01_xml_v1.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ curl -sH "insert_sql:insert into test_xml file_format = (type = 'XML')" -F "uplo
6868
echo "select * from test_xml" | $MYSQL_CLIENT_CONNECT
6969
echo "truncate table test_xml" | $MYSQL_CLIENT_CONNECT
7070

71-
# todo(ariesdevil): wait for row_tag landing
72-
curl -sH "insert_sql:insert into test_xml format XML" -F "upload=@/tmp/simple_v2.xml" -H "row_tag:'databend'" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "SUCCESS"
71+
curl -sH "insert_sql:insert into test_xml file_format = (type = 'XML' row_tag = 'databend')" -F "upload=@/tmp/simple_v2.xml" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "SUCCESS"
7372
echo "select * from test_xml" | $MYSQL_CLIENT_CONNECT
7473
echo "truncate table test_xml" | $MYSQL_CLIENT_CONNECT

tests/suites/1_stateful/05_formats/05_03_xml/05_03_02_xml_v2.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ curl -sH "insert_sql:insert into test_xml file_format = (type = 'XML')" -F "uplo
3636
echo "select * from test_xml" | $MYSQL_CLIENT_CONNECT
3737
echo "truncate table test_xml" | $MYSQL_CLIENT_CONNECT
3838

39-
# todo(ariesdevil): wait for row_tag landing
40-
curl -sH "insert_sql:insert into test_xml format XML" -F "upload=@/tmp/simple_v3.xml" -H "row_tag:'databend'" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "SUCCESS"
39+
curl -sH "insert_sql:insert into test_xml file_format = (type = 'XML' row_tag = 'databend')" -F "upload=@/tmp/simple_v3.xml" -u root: -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "SUCCESS"
4140
echo "select * from test_xml" | $MYSQL_CLIENT_CONNECT
4241
echo "truncate table test_xml" | $MYSQL_CLIENT_CONNECT

0 commit comments

Comments
 (0)