Skip to content

Commit e3799a2

Browse files
authored
feat: http handler support set variable. (#16239)
* http handler support set variable. * serialize Scalar value of variable as separate json string.
1 parent 66c8dea commit e3799a2

File tree

7 files changed

+123
-8
lines changed

7 files changed

+123
-8
lines changed

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617
use std::time::SystemTime;
1718

@@ -22,6 +23,7 @@ use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
2324
use databend_common_expression::DataBlock;
2425
use databend_common_expression::DataSchemaRef;
26+
use databend_common_expression::Scalar;
2527
use databend_common_io::prelude::FormatSettings;
2628
use databend_common_settings::Settings;
2729
use databend_storages_common_txn::TxnManagerRef;
@@ -147,6 +149,7 @@ pub struct ExecutorSessionState {
147149
pub secondary_roles: Option<Vec<String>>,
148150
pub settings: Arc<Settings>,
149151
pub txn_manager: TxnManagerRef,
152+
pub variables: HashMap<String, Scalar>,
150153
}
151154

152155
impl ExecutorSessionState {
@@ -157,6 +160,7 @@ impl ExecutorSessionState {
157160
secondary_roles: session.get_secondary_roles(),
158161
settings: session.get_settings(),
159162
txn_manager: session.txn_mgr(),
163+
variables: session.get_all_variables(),
160164
}
161165
}
162166
}

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16+
use std::collections::HashMap;
1617
use std::fmt::Debug;
1718
use std::sync::atomic::AtomicBool;
1819
use std::sync::atomic::Ordering;
@@ -29,6 +30,7 @@ use databend_common_base::runtime::TrySpawn;
2930
use databend_common_catalog::table_context::StageAttachment;
3031
use databend_common_exception::ErrorCode;
3132
use databend_common_exception::Result;
33+
use databend_common_expression::Scalar;
3234
use databend_common_io::prelude::FormatSettings;
3335
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
3436
use databend_common_settings::ScopeLevel;
@@ -39,7 +41,9 @@ use log::warn;
3941
use poem::web::Json;
4042
use poem::IntoResponse;
4143
use serde::Deserialize;
44+
use serde::Deserializer;
4245
use serde::Serialize;
46+
use serde::Serializer;
4347

4448
use super::HttpQueryContext;
4549
use super::RemoveReason;
@@ -181,6 +185,75 @@ pub struct ServerInfo {
181185
pub start_time: String,
182186
}
183187

188+
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
189+
pub struct HttpSessionStateInternal {
190+
/// value is JSON of Scalar
191+
variables: Vec<(String, String)>,
192+
}
193+
194+
impl HttpSessionStateInternal {
195+
fn new(variables: &HashMap<String, Scalar>) -> Self {
196+
let variables = variables
197+
.iter()
198+
.map(|(k, v)| {
199+
(
200+
k.clone(),
201+
serde_json::to_string(&v).expect("fail to serialize Scalar"),
202+
)
203+
})
204+
.collect();
205+
Self { variables }
206+
}
207+
208+
pub fn get_variables(&self) -> Result<HashMap<String, Scalar>> {
209+
let mut vars = HashMap::with_capacity(self.variables.len());
210+
for (k, v) in self.variables.iter() {
211+
match serde_json::from_str::<Scalar>(v) {
212+
Ok(s) => {
213+
vars.insert(k.to_string(), s);
214+
}
215+
Err(e) => {
216+
return Err(ErrorCode::BadBytes(format!(
217+
"fail decode scalar from string '{v}', error: {e}"
218+
)));
219+
}
220+
}
221+
}
222+
Ok(vars)
223+
}
224+
}
225+
226+
fn serialize_as_json_string<S>(
227+
value: &Option<HttpSessionStateInternal>,
228+
serializer: S,
229+
) -> Result<S::Ok, S::Error>
230+
where
231+
S: Serializer,
232+
{
233+
match value {
234+
Some(complex_value) => {
235+
let json_string =
236+
serde_json::to_string(complex_value).map_err(serde::ser::Error::custom)?;
237+
serializer.serialize_some(&json_string)
238+
}
239+
None => serializer.serialize_none(),
240+
}
241+
}
242+
243+
fn deserialize_from_json_string<'de, D>(
244+
deserializer: D,
245+
) -> Result<Option<HttpSessionStateInternal>, D::Error>
246+
where D: Deserializer<'de> {
247+
let json_string: Option<String> = Option::deserialize(deserializer)?;
248+
match json_string {
249+
Some(s) => {
250+
let complex_value = serde_json::from_str(&s).map_err(serde::de::Error::custom)?;
251+
Ok(Some(complex_value))
252+
}
253+
None => Ok(None),
254+
}
255+
}
256+
184257
#[derive(Deserialize, Serialize, Debug, Default, Clone, Eq, PartialEq)]
185258
pub struct HttpSessionConf {
186259
#[serde(skip_serializing_if = "Option::is_none")]
@@ -189,6 +262,7 @@ pub struct HttpSessionConf {
189262
pub role: Option<String>,
190263
#[serde(skip_serializing_if = "Option::is_none")]
191264
pub secondary_roles: Option<Vec<String>>,
265+
// todo: remove this later
192266
#[serde(skip_serializing_if = "Option::is_none")]
193267
pub keep_server_session_secs: Option<u64>,
194268
#[serde(skip_serializing_if = "Option::is_none")]
@@ -198,9 +272,19 @@ pub struct HttpSessionConf {
198272
// used to check if the session is still on the same server
199273
#[serde(skip_serializing_if = "Option::is_none")]
200274
pub last_server_info: Option<ServerInfo>,
201-
// last_query_ids[0] is the last query id, last_query_ids[1] is the second last query id, etc.
275+
/// last_query_ids[0] is the last query id, last_query_ids[1] is the second last query id, etc.
202276
#[serde(default)]
203277
pub last_query_ids: Vec<String>,
278+
/// hide state not useful to clients
279+
/// so client only need to know there is a String field `internal`,
280+
/// which need to carry with session/conn
281+
#[serde(default)]
282+
#[serde(skip_serializing_if = "Option::is_none")]
283+
#[serde(
284+
serialize_with = "serialize_as_json_string",
285+
deserialize_with = "deserialize_from_json_string"
286+
)]
287+
pub internal: Option<HttpSessionStateInternal>,
204288
}
205289

206290
impl HttpSessionConf {}
@@ -360,6 +444,11 @@ impl HttpQuery {
360444
})?;
361445
}
362446
}
447+
if let Some(state) = &session_conf.internal {
448+
if !state.variables.is_empty() {
449+
session.set_all_variables(state.get_variables()?)
450+
}
451+
}
363452
try_set_txn(&ctx.query_id, &session, session_conf, &http_query_manager)?;
364453
};
365454

@@ -548,6 +637,11 @@ impl HttpQuery {
548637
let role = session_state.current_role.clone();
549638
let secondary_roles = session_state.secondary_roles.clone();
550639
let txn_state = session_state.txn_manager.lock().state();
640+
let internal = if !session_state.variables.is_empty() {
641+
Some(HttpSessionStateInternal::new(&session_state.variables))
642+
} else {
643+
None
644+
};
551645
if txn_state != TxnState::AutoCommit
552646
&& !self.is_txn_mgr_saved.load(Ordering::Relaxed)
553647
&& matches!(executor.state, ExecuteState::Stopped(_))
@@ -573,6 +667,7 @@ impl HttpQuery {
573667
txn_state: Some(txn_state),
574668
last_server_info: Some(HttpQueryManager::instance().server_info.clone()),
575669
last_query_ids: vec![self.id.clone()],
670+
internal,
576671
}
577672
}
578673

src/query/service/src/sessions/session.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::net::SocketAddr;
1617
use std::sync::Arc;
1718

@@ -20,6 +21,7 @@ use databend_common_catalog::cluster_info::Cluster;
2021
use databend_common_config::GlobalConfig;
2122
use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
24+
use databend_common_expression::Scalar;
2325
use databend_common_io::prelude::FormatSettings;
2426
use databend_common_meta_app::principal::GrantObject;
2527
use databend_common_meta_app::principal::OwnershipObject;
@@ -352,6 +354,14 @@ impl Session {
352354
Some(x) => x.get_query_profiles(),
353355
}
354356
}
357+
358+
pub fn get_all_variables(&self) -> HashMap<String, Scalar> {
359+
self.session_ctx.get_all_variables()
360+
}
361+
362+
pub fn set_all_variables(&self, variables: HashMap<String, Scalar>) {
363+
self.session_ctx.set_all_variables(variables)
364+
}
355365
}
356366

357367
impl Drop for Session {

src/query/service/src/sessions/session_ctx.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,10 @@ impl SessionContext {
316316
pub fn get_variable(&self, key: &str) -> Option<Scalar> {
317317
self.variables.read().get(key).cloned()
318318
}
319+
pub fn get_all_variables(&self) -> HashMap<String, Scalar> {
320+
self.variables.read().clone()
321+
}
322+
pub fn set_all_variables(&self, variables: HashMap<String, Scalar>) {
323+
*self.variables.write() = variables
324+
}
319325
}

src/query/service/tests/it/servers/http/http_query_handlers.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,6 +1393,7 @@ async fn test_affect() -> Result<()> {
13931393
txn_state: Some(TxnState::AutoCommit),
13941394
last_server_info: None,
13951395
last_query_ids: vec![],
1396+
internal: None,
13961397
}),
13971398
),
13981399
(
@@ -1415,6 +1416,7 @@ async fn test_affect() -> Result<()> {
14151416
txn_state: Some(TxnState::AutoCommit),
14161417
last_server_info: None,
14171418
last_query_ids: vec![],
1419+
internal: None,
14181420
}),
14191421
),
14201422
(
@@ -1432,6 +1434,7 @@ async fn test_affect() -> Result<()> {
14321434
txn_state: Some(TxnState::AutoCommit),
14331435
last_server_info: None,
14341436
last_query_ids: vec![],
1437+
internal: None,
14351438
}),
14361439
),
14371440
(
@@ -1451,6 +1454,7 @@ async fn test_affect() -> Result<()> {
14511454
txn_state: Some(TxnState::AutoCommit),
14521455
last_server_info: None,
14531456
last_query_ids: vec![],
1457+
internal: None,
14541458
}),
14551459
),
14561460
(
@@ -1472,6 +1476,7 @@ async fn test_affect() -> Result<()> {
14721476
txn_state: Some(TxnState::AutoCommit),
14731477
last_server_info: None,
14741478
last_query_ids: vec![],
1479+
internal: None,
14751480
}),
14761481
),
14771482
];

tests/sqllogictests/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct HttpSessionConf {
4242
pub last_server_info: Option<ServerInfo>,
4343
#[serde(default)]
4444
pub last_query_ids: Vec<String>,
45+
pub internal: Option<String>,
4546
}
4647

4748
pub fn parser_rows(rows: &Value) -> Result<Vec<Vec<String>>> {

tests/sqllogictests/suites/query/set.test

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ select value, default = value from system.settings where name in ('max_threads'
77
4 0
88
56 0
99

10-
onlyif mysql
1110
statement ok
1211
set variable (a, b) = (select 3, 55)
1312

14-
onlyif mysql
13+
1514
statement ok
1615
SET GLOBAL (max_threads, storage_io_min_bytes_for_seek) = select $a + 1, $b + 1;
1716

@@ -30,25 +29,20 @@ select default = value from system.settings where name in ('max_threads', 'stor
3029
1
3130
1
3231

33-
onlyif mysql
3432
statement ok
3533
set variable a = 1;
3634

37-
onlyif mysql
3835
statement ok
3936
set variable (b, c) = ('yy', 'zz');
4037

41-
onlyif mysql
4238
query ITT
4339
select $a + getvariable('a') + $a, getvariable('b'), getvariable('c'), getvariable('d')
4440
----
4541
3 yy zz NULL
4642

47-
onlyif mysql
4843
statement ok
4944
unset variable (a, b)
5045

51-
onlyif mysql
5246
query ITT
5347
select getvariable('a'), getvariable('b'), 'xx' || 'yy' || getvariable('c') , getvariable('d')
5448
----

0 commit comments

Comments
 (0)