Skip to content

Commit a74637c

Browse files
authored
Merge pull request #7744 from drmingdrmer/11-refact-read-table
chore: minor refactor for extract_scan_fields_from_projection(); cleanup info log
2 parents 6fdcc08 + 4468ff3 commit a74637c

File tree

9 files changed

+99
-26
lines changed

9 files changed

+99
-26
lines changed

src/meta/raft-store/src/state_machine/sm.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -263,11 +263,7 @@ impl StateMachine {
263263
/// command safely in case of network failure etc.
264264
#[tracing::instrument(level = "debug", skip(self, entry), fields(log_id=%entry.log_id))]
265265
pub async fn apply(&self, entry: &Entry<LogEntry>) -> Result<AppliedState, MetaStorageError> {
266-
info!(
267-
"apply: summary: {}; payload: {:?}",
268-
entry.summary(),
269-
entry.payload
270-
);
266+
info!("apply: summary: {}", entry.summary(),);
271267

272268
let log_id = &entry.log_id;
273269

@@ -280,8 +276,11 @@ impl StateMachine {
280276
txn_sm_meta.insert(&LastApplied, &StateMachineMetaValue::LogId(*log_id))?;
281277

282278
match entry.payload {
283-
EntryPayload::Blank => {}
279+
EntryPayload::Blank => {
280+
info!("apply: blank");
281+
}
284282
EntryPayload::Normal(ref data) => {
283+
info!("apply: {}", data);
285284
if let Some(ref txid) = data.txid {
286285
let (serial, resp) =
287286
self.txn_get_client_last_resp(&txid.client, &txn_tree)?;
@@ -303,7 +302,12 @@ impl StateMachine {
303302
};
304303

305304
let res = self.apply_cmd(&data.cmd, &txn_tree, kv_pairs.as_ref(), log_time_ms);
306-
info!("apply_result: summary: {}; res: {:?}", entry.summary(), res);
305+
if let Ok(ok) = &res {
306+
info!("apply_result: summary: {}; res ok: {}", entry.summary(), ok);
307+
}
308+
if let Err(err) = &res {
309+
info!("apply_result: summary: {}; res err: {:?}", entry.summary(), err);
310+
}
307311

308312
let applied_state = res?;
309313

@@ -317,6 +321,7 @@ impl StateMachine {
317321
return Ok(Some(applied_state));
318322
}
319323
EntryPayload::Membership(ref mem) => {
324+
info!("apply: membership: {:?}", mem);
320325
txn_sm_meta.insert(
321326
&LastMembership,
322327
&StateMachineMetaValue::Membership(EffectiveMembership {
@@ -760,7 +765,7 @@ impl StateMachine {
760765
kv_pairs: Option<&(DeleteByPrefixKeyMap, DeleteByPrefixKeyMap)>,
761766
log_time_ms: u64,
762767
) -> Result<AppliedState, MetaStorageError> {
763-
info!("apply_cmd: {:?}", cmd);
768+
info!("apply_cmd: {}", cmd);
764769

765770
match cmd {
766771
Cmd::IncrSeq { ref key } => self.apply_incr_seq_cmd(key, txn_tree),

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl MetaService for MetaServiceImpl {
255255

256256
let request = request.into_inner();
257257

258-
info!("Receive txn_request: {:?}", request);
258+
info!("Receive txn_request: {}", request);
259259

260260
let body = self.execute_txn(request).await;
261261
incr_meta_metrics_meta_sent_bytes(body.encoded_len() as u64);

src/meta/service/src/meta_service/meta_leader.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,19 @@ impl<'a> MetaLeader<'a> {
184184
// report metrics
185185
let _guard = WithCount::new((), ProposalPending);
186186

187-
info!("write LogEntry: {:?}", entry);
188-
let write_rst = self.meta_node.raft.client_write(entry).await;
189-
info!("raft.client_write rst: {:?}", write_rst);
187+
info!("write LogEntry: {}", entry);
188+
let write_res = self.meta_node.raft.client_write(entry).await;
189+
if let Ok(ok) = &write_res {
190+
info!(
191+
"raft.client_write res ok: log_id: {}, data: {}, membership: {:?}",
192+
ok.log_id, ok.data, ok.membership
193+
);
194+
}
195+
if let Err(err) = &write_res {
196+
info!("raft.client_write res err: {:?}", err);
197+
}
190198

191-
match write_rst {
199+
match write_res {
192200
Ok(resp) => Ok(resp.data),
193201
Err(cli_write_err) => Err(RaftWriteError::from_raft_err(cli_write_err)),
194202
}

src/meta/types/src/applied_state.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt;
1516
use std::fmt::Debug;
17+
use std::fmt::Formatter;
1618

1719
use openraft::AppDataResponse;
1820
use serde::Deserialize;
@@ -47,6 +49,29 @@ pub enum AppliedState {
4749

4850
impl AppDataResponse for AppliedState {}
4951

52+
impl fmt::Display for AppliedState {
53+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
54+
write!(f, "AppliedState: ")?;
55+
match self {
56+
AppliedState::Seq { seq } => {
57+
write!(f, "Seq: {}", seq)
58+
}
59+
AppliedState::Node { prev, result } => {
60+
write!(f, "Node: prev: {:?}, result: {:?}", prev, result)
61+
}
62+
AppliedState::KV(change) => {
63+
write!(f, "KV: {}", change)
64+
}
65+
AppliedState::TxnReply(txnreply) => {
66+
write!(f, "Txn: {}", txnreply)
67+
}
68+
AppliedState::None => {
69+
write!(f, "None")
70+
}
71+
}
72+
}
73+
}
74+
5075
impl AppliedState {
5176
/// Whether the state changed
5277
pub fn changed(&self) -> bool {

src/meta/types/src/change.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt::Debug;
16+
use std::fmt::Display;
17+
use std::fmt::Formatter;
18+
1519
use serde::Deserialize;
1620
use serde::Serialize;
1721

@@ -104,3 +108,15 @@ where
104108
unreachable!("impossible: both prev and result are None");
105109
}
106110
}
111+
112+
impl<T, ID> Display for Change<T, ID>
113+
where
114+
T: Debug + Clone + PartialEq,
115+
ID: Debug + Clone + PartialEq,
116+
{
117+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
118+
write!(f, "id: {:?}", self.ident)?;
119+
write!(f, "prev: {:?}", self.prev)?;
120+
write!(f, "result: {:?}", self.result)
121+
}
122+
}

src/meta/types/src/cmd.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl fmt::Display for Cmd {
7979
write!(f, "upsert_kv:{}", upsert_kv)
8080
}
8181
Cmd::Transaction(txn) => {
82-
write!(f, "txn:{:?}", txn)
82+
write!(f, "txn:{}", txn)
8383
}
8484
}
8585
}

src/meta/types/src/log_entry.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt::Display;
16+
use std::fmt::Formatter;
17+
1518
use openraft::AppData;
1619
use serde::Deserialize;
1720
use serde::Serialize;
@@ -42,3 +45,16 @@ pub struct LogEntry {
4245
}
4346

4447
impl AppData for LogEntry {}
48+
49+
impl Display for LogEntry {
50+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
51+
if let Some(txid) = &self.txid {
52+
write!(f, "txid: {:?}", txid)?;
53+
}
54+
if let Some(time) = &self.time_ms {
55+
write!(f, "time: {} ms", time)?;
56+
}
57+
58+
write!(f, " cmd: {}", self.cmd)
59+
}
60+
}

src/query/service/src/sql/executor/physical_plan_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl PhysicalPlanBuilder {
126126
}
127127
if let Some(prewhere) = &scan.prewhere {
128128
// if there is a prewhere optimization,
129-
// we can prune `PhysicalScan`'s ouput schema.
129+
// we can prune `PhysicalScan`'s output schema.
130130
if prewhere.output_columns.contains(index) {
131131
name_mapping.insert(column.name().to_string(), index.to_string());
132132
}

src/query/service/src/storages/storage_table_read_plan.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ use std::collections::BTreeMap;
1616
use std::sync::Arc;
1717

1818
use common_datavalues::DataField;
19+
use common_datavalues::DataSchema;
1920
use common_exception::Result;
2021
use common_legacy_planners::Extras;
2122
use common_legacy_planners::Projection;
2223
use common_legacy_planners::ReadDataSourcePlan;
2324
use common_legacy_planners::SourceInfo;
24-
use common_meta_app::schema::TableInfo;
2525

2626
use crate::sessions::QueryContext;
2727
use crate::storages::Table;
@@ -55,16 +55,21 @@ impl ToReadDataSourcePlan for dyn Table {
5555
push_downs: Option<Extras>,
5656
) -> Result<ReadDataSourcePlan> {
5757
let (statistics, parts) = self.read_partitions(ctx, push_downs.clone()).await?;
58+
5859
let table_info = self.get_table_info();
60+
let table_meta = &table_info.meta;
5961
let description = statistics.get_description(table_info);
6062

6163
let scan_fields = match (self.benefit_column_prune(), &push_downs) {
6264
(true, Some(push_downs)) => match &push_downs.prewhere {
63-
Some(prewhere) => {
64-
extract_scan_fields_from_projection(table_info, &prewhere.output_columns)
65-
}
65+
Some(prewhere) => extract_scan_fields_from_projection(
66+
&table_meta.schema,
67+
&prewhere.output_columns,
68+
),
6669
_ => match &push_downs.projection {
67-
Some(projection) => extract_scan_fields_from_projection(table_info, projection),
70+
Some(projection) => {
71+
extract_scan_fields_from_projection(&table_meta.schema, projection)
72+
}
6873
_ => None,
6974
},
7075
},
@@ -87,15 +92,13 @@ impl ToReadDataSourcePlan for dyn Table {
8792
}
8893

8994
fn extract_scan_fields_from_projection(
90-
table_info: &TableInfo,
95+
schema: &DataSchema,
9196
projection: &Projection,
9297
) -> Option<BTreeMap<usize, DataField>> {
9398
match projection {
9499
Projection::Columns(ref indices) => {
95-
if indices.len() < table_info.schema().fields().len() {
96-
let fields = indices
97-
.iter()
98-
.map(|i| table_info.schema().field(*i).clone());
100+
if indices.len() < schema.fields().len() {
101+
let fields = indices.iter().map(|i| schema.field(*i).clone());
99102

100103
Some((indices.iter().cloned().zip(fields)).collect::<BTreeMap<_, _>>())
101104
} else {
@@ -104,7 +107,7 @@ fn extract_scan_fields_from_projection(
104107
}
105108
Projection::InnerColumns(ref path_indices) => {
106109
let column_ids: Vec<usize> = path_indices.keys().cloned().collect();
107-
let new_schema = table_info.schema().inner_project(path_indices);
110+
let new_schema = schema.inner_project(path_indices);
108111
Some(
109112
(column_ids.iter().cloned().zip(new_schema.fields().clone()))
110113
.collect::<BTreeMap<_, _>>(),

0 commit comments

Comments
 (0)