Skip to content

Commit dabfc62

Browse files
Decompress and return arrow data correctly
1 parent 7b4f1ad commit dabfc62

File tree

1 file changed

+50
-53
lines changed

1 file changed

+50
-53
lines changed

src/main/java/io/tiledb/cloud/TileDBSQL.java

Lines changed: 50 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -2,32 +2,33 @@
22

33
import io.tiledb.cloud.rest_api.ApiException;
44
import io.tiledb.cloud.rest_api.api.SqlApi;
5-
import io.tiledb.cloud.rest_api.model.ResultFormat;
65
import io.tiledb.cloud.rest_api.model.SQLParameters;
76
import org.apache.arrow.memory.RootAllocator;
7+
import org.apache.arrow.vector.FieldVector;
8+
import org.apache.arrow.vector.ValueVector;
89
import org.apache.arrow.vector.VectorSchemaRoot;
910
import org.apache.arrow.vector.ipc.ArrowStreamReader;
10-
import org.apache.arrow.vector.types.pojo.Schema;
11+
import org.apache.arrow.vector.util.TransferPair;
1112

1213
import java.io.ByteArrayInputStream;
1314
import java.io.IOException;
14-
import java.util.ArrayList;
15-
import java.util.Arrays;
16-
import java.util.List;
17-
import java.util.Objects;
15+
import java.util.*;
16+
import org.apache.arrow.compression.CommonsCompressionFactory;
1817

19-
public class TileDBSQL {
20-
String namespace;
18+
public class TileDBSQL implements AutoCloseable{
19+
private String namespace;
2120

22-
SQLParameters sql;
21+
private SQLParameters sql;
2322

24-
TileDBClient tileDBClient;
23+
private TileDBClient tileDBClient;
2524

26-
SqlApi apiInstance;
25+
private SqlApi apiInstance;
2726

28-
ArrayList<VectorSchemaRoot> readBatches;
27+
private ArrayList<VectorSchemaRoot> readBatches;
2928

30-
List<Object> results;
29+
private List<Object> results;
30+
31+
private ArrowStreamReader reader;
3132

3233
/**
3334
*
@@ -48,29 +49,40 @@ public TileDBSQL(TileDBClient tileDBClient, String namespace, SQLParameters sql)
4849

4950
/**
5051
* Exec an SQL query and get results in arrow format.
52+
*
53+
* @return A pair that consists of an ArrayList of all valueVectors and the
54+
* number of batches read.
5155
*/
52-
public void execArrow(){
56+
public io.tiledb.java.api.Pair<ArrayList<ValueVector>, Integer> execArrow(){
5357
try {
5458
assert sql.getResultFormat() != null;
55-
byte[] bytes = apiInstance.runSQLBytes(namespace, sql, sql.getResultFormat().toString());
56-
System.out.println(Arrays.toString(bytes));
59+
byte[] bytes = apiInstance.runSQLBytes(namespace, sql, "none");
60+
ArrayList<ValueVector> valueVectors = null;
61+
int readBatchesCount = 0;
5762

5863
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
59-
try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator)) {
60-
while (reader.loadNextBatch()) {
61-
// This will be loaded with new values on every call to loadNextBatch
62-
VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
63-
readBatches.add(readBatch);
64+
ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator, CommonsCompressionFactory.INSTANCE);
65+
66+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
67+
68+
while(reader.loadNextBatch()) {
69+
readBatchesCount++;
70+
valueVectors = new ArrayList<>();
71+
for (FieldVector f : root.getFieldVectors()) {
72+
// transfer will not copy data but transfer ownership of memory
73+
// from ArrowStreamReader to TileDBSQL. This is necessary because
74+
// otherwise we are not able to close the reader and retain the
75+
// data.
76+
TransferPair t = f.getTransferPair(allocator);
77+
t.transfer();
78+
valueVectors.add(t.getTo());
6479
}
65-
} catch (IOException e) {
66-
throw new RuntimeException(e);
6780
}
68-
} catch (ApiException e) {
69-
System.err.println("Exception when calling SqlApi#runSQL/runSQLBytes");
70-
System.err.println("Status code: " + e.getCode());
71-
System.err.println("Reason: " + e.getResponseBody());
72-
System.err.println("Response headers: " + e.getResponseHeaders());
73-
e.printStackTrace();
81+
reader.close();
82+
return new io.tiledb.java.api.Pair<>(valueVectors, readBatchesCount);
83+
84+
} catch (IOException | ApiException e) {
85+
throw new RuntimeException(e);
7486
}
7587
}
7688

@@ -79,43 +91,28 @@ public void execArrow(){
7991
*
8092
* @return
8193
*/
82-
public void execStandard(){
94+
public List<Object> exec(){
8395
try {
8496
assert sql.getResultFormat() != null;
85-
results = apiInstance.runSQL(namespace, sql, sql.getResultFormat().toString());
97+
return apiInstance.runSQL(namespace, sql, sql.getResultFormat().toString());
8698
} catch (ApiException e) {
8799
System.err.println("Exception when calling SqlApi#runSQL/runSQLBytes");
88100
System.err.println("Status code: " + e.getCode());
89101
System.err.println("Reason: " + e.getResponseBody());
90102
System.err.println("Response headers: " + e.getResponseHeaders());
91103
e.printStackTrace();
92104
}
105+
return null;
93106
}
94107

95108
/**
96-
* Exec an SQL query
109+
*
97110
*/
98-
public void exec(){
99-
if (this.sql.getResultFormat() == ResultFormat.ARROW){
100-
execArrow();
101-
}else {
102-
execStandard();
111+
public void close(){
112+
try {
113+
reader.close();
114+
} catch (IOException e) {
115+
throw new RuntimeException(e);
103116
}
104117
}
105-
106-
/**
107-
* Get the results in Arrow format
108-
* @return
109-
*/
110-
public ArrayList<VectorSchemaRoot> getReadBatches() {
111-
return readBatches;
112-
}
113-
114-
/**
115-
* Get the results as lists of Objects
116-
* @return
117-
*/
118-
public List<Object> getResults() {
119-
return results;
120-
}
121118
}

0 commit comments

Comments
 (0)