@@ -7,6 +7,9 @@ use chrono::Timelike;
7
7
use postgres:: types:: * ;
8
8
use postgres:: { Client , NoTls , Row } ;
9
9
10
+ /// Convert Postgres Type to Arrow DataType
11
+ ///
12
+ /// Not all types are covered, but can be easily added
10
13
fn pg_to_arrow_type ( dt : & Type ) -> Option < DataType > {
11
14
match dt {
12
15
& Type :: BOOL => Some ( DataType :: Boolean ) ,
@@ -60,24 +63,6 @@ fn pg_to_arrow_type(dt: &Type) -> Option<DataType> {
60
63
}
61
64
}
62
65
63
- fn from_field ( f : & Field , capacity : usize ) -> Box < ArrayBuilder > {
64
- match f. data_type ( ) {
65
- DataType :: Boolean => Box :: new ( BooleanBuilder :: new ( capacity) ) ,
66
- DataType :: Int8 => Box :: new ( Int8Builder :: new ( capacity) ) ,
67
- DataType :: Int16 => Box :: new ( Int16Builder :: new ( capacity) ) ,
68
- DataType :: Int32 => Box :: new ( Int32Builder :: new ( capacity) ) ,
69
- DataType :: Int64 => Box :: new ( Int64Builder :: new ( capacity) ) ,
70
- DataType :: UInt8 => Box :: new ( UInt8Builder :: new ( capacity) ) ,
71
- DataType :: UInt16 => Box :: new ( UInt16Builder :: new ( capacity) ) ,
72
- DataType :: UInt32 => Box :: new ( UInt32Builder :: new ( capacity) ) ,
73
- DataType :: UInt64 => Box :: new ( UInt64Builder :: new ( capacity) ) ,
74
- DataType :: Float32 => Box :: new ( Float32Builder :: new ( capacity) ) ,
75
- DataType :: Float64 => Box :: new ( Float64Builder :: new ( capacity) ) ,
76
- DataType :: Utf8 => Box :: new ( BinaryBuilder :: new ( capacity) ) ,
77
- t @ _ => panic ! ( "Data type {:?} is not currently supported" , t) ,
78
- }
79
- }
80
-
81
66
// TODO can make this a common trait for DB sources
82
67
pub fn read_table (
83
68
connection_string : & str ,
@@ -105,14 +90,20 @@ pub fn read_table(
105
90
let field_builder = builder. field_builder :: < Int32Builder > ( j) . unwrap ( ) ;
106
91
for i in 0 ..chunk. len ( ) {
107
92
let row: & Row = chunk. get ( i) . unwrap ( ) ;
108
- field_builder. append_value ( row. get ( j) ) . unwrap ( ) ;
93
+ match row. try_get ( j) {
94
+ Ok ( value) => field_builder. append_value ( value) . unwrap ( ) ,
95
+ Err ( _) => field_builder. append_null ( ) . unwrap ( ) ,
96
+ } ;
109
97
}
110
98
}
111
99
DataType :: Int64 => {
112
100
let field_builder = builder. field_builder :: < Int64Builder > ( j) . unwrap ( ) ;
113
101
for i in 0 ..chunk. len ( ) {
114
102
let row: & Row = chunk. get ( i) . unwrap ( ) ;
115
- field_builder. append_value ( row. get ( j) ) . unwrap ( ) ;
103
+ match row. try_get ( j) {
104
+ Ok ( value) => field_builder. append_value ( value) . unwrap ( ) ,
105
+ Err ( _) => field_builder. append_null ( ) . unwrap ( ) ,
106
+ } ;
116
107
}
117
108
}
118
109
DataType :: Timestamp ( TimeUnit :: Millisecond ) => {
@@ -160,13 +151,12 @@ pub fn read_table(
160
151
}
161
152
}
162
153
builder. append ( true ) . unwrap ( ) ;
163
- batches. push ( builder. finish ( ) . flatten ( ) ) ;
154
+ batches. push ( RecordBatch :: from ( & builder. finish ( ) ) ) ;
164
155
} ) ;
165
156
Ok ( batches)
166
157
}
167
158
168
- fn populate_builder ( ) { }
169
-
159
+ /// Generate Arrow schema from a row
170
160
fn row_to_schema ( row : & postgres:: Row ) -> Result < Schema , ( ) > {
171
161
let fields = row
172
162
. columns ( )
0 commit comments