|
| 1 | +//! An experimental interface for reading and writing record batches to and from PostgreSQL |
| 2 | +
|
| 3 | +use arrow::builder::*; |
| 4 | +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; |
| 5 | +use arrow::record_batch::RecordBatch; |
| 6 | +use chrono::Timelike; |
| 7 | +use postgres::types::*; |
| 8 | +use postgres::{Client, NoTls, Row}; |
| 9 | + |
| 10 | +fn pg_to_arrow_type(dt: &Type) -> Option<DataType> { |
| 11 | + match dt { |
| 12 | + &Type::BOOL => Some(DataType::Boolean), |
| 13 | + &Type::BYTEA | &Type::CHAR | &Type::NAME | &Type::TEXT | &Type::VARCHAR => { |
| 14 | + Some(DataType::Utf8) |
| 15 | + } |
| 16 | + &Type::INT8 => Some(DataType::Int64), |
| 17 | + &Type::INT2 => Some(DataType::Int16), |
| 18 | + &Type::INT4 => Some(DataType::Int32), |
| 19 | + // &OID => None, |
| 20 | + // &JSON => None, |
| 21 | + &Type::FLOAT4 => Some(DataType::Float32), |
| 22 | + &Type::FLOAT8 => Some(DataType::Float64), |
| 23 | + // &ABSTIME => None, |
| 24 | + // &RELTIME => None, |
| 25 | + // &TINTERVAL => None, |
| 26 | + // &MONEY => None, |
| 27 | + &Type::BOOL_ARRAY => Some(DataType::List(Box::new(DataType::Boolean))), |
| 28 | + &Type::BYTEA_ARRAY | &Type::CHAR_ARRAY | &Type::NAME_ARRAY => { |
| 29 | + Some(DataType::List(Box::new(DataType::Utf8))) |
| 30 | + } |
| 31 | + // &INT2_ARRAY => None, |
| 32 | + // &INT2_VECTOR => None, |
| 33 | + // &INT2_VECTOR_ARRAY => None, |
| 34 | + // &INT4_ARRAY => None, |
| 35 | + // &TEXT_ARRAY => None, |
| 36 | + // &INT8_ARRAY => None, |
| 37 | + // &FLOAT4_ARRAY => None, |
| 38 | + // &FLOAT8_ARRAY => None, |
| 39 | + // &ABSTIME_ARRAY => None, |
| 40 | + // &RELTIME_ARRAY => None, |
| 41 | + // &TINTERVAL_ARRAY => None, |
| 42 | + // &DATE => None, |
| 43 | + &Type::TIME => Some(DataType::Time64(TimeUnit::Microsecond)), |
| 44 | + &Type::TIMESTAMP => Some(DataType::Timestamp(TimeUnit::Millisecond)), |
| 45 | + // &TIMESTAMP_ARRAY => None, |
| 46 | + // &DATE_ARRAY => None, |
| 47 | + // &TIME_ARRAY => None, |
| 48 | + // &TIMESTAMPTZ => None, |
| 49 | + // &TIMESTAMPTZ_ARRAY => None, |
| 50 | + // &INTERVAL => None, |
| 51 | + // &INTERVAL_ARRAY => None, |
| 52 | + // &NUMERIC_ARRAY => None, |
| 53 | + // &TIMETZ => None, |
| 54 | + // &BIT => None, |
| 55 | + // &BIT_ARRAY => None, |
| 56 | + // &VARBIT => None, |
| 57 | + // &NUMERIC => None, |
| 58 | + // &UUID => None, |
| 59 | + t @ _ => panic!("Postgres type {:?} not supported", t), |
| 60 | + } |
| 61 | +} |
| 62 | + |
| 63 | +fn from_field(f: &Field, capacity: usize) -> Box<ArrayBuilder> { |
| 64 | + match f.data_type() { |
| 65 | + DataType::Boolean => Box::new(BooleanBuilder::new(capacity)), |
| 66 | + DataType::Int8 => Box::new(Int8Builder::new(capacity)), |
| 67 | + DataType::Int16 => Box::new(Int16Builder::new(capacity)), |
| 68 | + DataType::Int32 => Box::new(Int32Builder::new(capacity)), |
| 69 | + DataType::Int64 => Box::new(Int64Builder::new(capacity)), |
| 70 | + DataType::UInt8 => Box::new(UInt8Builder::new(capacity)), |
| 71 | + DataType::UInt16 => Box::new(UInt16Builder::new(capacity)), |
| 72 | + DataType::UInt32 => Box::new(UInt32Builder::new(capacity)), |
| 73 | + DataType::UInt64 => Box::new(UInt64Builder::new(capacity)), |
| 74 | + DataType::Float32 => Box::new(Float32Builder::new(capacity)), |
| 75 | + DataType::Float64 => Box::new(Float64Builder::new(capacity)), |
| 76 | + DataType::Utf8 => Box::new(BinaryBuilder::new(capacity)), |
| 77 | + t @ _ => panic!("Data type {:?} is not currently supported", t), |
| 78 | + } |
| 79 | +} |
| 80 | + |
| 81 | +// TODO can make this a common trait for DB sources |
| 82 | +pub fn read_table( |
| 83 | + connection_string: &str, |
| 84 | + table_name: &str, |
| 85 | + limit: usize, |
| 86 | + batch_size: usize, |
| 87 | +) -> Result<Vec<RecordBatch>, ()> { |
| 88 | + // create connection |
| 89 | + let mut client = Client::connect(connection_string, NoTls).unwrap(); |
| 90 | + let results = client |
| 91 | + .query(format!("SELECT * FROM {}", table_name).as_str(), &[]) |
| 92 | + .unwrap(); |
| 93 | + if results.is_empty() { |
| 94 | + return Ok(vec![]); |
| 95 | + } |
| 96 | + let schema = row_to_schema(results.get(0).unwrap()).unwrap(); |
| 97 | + let field_len = schema.fields().len(); |
| 98 | + let mut builder = StructBuilder::from_schema(schema.clone(), batch_size); |
| 99 | + let chunks = results.chunks(batch_size); |
| 100 | + let mut batches = vec![]; |
| 101 | + chunks.for_each(|chunk: &[Row]| { |
| 102 | + for j in 0..field_len { |
| 103 | + match schema.field(j).data_type() { |
| 104 | + DataType::Int32 => { |
| 105 | + let field_builder = builder.field_builder::<Int32Builder>(j).unwrap(); |
| 106 | + for i in 0..chunk.len() { |
| 107 | + let row: &Row = chunk.get(i).unwrap(); |
| 108 | + field_builder.append_value(row.get(j)).unwrap(); |
| 109 | + } |
| 110 | + } |
| 111 | + DataType::Int64 => { |
| 112 | + let field_builder = builder.field_builder::<Int64Builder>(j).unwrap(); |
| 113 | + for i in 0..chunk.len() { |
| 114 | + let row: &Row = chunk.get(i).unwrap(); |
| 115 | + field_builder.append_value(row.get(j)).unwrap(); |
| 116 | + } |
| 117 | + } |
| 118 | + DataType::Timestamp(TimeUnit::Millisecond) => { |
| 119 | + let field_builder = builder |
| 120 | + .field_builder::<TimestampMillisecondBuilder>(j) |
| 121 | + .unwrap(); |
| 122 | + for i in 0..chunk.len() { |
| 123 | + let row: &Row = chunk.get(i).unwrap(); |
| 124 | + let timestamp: chrono::NaiveDateTime = row.get(j); |
| 125 | + field_builder |
| 126 | + .append_value(timestamp.timestamp_millis()) |
| 127 | + .unwrap(); |
| 128 | + } |
| 129 | + } |
| 130 | + DataType::Time64(TimeUnit::Microsecond) => { |
| 131 | + let field_builder = builder |
| 132 | + .field_builder::<Time64MicrosecondBuilder>(j) |
| 133 | + .unwrap(); |
| 134 | + for i in 0..chunk.len() { |
| 135 | + let row: &Row = chunk.get(i).unwrap(); |
| 136 | + let time: chrono::NaiveTime = row.get(j); |
| 137 | + field_builder |
| 138 | + .append_value( |
| 139 | + time.num_seconds_from_midnight() as i64 * 1000000 |
| 140 | + + time.nanosecond() as i64 / 1000, |
| 141 | + ) |
| 142 | + .unwrap(); |
| 143 | + } |
| 144 | + } |
| 145 | + DataType::Boolean => { |
| 146 | + let field_builder = builder.field_builder::<BooleanBuilder>(j).unwrap(); |
| 147 | + for i in 0..chunk.len() { |
| 148 | + let row: &Row = chunk.get(i).unwrap(); |
| 149 | + field_builder.append_value(row.get(j)).unwrap(); |
| 150 | + } |
| 151 | + } |
| 152 | + DataType::Utf8 => { |
| 153 | + let field_builder = builder.field_builder::<BinaryBuilder>(j).unwrap(); |
| 154 | + for i in 0..chunk.len() { |
| 155 | + let row: &Row = chunk.get(i).unwrap(); |
| 156 | + field_builder.append_string(row.get(j)).unwrap(); |
| 157 | + } |
| 158 | + } |
| 159 | + t @ _ => panic!("Field builder for {:?} not yet supported", t), |
| 160 | + } |
| 161 | + } |
| 162 | + builder.append(true).unwrap(); |
| 163 | + batches.push(builder.finish().flatten()); |
| 164 | + }); |
| 165 | + Ok(batches) |
| 166 | +} |
| 167 | + |
// TODO(review): empty placeholder — presumably intended to factor out the
// per-column builder-population logic currently inlined in `read_table`;
// not yet implemented and currently unused.
fn populate_builder() {}
| 169 | + |
| 170 | +fn row_to_schema(row: &postgres::Row) -> Result<Schema, ()> { |
| 171 | + let fields = row |
| 172 | + .columns() |
| 173 | + .iter() |
| 174 | + .map(|col: &postgres::Column| { |
| 175 | + Field::new(col.name(), pg_to_arrow_type(col.type_()).unwrap(), true) |
| 176 | + }) |
| 177 | + .collect(); |
| 178 | + Ok(Schema::new(fields)) |
| 179 | +} |
0 commit comments