diff --git a/Cargo.lock b/Cargo.lock
index e77db9fc7467e..c5a249b7ad94d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5273,7 +5273,6 @@ dependencies = [
  "goldenfile",
  "headers",
  "hex",
- "highway",
  "http 1.3.1",
  "humantime",
  "hyper-util",
@@ -8278,12 +8277,6 @@ version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9958ab3ce3170c061a27679916bd9b969eceeb5e8b120438e6751d0987655c42"
 
-[[package]]
-name = "highway"
-version = "1.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9040319a6910b901d5d49cbada4a99db52836a1b63228a05f7e2b7f8feef89b1"
-
 [[package]]
 name = "hive_metastore"
 version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index 5233b81af595f..cbfb0cd69cd61 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -328,7 +328,6 @@ hashlink = "0.8"
 headers = "0.4.0"
 hex = "0.4.3"
 hickory-resolver = "0.25"
-highway = "1.1"
 hive_metastore = "0.1.0"
 hostname = "0.3.1"
 http = "1"
diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml
index 659e7d2f83253..e3d7f9ce3584a 100644
--- a/src/query/service/Cargo.toml
+++ b/src/query/service/Cargo.toml
@@ -126,7 +126,6 @@ futures-util = { workspace = true }
 geozero = { workspace = true }
 headers = { workspace = true }
 hex = { workspace = true }
-highway = { workspace = true }
 http = { workspace = true }
 humantime = { workspace = true }
 indicatif = { workspace = true }
diff --git a/src/query/service/src/servers/flight/v1/actions/init_query_env.rs b/src/query/service/src/servers/flight/v1/actions/init_query_env.rs
index c64119b54c934..93ecb10ed3416 100644
--- a/src/query/service/src/servers/flight/v1/actions/init_query_env.rs
+++ b/src/query/service/src/servers/flight/v1/actions/init_query_env.rs
@@ -28,7 +28,10 @@ use crate::servers::flight::v1::packets::QueryEnv;
 
 pub static INIT_QUERY_ENV: &str = "/actions/init_query_env";
 
-pub async fn init_query_env(env: QueryEnv) -> Result<()> {
+pub async fn init_query_env(mut env: QueryEnv) -> Result<()> {
+    // Update the query id to make sure it is compatible.
+    env.query_id = env.query_id.replace('-', "");
+
     let mut tracking_workload_group = None;
     let mut parent_mem_stat = ParentMemStat::StaticRef(&GLOBAL_MEM_STAT);
diff --git a/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs b/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs
index fbb7e8fce55e7..7a760c33dea65 100644
--- a/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs
+++ b/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs
@@ -21,6 +21,7 @@ use crate::servers::flight::v1::exchange::DataExchangeManager;
 pub static START_PREPARED_QUERY: &str = "/actions/start_prepared_query";
 
 pub async fn start_prepared_query(id: String) -> Result<()> {
+    let id = id.replace('-', "");
     let ctx = DataExchangeManager::instance().get_query_ctx(&id)?;
 
     let mut tracking_payload = ThreadTracker::new_tracking_payload();
diff --git a/src/query/service/src/servers/http/middleware/session.rs b/src/query/service/src/servers/http/middleware/session.rs
index 85311f7cf9475..99bea6200d4de 100644
--- a/src/query/service/src/servers/http/middleware/session.rs
+++ b/src/query/service/src/servers/http/middleware/session.rs
@@ -681,8 +681,8 @@ impl Endpoint for HTTPSessionEndpoint {
         let query_id = req
             .headers()
             .get(HEADER_QUERY_ID)
-            .map(|id| id.to_str().unwrap().to_string())
-            .unwrap_or_else(|| Uuid::new_v4().to_string());
+            .map(|id| id.to_str().unwrap().replace('-', ""))
+            .unwrap_or_else(|| Uuid::now_v7().simple().to_string());
 
         let mut login_history = LoginHistory::new();
         login_history.handler = LoginHandler::HTTP;
diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs
index a28e8a63137ad..941ddb8809bdd 100644
--- a/src/query/service/src/servers/http/v1/http_query_handlers.rs
+++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -34,7 +34,6 @@ use databend_common_metrics::http::metrics_incr_http_response_errors_count;
 use databend_common_version::DATABEND_SEMVER;
 use fastrace::func_path;
 use fastrace::prelude::*;
-use highway::HighwayHash;
 use http::HeaderMap;
 use http::HeaderValue;
 use http::StatusCode;
@@ -56,6 +55,7 @@ use poem::Request;
 use poem::Route;
 use serde::Deserialize;
 use serde::Serialize;
+use uuid::Uuid;
 
 use super::query::ExecuteStateKind;
 use super::query::HttpQuery;
@@ -788,8 +788,8 @@ fn query_id_not_found(query_id: &str, node_id: &str) -> PoemError {
 }
 
 fn query_id_to_trace_id(query_id: &str) -> TraceId {
-    let [hash_high, hash_low] = highway::PortableHash::default().hash128(query_id.as_bytes());
-    TraceId(((hash_high as u128) << 64) + (hash_low as u128))
+    let uuid = Uuid::parse_str(query_id).unwrap_or_else(|_| Uuid::now_v7());
+    TraceId(uuid.as_u128())
 }
 
 /// The HTTP query endpoints are expected to be responses within 60 seconds.
diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
index 7cb10ac1b0856..9ac973836d88c 100644
--- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
+++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
@@ -214,15 +214,20 @@ impl AsyncMysqlShim for InteractiveWorker
         query: &'a str,
         writer: QueryResultWriter<'a, W>,
     ) -> Result<()> {
-        let query_id = Uuid::new_v4().to_string();
+        let query_id = Uuid::now_v7();
+        // Ensure the query id shares the same representation as trace_id.
+        let query_id_str = query_id.simple().to_string();
+
         let sampled =
             thread_rng().gen_range(0..100) <= self.base.session.get_trace_sample_rate()?;
-        let root = Span::root(func_path!(), SpanContext::random().sampled(sampled))
+        let span_context =
+            SpanContext::new(TraceId(query_id.as_u128()), SpanId::default()).sampled(sampled);
+        let root = Span::root(func_path!(), span_context)
             .with_properties(|| self.base.session.to_fastrace_properties());
 
         let mut tracking_payload = ThreadTracker::new_tracking_payload();
-        tracking_payload.query_id = Some(query_id.clone());
-        tracking_payload.mem_stat = Some(MemStat::create(query_id.clone()));
+        tracking_payload.query_id = Some(query_id_str.clone());
+        tracking_payload.mem_stat = Some(MemStat::create(query_id_str.to_string()));
         let _guard = ThreadTracker::tracking(tracking_payload);
 
         ThreadTracker::tracking_future(async {
@@ -247,7 +252,7 @@ impl AsyncMysqlShim for InteractiveWorker
 
             let instant = Instant::now();
             let query_result = self
                 .base
-                .do_query(query_id, query)
+                .do_query(query_id_str, query)
                 .await
                 .map_err(|err| err.display_with_sql(query));
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 7202ac8303e77..ef41cfd4048d9 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -847,7 +847,7 @@ impl TableContext for QueryContext {
     }
 
     fn get_id(&self) -> String {
-        self.shared.init_query_id.as_ref().read().clone()
+        self.shared.init_query_id.as_ref().read().replace('-', "")
     }
 
     fn get_current_catalog(&self) -> String {
diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs
index ca7751833b2e4..861052d0bc369 100644
--- a/src/query/service/tests/it/servers/http/http_query_handlers.rs
+++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs
@@ -447,6 +447,24 @@ async fn test_return_when_finish() -> Result<()> {
 async fn test_client_query_id() -> Result<()> {
     let _fixture = TestFixture::setup().await?;
 
+    let wait_time_secs = 5;
+    let sql = "select * from numbers(1)";
+    let ep = create_endpoint()?;
+    let mut headers = HeaderMap::new();
+    headers.insert("x-databend-query-id", "testqueryid".parse().unwrap());
+    let (status, result) =
+        post_sql_to_endpoint_new_session(&ep, sql, wait_time_secs, headers).await?;
+    assert_eq!(status, StatusCode::OK);
+    assert_eq!(result.id, "testqueryid");
+
+    Ok(())
+}
+
+// `-` in the query id will be removed.
+#[tokio::test(flavor = "current_thread")]
+async fn test_client_compatible_query_id() -> Result<()> {
+    let _fixture = TestFixture::setup().await?;
+
     let wait_time_secs = 5;
     let sql = "select * from numbers(1)";
     let ep = create_endpoint()?;
@@ -455,7 +473,7 @@ async fn test_client_query_id() -> Result<()> {
     let (status, result) =
         post_sql_to_endpoint_new_session(&ep, sql, wait_time_secs, headers).await?;
     assert_eq!(status, StatusCode::OK);
-    assert_eq!(result.id, "test-query-id");
+    assert_eq!(result.id, "testqueryid");
 
     Ok(())
 }
diff --git a/tests/sqllogictests/suites/stage/ordered_unload.test b/tests/sqllogictests/suites/stage/ordered_unload.test
index 22ee5cac697b2..9555bb339c7f7 100644
--- a/tests/sqllogictests/suites/stage/ordered_unload.test
+++ b/tests/sqllogictests/suites/stage/ordered_unload.test
@@ -23,9 +23,9 @@ SELECT COUNT(*) FROM (SELECT $1 AS a, rank() OVER (ORDER BY metadata$filename, m
 ----
 10000
 
-# data_af2ab6dc-8725-46e5-a601-3dad9c512769_0000_00000770.csv
+# data_af2ab6dc872546e5a6013dad9c512769_0000_00000770.csv
 query
-SELECT * from list_stage(location => '@s1') where substr(name, 43, 4) != '0000'
+SELECT * from list_stage(location => '@s1') where substr(name, 39, 4) != '0000'
 ----
 
 statement ok
diff --git a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh
index 7b33fa988d8c1..1435e651cee31 100755
--- a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh
+++ b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh
@@ -5,7 +5,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 
 export TEST_USER_PASSWORD="password"
 export TEST_USER_CONNECT="bendsql --user=test-user --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}"
-export RM_UUID="sed -E ""s/[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}/UUID/g"""
+export RM_UUID="sed -E ""s/[a-z0-9]{32}/UUID/g"""
 
 stmt "drop database if exists db01;"
 stmt "create database db01;"
diff --git a/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh b/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh
index c8d4d6de3e6f1..95cde6c227ea4 100755
--- a/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh
+++ b/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh
@@ -7,7 +7,7 @@ export TEST_USER_NAME="u1"
 export TEST_USER_PASSWORD="password"
 export TEST_USER_CONNECT="bendsql --user=u1 --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}"
 export USER_B_CONNECT="bendsql --user=b --password=password --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}"
-export RM_UUID="sed -E ""s/[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}/UUID/g"""
+export RM_UUID="sed -E ""s/[a-z0-9]{32}/UUID/g"""
 
 echo "drop table if exists test_table;" | $BENDSQL_CLIENT_CONNECT
 echo "drop user if exists u1;" | $BENDSQL_CLIENT_CONNECT
diff --git a/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh b/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh
index f560a73487d08..c4545b8646903 100755
--- a/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh
+++ b/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh
@@ -5,7 +5,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 . "$CURDIR"/../../../shell_env.sh
 
-export RM_UUID="sed -E ""s/[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}/UUID/g"""
+export RM_UUID="sed -E ""s/[a-z0-9]{32}/UUID/g"""
 
 stmt "drop table if exists t1"
 stmt "create table t1 (a int)"
diff --git a/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result b/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result
index 786de0a69bea9..d9969b3735bfb 100755
--- a/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result
+++ b/tests/suites/1_stateful/01_streaming_load/01_0006_streaming_load_parquet.result
@@ -5,7 +5,7 @@ q1.parquet 624 1
 >>>> streaming load: q1.parquet error :
 + curl -sS -H x-databend-query-id:load-q1 -H 'sql:insert into streaming_load_parquet(c2,c3) values file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q1.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-q1","stats":{"rows":1,"bytes":25}}
+{"id":"loadq1","stats":{"rows":1,"bytes":25}}
 <<<<
 >>>> select * from streaming_load_parquet;
 ok 1 2021-01-01
@@ -26,7 +26,7 @@ q2.parquet 426 1
 q3.parquet 426 1
 >>>> streaming load: q3.parquet field_default :
 + curl -sS -H x-databend-query-id:load-q3 -H 'sql:insert into streaming_load_parquet(c2,c3) values file_format = (type='\''parquet'\'', missing_field_as=field_default, null_if=())' -F upload=@/tmp/streaming_load_parquet/q3.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-q3","stats":{"rows":1,"bytes":21}}
+{"id":"loadq3","stats":{"rows":1,"bytes":21}}
 <<<<
 >>>> select * from streaming_load_parquet;
 ok NULL 2021-01-01
@@ -37,7 +37,7 @@ ok NULL 2021-01-01
 q4.parquet 643 1
 >>>> streaming load: q4.parquet error :
 + curl -sS -H x-databend-query-id:load-q4 -H 'sql:insert into streaming_load_parquet(c1,c3) values file_format = (type='\''parquet'\'', missing_field_as=error, null_if=())' -F upload=@/tmp/streaming_load_parquet/q4.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-q4","stats":{"rows":1,"bytes":26}}
+{"id":"loadq4","stats":{"rows":1,"bytes":26}}
 <<<<
 >>>> select * from streaming_load_parquet;
 my_null NULL 2021-01-01
@@ -48,7 +48,7 @@ my_null NULL 2021-01-01
 q5.parquet 643 1
 >>>> streaming load: q5.parquet error 'my_null':
 + curl -sS -H x-databend-query-id:load-q5 -H 'sql:insert into streaming_load_parquet(c1,c3) values file_format = (type='\''parquet'\'', missing_field_as=error, null_if=('\''my_null'\''))' -F upload=@/tmp/streaming_load_parquet/q5.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-q5","stats":{"rows":1,"bytes":7}}
+{"id":"loadq5","stats":{"rows":1,"bytes":7}}
 <<<<
 >>>> select * from streaming_load_parquet;
 NULL NULL 2021-01-01
diff --git a/tests/suites/1_stateful/01_streaming_load/01_0007_streaming_load_placeholder.result b/tests/suites/1_stateful/01_streaming_load/01_0007_streaming_load_placeholder.result
index f01ccbc688a63..a69e29bc966ce 100755
--- a/tests/suites/1_stateful/01_streaming_load/01_0007_streaming_load_placeholder.result
+++ b/tests/suites/1_stateful/01_streaming_load/01_0007_streaming_load_placeholder.result
@@ -4,7 +4,7 @@
 >>>> copy into @streaming_load_07/data.csv from (select '2020-01-02' as c4, 110 as c2) file_format=(type='csv') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
 data.csv 17 1
 + curl -sS -H x-databend-query-id:load-csv -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=csv)' -F upload=@/tmp/streaming_load_07/data.csv -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-csv","stats":{"rows":1,"bytes":39}}
+{"id":"loadcsv","stats":{"rows":1,"bytes":39}}
 <<<<
 >>>> select * from streaming_load_07;
 ok 110 a 2020-01-02
@@ -14,7 +14,7 @@ ok 110 a 2020-01-02
 >>>> copy into @streaming_load_07/data.tsv from (select '2020-01-02' as c4, 110 as c2) file_format=(type='tsv') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
 data.tsv 15 1
 + curl -sS -H x-databend-query-id:load-tsv -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=tsv)' -F upload=@/tmp/streaming_load_07/data.tsv -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-tsv","stats":{"rows":1,"bytes":39}}
+{"id":"loadtsv","stats":{"rows":1,"bytes":39}}
 <<<<
 >>>> select * from streaming_load_07;
 ok 110 a 2020-01-02
@@ -24,7 +24,7 @@ ok 110 a 2020-01-02
 >>>> copy into @streaming_load_07/data.ndjson from (select '2020-01-02' as c4, 110 as c2) file_format=(type='ndjson') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
 data.ndjson 29 1
 + curl -sS -H x-databend-query-id:load-ndjson -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=ndjson)' -F upload=@/tmp/streaming_load_07/data.ndjson -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-ndjson","stats":{"rows":1,"bytes":39}}
+{"id":"loadndjson","stats":{"rows":1,"bytes":39}}
 <<<<
 >>>> select * from streaming_load_07;
 ok 110 a 2020-01-02
@@ -34,7 +34,7 @@ ok 110 a 2020-01-02
 >>>> copy into @streaming_load_07/data.parquet from (select '2020-01-02' as c4, 110 as c2) file_format=(type='parquet') single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
 data.parquet 665 1
 + curl -sS -H x-databend-query-id:load-parquet -H 'sql:insert into streaming_load_07(c3, c4, c2) values ('\''a'\'', ?, ?) file_format = (type=parquet)' -F upload=@/tmp/streaming_load_07/data.parquet -u root: -XPUT http://localhost:8000/v1/streaming_load
-{"id":"load-parquet","stats":{"rows":1,"bytes":39}}
+{"id":"loadparquet","stats":{"rows":1,"bytes":39}}
 <<<<
 >>>> select * from streaming_load_07;
 ok 110 a 2020-01-02