Skip to content

Commit 46e46d5

Browse files
authored
Support native PostgreSQL logging format (#483)
* feat: support native PostgreSQL logging format Signed-off-by: jinweios <jinwei.peng@beingthink.com> * fix: csv format log output correction Signed-off-by: jinweios <jinwei.peng@beingthink.com> * refactor: change log_desitation judgment from string to intt Signed-off-by: jinweios <jinwei.peng@beingthink.com> * chore: put the code under src/logger, remove logger crate Signed-off-by: jinweios <jinwei.peng@beingthink.com> * feat: log transfer supports large chunk writing Signed-off-by: jinweios <jinwei.peng@beingthink.com> * chore: remove test code Signed-off-by: jinweios <jinwei.peng@beingthink.com> * chore: remove no used mod Signed-off-by: jinweios <jinwei.peng@beingthink.com> * fix: add logger dir declaration Signed-off-by: jinweios <jinwei.peng@beingthink.com> * chore: replace PIPE_BUF with sizeof PipeProtoChunk Signed-off-by: jinweios <jinwei.peng@beingthink.com> --------- Signed-off-by: jinweios <jinwei.peng@beingthink.com>
1 parent 430733d commit 46e46d5

File tree

7 files changed

+263
-4
lines changed

7 files changed

+263
-4
lines changed

Cargo.lock

Lines changed: 84 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ arrayvec.workspace = true
2121
bincode.workspace = true
2222
bytemuck.workspace = true
2323
byteorder.workspace = true
24+
chrono = "0.4.38"
25+
csv = "1.3.0"
2426
env_logger = "0.11.2"
2527
libc.workspace = true
2628
log.workspace = true

src/bgworker/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,14 @@ extern "C" fn _vectors_main(_arg: pgrx::pg_sys::Datum) {
3434
pub layout: std::alloc::Layout,
3535
}
3636
{
37-
let mut builder = env_logger::builder();
38-
builder.target(env_logger::Target::Stderr);
37+
let mut builder = crate::logger::VectorLogger::build();
3938
#[cfg(not(debug_assertions))]
4039
{
41-
builder.filter(None, log::LevelFilter::Info);
40+
builder.filter_level(log::LevelFilter::Info);
4241
}
4342
#[cfg(debug_assertions)]
4443
{
45-
builder.filter(None, log::LevelFilter::Trace);
44+
builder.filter_level(log::LevelFilter::Trace);
4645
}
4746
builder.init();
4847
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod error;
1717
mod gucs;
1818
mod index;
1919
mod ipc;
20+
mod logger;
2021
mod upgrade;
2122
mod utils;
2223

src/logger/message.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use chrono::Local;
2+
use pgrx::pg_sys::PipeProtoChunk;
3+
use serde::Serialize;
4+
use std::io::{stderr, Write};
5+
use std::{process, vec};
6+
7+
const PIPE_PROTO_DEST_STDERR: u8 = 0x10;
8+
const PIPE_PROTO_DEST_CSVLOG: u8 = 0x20;
9+
const PIPE_PROTO_DEST_JSONLOG: u8 = 0x40;
10+
const PIPE_PROTO_IS_LAST: u8 = 0x01;
11+
12+
const PIPE_CHUNK_SIZE: usize = std::mem::size_of::<PipeProtoChunk>();
13+
const PIPE_HEADER_SIZE: usize = 9;
14+
const PIPE_MAX_PAYLOAD: usize = PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE;
15+
16+
struct PipeProtoHeader {
17+
nuls: [u8; 2],
18+
len: u16,
19+
pid: i32,
20+
flags: u8,
21+
}
22+
23+
impl PipeProtoHeader {
24+
fn new(flags: u8) -> Self {
25+
Self {
26+
nuls: [0, 0],
27+
len: 0,
28+
pid: process::id() as i32,
29+
flags,
30+
}
31+
}
32+
33+
fn chunk(&self) -> Vec<u8> {
34+
let mut chunk = Vec::from(&self.nuls);
35+
chunk.extend_from_slice(&self.len.to_le_bytes());
36+
chunk.extend_from_slice(&self.pid.to_le_bytes());
37+
chunk.push(self.flags);
38+
chunk
39+
}
40+
41+
fn set_flag(&mut self, flags: u8) {
42+
self.flags = flags
43+
}
44+
45+
fn set_len(&mut self, len: usize) {
46+
self.len = len as u16;
47+
}
48+
}
49+
50+
#[derive(Serialize)]
51+
pub struct Message {
52+
timestamp: String,
53+
pid: u32,
54+
message: String,
55+
}
56+
57+
impl Message {
58+
pub fn new(msg: &str) -> Self {
59+
Self {
60+
timestamp: Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
61+
pid: process::id(),
62+
message: String::from(msg),
63+
}
64+
}
65+
66+
pub fn csv_chunk(&self) {
67+
let mut wtr = csv::WriterBuilder::new()
68+
.has_headers(false)
69+
.from_writer(vec![]);
70+
wtr.serialize(self).unwrap();
71+
write_pipe_chunks(wtr.into_inner().unwrap(), PIPE_PROTO_DEST_CSVLOG)
72+
}
73+
74+
pub fn stderr_chunk(&self) {
75+
let err = format!("{} [{}] LOG: {}", self.timestamp, self.pid, self.message);
76+
let mut data: Vec<u8> = Vec::new();
77+
writeln!(&mut data, "{}", &err).unwrap();
78+
write_pipe_chunks(data, PIPE_PROTO_DEST_STDERR)
79+
}
80+
81+
pub fn json_chunk(&self) {
82+
let json = serde_json::to_string(self).unwrap();
83+
let mut data: Vec<u8> = Vec::new();
84+
writeln!(&mut data, "{}", &json).unwrap();
85+
write_pipe_chunks(data, PIPE_PROTO_DEST_JSONLOG)
86+
}
87+
}
88+
89+
fn write_pipe_chunks(msg_buf: Vec<u8>, flags: u8) {
90+
let mut len = msg_buf.len();
91+
let mut header = PipeProtoHeader::new(flags);
92+
let mut fd = stderr();
93+
let mut cursor: usize = 0;
94+
while len > PIPE_MAX_PAYLOAD {
95+
header.set_len(PIPE_MAX_PAYLOAD);
96+
let data = &msg_buf[cursor..(cursor + PIPE_MAX_PAYLOAD)];
97+
let mut chunk = Vec::new();
98+
chunk.extend_from_slice(&header.chunk());
99+
chunk.extend_from_slice(data);
100+
let _ = fd.write(&chunk).unwrap();
101+
len -= PIPE_MAX_PAYLOAD;
102+
cursor += PIPE_MAX_PAYLOAD;
103+
}
104+
header.set_flag(flags | PIPE_PROTO_IS_LAST);
105+
header.set_len(len);
106+
let data = &msg_buf[cursor..(cursor + len)];
107+
let mut chunk = Vec::new();
108+
chunk.extend_from_slice(&header.chunk());
109+
chunk.extend_from_slice(data);
110+
let _ = fd.write(&chunk).unwrap();
111+
}

src/logger/mod.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
mod message;
2+
mod protocol;
3+
4+
use log::{set_boxed_logger, set_max_level, LevelFilter, Metadata, Record};
5+
use protocol::pipe_log;
6+
7+
#[derive(Debug, Clone)]
8+
pub struct VectorLogger {
9+
level: LevelFilter,
10+
}
11+
12+
impl log::Log for VectorLogger {
13+
fn enabled(&self, metadata: &Metadata) -> bool {
14+
metadata.level() <= self.level
15+
}
16+
17+
fn log(&self, record: &Record) {
18+
if self.enabled(record.metadata()) {
19+
pipe_log(&record.args().to_string());
20+
}
21+
}
22+
23+
fn flush(&self) {}
24+
}
25+
26+
impl VectorLogger {
27+
pub fn init(&self) {
28+
set_boxed_logger(Box::new(self.clone())).unwrap();
29+
set_max_level(self.level);
30+
}
31+
32+
pub fn build() -> Self {
33+
VectorLogger {
34+
level: LevelFilter::Info,
35+
}
36+
}
37+
38+
pub fn filter_level(&mut self, level: LevelFilter) {
39+
self.level = level;
40+
}
41+
}

src/logger/protocol.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use super::message::Message;
2+
use pgrx::pg_sys::Log_destination;
3+
4+
const LOG_DESTINATION_STDERR: u32 = 1;
5+
const LOG_DESTINATION_CSVLOG: u32 = 8;
6+
const LOG_DESTINATION_JSONLOG: u32 = 16;
7+
8+
pub fn pipe_log(msg: &str) {
9+
let message = Message::new(msg);
10+
unsafe {
11+
if Log_destination as u32 & LOG_DESTINATION_CSVLOG != 0 {
12+
message.csv_chunk()
13+
}
14+
if Log_destination as u32 & LOG_DESTINATION_STDERR != 0 {
15+
message.stderr_chunk()
16+
}
17+
if Log_destination as u32 & LOG_DESTINATION_JSONLOG != 0 {
18+
message.json_chunk()
19+
}
20+
}
21+
}

0 commit comments

Comments
 (0)