Skip to content

Commit f4150cc

Browse files
authored
refactor(cubesql): logging sanitized query (cube-js#5094)
1 parent 321b74f commit f4150cc

File tree

4 files changed

+86
-67
lines changed

4 files changed

+86
-67
lines changed

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 48 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,8 @@ use crate::{
6969
dataframe,
7070
session::DatabaseProtocol,
7171
statement::{
72-
CastReplacer, RedshiftDatePartReplacer, ToTimestampReplacer, UdfWildcardArgReplacer,
72+
CastReplacer, RedshiftDatePartReplacer, SensitiveDataSanitizer, ToTimestampReplacer,
73+
UdfWildcardArgReplacer,
7374
},
7475
types::{CommandCompletion, StatusFlags},
7576
ColumnFlags, ColumnType, Session, SessionManager, SessionState,
@@ -349,7 +350,7 @@ impl QueryPlanner {
349350
}
350351

351352
pub async fn plan(&self, stmt: &ast::Statement) -> CompilationResult<QueryPlan> {
352-
match (stmt, &self.state.protocol) {
353+
let plan = match (stmt, &self.state.protocol) {
353354
(ast::Statement::Query(q), _) => self.select_to_plan(stmt, q).await,
354355
(ast::Statement::SetTransaction { .. }, _) => Ok(QueryPlan::MetaTabular(
355356
StatusFlags::empty(),
@@ -462,6 +463,21 @@ impl QueryPlanner {
462463
"Unsupported query type: {}",
463464
stmt.to_string()
464465
))),
466+
};
467+
468+
match plan {
469+
Err(err) => {
470+
let meta = Some(HashMap::from([
471+
("query".to_string(), stmt.to_string()),
472+
(
473+
"sanitizedQuery".to_string(),
474+
SensitiveDataSanitizer::new().replace(stmt).to_string(),
475+
),
476+
]));
477+
let msg = err.message();
478+
Err(err.with_message(msg).with_meta(meta))
479+
}
480+
_ => plan,
465481
}
466482
}
467483

@@ -1173,7 +1189,16 @@ WHERE `TABLE_SCHEMA` = '{}'",
11731189
let plan = df_query_planner
11741190
.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())))
11751191
.map_err(|err| {
1176-
CompilationError::internal(format!("Initial planning error: {}", err))
1192+
let message = format!("Initial planning error: {}", err,);
1193+
let meta = Some(HashMap::from([
1194+
("query".to_string(), stmt.to_string()),
1195+
(
1196+
"sanitizedQuery".to_string(),
1197+
SensitiveDataSanitizer::new().replace(&stmt).to_string(),
1198+
),
1199+
]));
1200+
1201+
CompilationError::internal(message).with_meta(meta)
11771202
})?;
11781203

11791204
let optimized_plan = plan;
@@ -1196,12 +1221,27 @@ WHERE `TABLE_SCHEMA` = '{}'",
11961221
e.message
11971222
),
11981223
e.to_backtrace().unwrap_or_else(|| Backtrace::capture()),
1199-
None,
1224+
Some(HashMap::from([
1225+
("query".to_string(), stmt.to_string()),
1226+
(
1227+
"sanitizedQuery".to_string(),
1228+
SensitiveDataSanitizer::new().replace(&stmt).to_string(),
1229+
),
1230+
])),
1231+
),
1232+
CubeErrorCauseType::User(_) => CompilationError::User(
1233+
format!(
1234+
"Error during rewrite: {}. Please check logs for additional information.",
1235+
e.message
1236+
),
1237+
Some(HashMap::from([
1238+
("query".to_string(), stmt.to_string()),
1239+
(
1240+
"sanitizedQuery".to_string(),
1241+
SensitiveDataSanitizer::new().replace(&stmt).to_string(),
1242+
),
1243+
])),
12001244
),
1201-
CubeErrorCauseType::User(_) => CompilationError::user(format!(
1202-
"Error during rewrite: {}. Please check logs for additional information.",
1203-
e.message
1204-
)),
12051245
});
12061246

12071247
if let Err(_) = &result {

rust/cubesql/cubesql/src/compile/parser.rs

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
13
use sqlparser::{
24
ast::Statement,
35
dialect::{Dialect, PostgreSqlDialect},
@@ -41,6 +43,8 @@ pub fn parse_sql_to_statements(
4143
query: &String,
4244
protocol: DatabaseProtocol,
4345
) -> CompilationResult<Vec<Statement>> {
46+
let original_query = query.clone();
47+
4448
log::debug!("Parsing SQL: {}", query);
4549
// @todo Support without workarounds
4650
// metabase
@@ -241,7 +245,10 @@ pub fn parse_sql_to_statements(
241245
DatabaseProtocol::PostgreSQL => Parser::parse_sql(&PostgreSqlDialect {}, query.as_str()),
242246
};
243247

244-
parse_result.map_err(|err| CompilationError::user(format!("Unable to parse: {:?}", err)))
248+
parse_result.map_err(|err| {
249+
CompilationError::user(format!("Unable to parse: {:?}", err))
250+
.with_meta(Some(HashMap::from([("query".to_string(), original_query)])))
251+
})
245252
}
246253

247254
pub fn parse_sql_to_statement(
@@ -265,7 +272,7 @@ pub fn parse_sql_to_statement(
265272
))
266273
};
267274

268-
Err(err)
275+
Err(err.with_meta(Some(HashMap::from([("query".to_string(), query.clone())]))))
269276
}
270277
}
271278
}

rust/cubesql/cubesql/src/sql/mysql/service.rs

Lines changed: 1 addition & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@ use tokio::{
2222
use crate::{
2323
compile::{convert_sql_to_cube_query, parser::parse_sql_to_statement},
2424
config::processing_loop::ProcessingLoop,
25-
sql::statement::SensitiveDataSanitizer,
2625
telemetry::{ContextLogger, SessionLogger},
2726
CubeErrorCauseType,
2827
};
@@ -81,20 +80,7 @@ impl MySqlConnection {
8180
}
8281
};
8382

84-
let query = query.to_string();
85-
let mut props = props.unwrap_or_default();
86-
if let Ok(statement) = parse_sql_to_statement(&query, DatabaseProtocol::PostgreSQL)
87-
{
88-
props.insert(
89-
"sanitizedQuery".to_string(),
90-
SensitiveDataSanitizer::new()
91-
.replace(&statement)
92-
.to_string(),
93-
);
94-
}
95-
props.insert("query".to_string(), query);
96-
97-
self.logger.error(message.as_str(), Some(props));
83+
self.logger.error(message.as_str(), props);
9884

9985
if let Some(bt) = e.backtrace() {
10086
trace!("{}", bt);

rust/cubesql/cubesql/src/sql/postgres/shim.rs

Lines changed: 28 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -11,9 +11,7 @@ use crate::{
1111
df_type_to_pg_tid,
1212
extended::{Cursor, Portal, PortalFrom},
1313
session::DatabaseProtocol,
14-
statement::{
15-
PostgresStatementParamsFinder, SensitiveDataSanitizer, StatementPlaceholderReplacer,
16-
},
14+
statement::{PostgresStatementParamsFinder, StatementPlaceholderReplacer},
1715
types::CommandCompletion,
1816
writer::BatchWriter,
1917
AuthContextRef, Session, StatusFlags,
@@ -241,62 +239,56 @@ impl AsyncPostgresShim {
241239
// When an error is detected while processing any extended-query message, the backend issues ErrorResponse,
242240
// then reads and discards messages until a Sync is reached, then issues ReadyForQuery and returns to normal message processing.
243241
let mut tracked_error: Option<ConnectionError> = None;
244-
let mut tracked_error_query: Option<String> = None;
245242

246243
loop {
247244
let mut doing_extended_query_message = false;
248245

249-
let (result, query) = match buffer::read_message(&mut self.socket).await? {
250-
protocol::FrontendMessage::Query(body) => {
251-
let query = body.query.clone();
252-
(self.process_query(body.query).await, Some(query))
253-
}
254-
protocol::FrontendMessage::Flush => (self.flush().await, None),
246+
let result = match buffer::read_message(&mut self.socket).await? {
247+
protocol::FrontendMessage::Query(body) => self.process_query(body.query).await,
248+
protocol::FrontendMessage::Flush => self.flush().await,
255249
protocol::FrontendMessage::Terminate => return Ok(()),
256250
// Extended
257251
protocol::FrontendMessage::Parse(body) => {
258252
if tracked_error.is_none() {
259253
doing_extended_query_message = true;
260-
let query = body.query.clone();
261-
(self.parse(body).await, Some(query))
254+
self.parse(body).await
262255
} else {
263256
continue;
264257
}
265258
}
266259
protocol::FrontendMessage::Bind(body) => {
267260
if tracked_error.is_none() {
268261
doing_extended_query_message = true;
269-
(self.bind(body).await, None)
262+
self.bind(body).await
270263
} else {
271264
continue;
272265
}
273266
}
274267
protocol::FrontendMessage::Execute(body) => {
275268
if tracked_error.is_none() {
276269
doing_extended_query_message = true;
277-
(self.execute(body).await, None)
270+
self.execute(body).await
278271
} else {
279272
continue;
280273
}
281274
}
282275
protocol::FrontendMessage::Close(body) => {
283276
if tracked_error.is_none() {
284-
(self.close(body).await, None)
277+
self.close(body).await
285278
} else {
286279
continue;
287280
}
288281
}
289282
protocol::FrontendMessage::Describe(body) => {
290283
if tracked_error.is_none() {
291-
(self.describe(body).await, None)
284+
self.describe(body).await
292285
} else {
293286
continue;
294287
}
295288
}
296289
protocol::FrontendMessage::Sync => {
297290
if let Some(err) = tracked_error.take() {
298-
self.handle_connection_error(err, tracked_error_query.take())
299-
.await?;
291+
self.handle_connection_error(err).await?;
300292
};
301293

302294
self.write_ready().await?;
@@ -316,9 +308,8 @@ impl AsyncPostgresShim {
316308
if let Err(err) = result {
317309
if doing_extended_query_message {
318310
tracked_error = Some(err);
319-
tracked_error_query = query;
320311
} else {
321-
self.handle_connection_error(err, query).await?;
312+
self.handle_connection_error(err).await?;
322313
}
323314
}
324315
}
@@ -327,7 +318,6 @@ impl AsyncPostgresShim {
327318
pub async fn handle_connection_error(
328319
&mut self,
329320
err: ConnectionError,
330-
query: Option<String>,
331321
) -> Result<(), ConnectionError> {
332322
let (message, props) = match &err {
333323
ConnectionError::CompilationError(err) => match err {
@@ -349,32 +339,28 @@ impl AsyncPostgresShim {
349339
),
350340
};
351341

352-
let mut props = props.unwrap_or_default();
353-
match query {
354-
Some(query) => {
355-
if let Ok(statement) = parse_sql_to_statement(&query, DatabaseProtocol::PostgreSQL)
356-
{
357-
props.insert(
358-
"sanitizedQuery".to_string(),
359-
SensitiveDataSanitizer::new()
360-
.replace(&statement)
361-
.to_string(),
362-
);
363-
}
364-
props.insert("query".to_string(), query);
365-
}
366-
_ => (),
367-
}
368-
369-
self.logger.error(message.as_str(), Some(props));
370-
371342
if let Some(bt) = err.backtrace() {
372343
trace!("{}", bt);
373344
} else {
374345
trace!("Backtrace: not found");
375346
}
376347

377-
self.write(err.to_error_response()).await?;
348+
let err_response = match &props {
349+
Some(props) => {
350+
let query = props.get(&"query".to_string());
351+
let mut err_response = err.to_error_response();
352+
if let Some(query) = query {
353+
err_response.message = format!("{}\nQUERY: {}", message, query);
354+
}
355+
356+
err_response
357+
}
358+
None => err.to_error_response(),
359+
};
360+
361+
self.logger.error(message.as_str(), props);
362+
363+
self.write(err_response).await?;
378364

379365
Ok(())
380366
}
@@ -1394,7 +1380,7 @@ impl AsyncPostgresShim {
13941380
debug!("Query: {}", query);
13951381

13961382
if let Err(err) = self.execute_query(&query).await {
1397-
self.handle_connection_error(err, Some(query)).await?;
1383+
self.handle_connection_error(err).await?;
13981384
};
13991385

14001386
self.write_ready().await

0 commit comments

Comments
 (0)