Skip to content

Commit 963de7d

Browse files
committed
fix: worker loop error
1 parent e285642 commit 963de7d

File tree

3 files changed

+39
-35
lines changed

3 files changed

+39
-35
lines changed

src/backend.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ unsafe extern "C" fn create_df_scan_state(cscan: *mut CustomScan) -> *mut Node {
104104
continue;
105105
}
106106
match header.packet {
107+
Packet::None => {
108+
// No data, just continue waiting.
109+
continue;
110+
}
107111
Packet::Failure => {
108112
let msg = read_error(&mut stream).expect("Failed to read the error message");
109113
error!("Failed to compile the query: {}", msg);

src/protocol.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ impl TryFrom<u8> for Direction {
5151
#[derive(Clone, Debug, Default, PartialEq)]
5252
pub enum Packet {
5353
#[default]
54-
Bind = 0,
55-
Failure = 1,
56-
Metadata = 2,
57-
Parse = 3,
54+
None = 0,
55+
Bind = 1,
56+
Failure = 2,
57+
Metadata = 3,
58+
Parse = 4,
5859
}
5960

6061
impl TryFrom<u8> for Packet {
@@ -63,10 +64,11 @@ impl TryFrom<u8> for Packet {
6364
fn try_from(value: u8) -> Result<Self, Self::Error> {
6465
assert!(value < 128);
6566
match value {
66-
0 => Ok(Packet::Bind),
67-
1 => Ok(Packet::Failure),
68-
2 => Ok(Packet::Metadata),
69-
3 => Ok(Packet::Parse),
67+
0 => Ok(Packet::None),
68+
1 => Ok(Packet::Bind),
69+
2 => Ok(Packet::Failure),
70+
3 => Ok(Packet::Metadata),
71+
4 => Ok(Packet::Parse),
7072
_ => Err(FusionError::Deserialize("packet".to_string(), value.into())),
7173
}
7274
}

src/worker.rs

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::ipc::{
66
};
77
use crate::protocol::{
88
consume_header, prepare_metadata, read_params, read_query, request_params, send_error,
9-
send_table_refs, Direction, Flag, Header, Packet,
9+
send_table_refs, write_header, Direction, Flag, Header, Packet,
1010
};
1111
use crate::sql::Catalog;
1212
use anyhow::Result;
@@ -84,6 +84,21 @@ impl WorkerContext {
8484
}
8585
}
8686

87+
fn init_slots() -> Result<()> {
88+
for locked_slot in Bus::new().into_iter().flatten() {
89+
let mut stream = SlotStream::from(locked_slot);
90+
stream.reset();
91+
let header = Header {
92+
direction: Direction::ToBackend,
93+
packet: Packet::None,
94+
length: 0,
95+
flag: Flag::Last,
96+
};
97+
write_header(&mut stream, &header)?;
98+
}
99+
Ok(())
100+
}
101+
87102
#[pg_guard]
88103
#[no_mangle]
89104
pub extern "C" fn worker_main(_arg: pg_sys::Datum) {
@@ -98,10 +113,13 @@ pub extern "C" fn worker_main(_arg: pg_sys::Datum) {
98113
let mut do_retry = false;
99114
let mut slots_with_error: Vec<(SlotNumber, SmolStr)> =
100115
vec![(INVALID_SLOT_NUMBER, "".into()); max_backends() as usize];
116+
init_slots().expect("Failed to initialize slots");
101117

102-
rt.block_on(async {
103-
log!("DataFusion worker is running");
104-
while do_retry || BackgroundWorker::wait_latch(Some(WORKER_WAIT_TIMEOUT)) {
118+
log!("DataFusion worker is running");
119+
while do_retry || BackgroundWorker::wait_latch(Some(WORKER_WAIT_TIMEOUT)) {
120+
// Do not use any pgrx API in this loop: it is multithreaded while PostgreSQL
121+
// functions can work only in the main thread.
122+
rt.block_on(async {
105123
do_retry = false;
106124
for (id, locked_slot) in Bus::new().into_iter().enumerate() {
107125
let Some(slot) = locked_slot else {
@@ -191,16 +209,15 @@ pub extern "C" fn worker_main(_arg: pg_sys::Datum) {
191209
response_error(*slot_id, &mut ctx, stream, msg);
192210
}
193211
}
194-
}
195-
});
212+
});
213+
}
196214
}
197215

198216
async fn parse(header: Header, mut stream: SlotStream) -> Result<TaskResult> {
199217
assert_eq!(header.packet, Packet::Parse);
200218
// TODO: handle long queries that span multiple packets.
201219
assert_eq!(header.flag, Flag::Last);
202220
let (query, _) = read_query(&mut stream)?;
203-
log!("Received query: {}", query);
204221

205222
let stmts = DFParser::parse_sql(query)?;
206223
let Some(stmt) = stmts.into_iter().next() else {
@@ -234,26 +251,7 @@ async fn bind(
234251
Ok(TaskResult::Bind(plan))
235252
}
236253

237-
#[inline]
238-
fn slot_warning() {
239-
// The slot should be locked before sending the response.
240-
// Normally this operation can not fail because its backend
241-
// is waiting for response and should not hold any locks.
242-
// But if it does it means that the old backend was terminated
243-
// and some other one acquired the same slot. So, fsm should
244-
// be reset to initial value.
245-
warning!(
246-
"{} {} {} {}",
247-
"Failed to lock the slot for error response.",
248-
"Looks like the old backend was terminated",
249-
"and the slot is acquired by another backend.",
250-
"The state machine will be reset to the initial state.",
251-
);
252-
}
253-
254254
fn response_error(id: SlotNumber, ctx: &mut WorkerContext, stream: SlotStream, message: &str) {
255255
ctx.flush(id);
256-
if let Err(err) = send_error(id, stream, message) {
257-
warning!("Failed to send the error message: {}", err);
258-
}
256+
send_error(id, stream, message).expect("Failed to send error response");
259257
}

0 commit comments

Comments
 (0)