Skip to content

Commit f24f347

Browse files
add support for JSON and Arrow return formats
1 parent 5d8b1b0 commit f24f347

File tree

3 files changed

+246
-85
lines changed

3 files changed

+246
-85
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package io.tiledb.cloud;
2+
3+
import com.google.gson.Gson;
4+
import io.tiledb.cloud.rest_api.ApiException;
5+
import io.tiledb.cloud.rest_api.api.UdfApi;
6+
import io.tiledb.cloud.rest_api.model.GenericUDF;
7+
import io.tiledb.cloud.rest_api.model.MultiArrayUDF;
8+
import io.tiledb.cloud.rest_api.model.ResultFormat;
9+
import org.apache.arrow.compression.CommonsCompressionFactory;
10+
import org.apache.arrow.memory.RootAllocator;
11+
import org.apache.arrow.memory.UnsafeAllocationManager;
12+
import org.apache.arrow.vector.FieldVector;
13+
import org.apache.arrow.vector.ValueVector;
14+
import org.apache.arrow.vector.VectorSchemaRoot;
15+
import org.apache.arrow.vector.ipc.ArrowStreamReader;
16+
import org.apache.arrow.vector.util.TransferPair;
17+
import org.json.JSONObject;
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
24+
public class TileDBUDF {
25+
private TileDBClient tileDBClient;
26+
private String namespace;
27+
private UdfApi apiInstance;
28+
29+
public TileDBUDF(TileDBClient tileDBClient, String namespace) {
30+
this.tileDBClient = tileDBClient;
31+
this.namespace = namespace;
32+
this.apiInstance = new UdfApi(this.tileDBClient.getApiClient());
33+
34+
}
35+
36+
/**
37+
* Executes a generic-UDF. A generic-UDF is a UDF that is not using a TIleDB array.
38+
*
39+
* @param genericUDF The generic UDF definition
40+
* @param arguments The UDF arguments
41+
* @return The result in String format
42+
*/
43+
public String executeGeneric(GenericUDF genericUDF, HashMap<String, Object> arguments){
44+
String serializedArgs = serializeArgs(arguments);
45+
genericUDF.setArgument(serializedArgs);
46+
try {
47+
return apiInstance.submitGenericUDFString(namespace, genericUDF, "none");
48+
} catch (ApiException e) {
49+
System.err.println("Exception when calling UdfApi#submitGenericUDF");
50+
System.err.println("Status code: " + e.getCode());
51+
System.err.println("Reason: " + e.getResponseBody());
52+
System.err.println("Response headers: " + e.getResponseHeaders());
53+
e.printStackTrace();
54+
}
55+
return null;
56+
}
57+
58+
/**
59+
* Executes a generic-UDF. A generic-UDF is a UDF that is not using a TIleDB array.
60+
*
61+
* @param genericUDF The generic UDF definition
62+
* @param arguments The UDF arguments
63+
* @return The result in JSON format
64+
*/
65+
public JSONObject executeGenericJSON(GenericUDF genericUDF, HashMap<String, Object> arguments){
66+
String serializedArgs = serializeArgs(arguments);
67+
genericUDF.setArgument(serializedArgs);
68+
genericUDF.setResultFormat(ResultFormat.JSON);
69+
try {
70+
String jsonString = apiInstance.submitGenericUDFString(namespace, genericUDF, "none");
71+
return new JSONObject(jsonString);
72+
} catch (ApiException e) {
73+
System.err.println("Exception when calling UdfApi#submitGenericUDF");
74+
System.err.println("Status code: " + e.getCode());
75+
System.err.println("Reason: " + e.getResponseBody());
76+
System.err.println("Response headers: " + e.getResponseHeaders());
77+
e.printStackTrace();
78+
}
79+
return null;
80+
}
81+
82+
/**
83+
* Executes a generic-UDF. A generic-UDF is a UDF that is not using a TIleDB array.
84+
*
85+
* @param genericUDF The generic UDF definition
86+
* @param arguments The UDF arguments
87+
* @return A pair that consists of an ArrayList of all valueVectors and the number of batches read.
88+
*/
89+
public io.tiledb.java.api.Pair<ArrayList<ValueVector>, Integer> executeGenericArrow(GenericUDF genericUDF, HashMap<String, Object> arguments){
90+
String serializedArgs = serializeArgs(arguments);
91+
genericUDF.setArgument(serializedArgs);
92+
genericUDF.setResultFormat(ResultFormat.ARROW);
93+
try {
94+
byte[] bytes = apiInstance.submitGenericUDFBytes(namespace, genericUDF, "none");
95+
ArrayList<ValueVector> valueVectors = null;
96+
int readBatchesCount = 0;
97+
98+
RootAllocator allocator = new RootAllocator(RootAllocator.configBuilder().allocationManagerFactory(UnsafeAllocationManager.FACTORY).build());
99+
ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator, CommonsCompressionFactory.INSTANCE);
100+
101+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
102+
103+
while(reader.loadNextBatch()) {
104+
readBatchesCount++;
105+
valueVectors = new ArrayList<>();
106+
for (FieldVector f : root.getFieldVectors()) {
107+
// transfer will not copy data but transfer ownership of memory
108+
// from ArrowStreamReader to TileDBSQL. This is necessary because
109+
// otherwise we are not able to close the reader and retain the
110+
// data.
111+
TransferPair t = f.getTransferPair(allocator);
112+
t.transfer();
113+
valueVectors.add(t.getTo());
114+
}
115+
}
116+
reader.close();
117+
return new io.tiledb.java.api.Pair<>(valueVectors, readBatchesCount);
118+
} catch (IOException | ApiException e) {
119+
e.printStackTrace();
120+
}
121+
return null;
122+
}
123+
124+
/**
125+
* Executes an array-UDF. An array-UDF is a UDF applied to a TileDB array
126+
*
127+
* @param multiArrayUDF The array-UDF. Can reference one arrays
128+
* @param arguments The UDF arguments
129+
* @param arrayURI The array URI
130+
* @param xPayer Name of organization or user who should be charged for this request
131+
* @return
132+
*/
133+
public String executeSingleArray(MultiArrayUDF multiArrayUDF, HashMap<String, Object> arguments, String arrayURI, String xPayer){
134+
String serializedArgs = serializeArgs(arguments);
135+
multiArrayUDF.setArgument(serializedArgs);
136+
//split uri to get namespace and array name
137+
arrayURI = arrayURI.replaceAll("tiledb://", ""); //remove tiledb prefix
138+
String[] split = arrayURI.split("/");
139+
if (split.length != 2)
140+
throw new RuntimeException(
141+
"TileDB URI is in the wrong format. The format should be: tiledb://namespace/array_name");
142+
try {
143+
return apiInstance.submitUDFString(split[0], split[1], multiArrayUDF, xPayer, "none", "");
144+
} catch (ApiException e) {
145+
System.err.println("Exception when calling UdfApi#submitUDF");
146+
System.err.println("Status code: " + e.getCode());
147+
System.err.println("Reason: " + e.getResponseBody());
148+
System.err.println("Response headers: " + e.getResponseHeaders());
149+
e.printStackTrace();
150+
}
151+
return null;
152+
}
153+
154+
/**
155+
* Serializes the arguments to a String
156+
*
157+
* @param arguments The input arguments in a HashMap
158+
* @return The arguments in a String
159+
*/
160+
private String serializeArgs(HashMap<String, Object> arguments) {
161+
if (arguments == null || arguments.isEmpty()) return "";
162+
Gson gson = new Gson();
163+
return gson.toJson(arguments);
164+
}
165+
}

src/main/java/io/tiledb/cloud/TileDBUDFManager.java

Lines changed: 0 additions & 85 deletions
This file was deleted.

src/main/java/io/tiledb/cloud/rest_api/api/UdfApi.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,6 +1132,87 @@ public ApiResponse<File> submitGenericUDFWithHttpInfo(String namespace, GenericU
11321132
return localVarApiClient.execute(localVarCall, localVarReturnType);
11331133
}
11341134

1135+
/**
1136+
*
1137+
* submit a generic UDF in the given namespace
1138+
* @param namespace namespace array is in (an organization name or user&#39;s username) (required)
1139+
* @param udf UDF to run (required)
1140+
* @param acceptEncoding Encoding to use (optional)
1141+
* @return byte[]
1142+
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
1143+
* @http.response.details
1144+
<table summary="Response Details" border="1">
1145+
<tr><td> Status Code </td><td> Description </td><td> Response Headers </td></tr>
1146+
<tr><td> 200 </td><td> UDF completed and the UDF-type specific result is returned </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1147+
<tr><td> 0 </td><td> error response </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1148+
</table>
1149+
*/
1150+
public byte[] submitGenericUDFBytes(String namespace, GenericUDF udf, String acceptEncoding) throws ApiException {
1151+
ApiResponse<byte[]> localVarResp = submitGenericUDFWithHttpInfoBytes(namespace, udf, acceptEncoding);
1152+
return localVarResp.getData();
1153+
}
1154+
1155+
/**
1156+
*
1157+
* submit a generic UDF in the given namespace
1158+
* @param namespace namespace array is in (an organization name or user&#39;s username) (required)
1159+
* @param udf UDF to run (required)
1160+
* @param acceptEncoding Encoding to use (optional)
1161+
* @return ApiResponse&lt;File&gt;
1162+
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
1163+
* @http.response.details
1164+
<table summary="Response Details" border="1">
1165+
<tr><td> Status Code </td><td> Description </td><td> Response Headers </td></tr>
1166+
<tr><td> 200 </td><td> UDF completed and the UDF-type specific result is returned </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1167+
<tr><td> 0 </td><td> error response </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1168+
</table>
1169+
*/
1170+
public ApiResponse<byte[]> submitGenericUDFWithHttpInfoBytes(String namespace, GenericUDF udf, String acceptEncoding) throws ApiException {
1171+
okhttp3.Call localVarCall = submitGenericUDFValidateBeforeCall(namespace, udf, acceptEncoding, null);
1172+
Type localVarReturnType = new TypeToken<byte[]>(){}.getType();
1173+
return localVarApiClient.execute(localVarCall, localVarReturnType);
1174+
}
1175+
1176+
/**
1177+
*
1178+
* submit a generic UDF in the given namespace
1179+
* @param namespace namespace array is in (an organization name or user&#39;s username) (required)
1180+
* @param udf UDF to run (required)
1181+
* @param acceptEncoding Encoding to use (optional)
1182+
* @return list<Object>
1183+
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
1184+
* @http.response.details
1185+
<table summary="Response Details" border="1">
1186+
<tr><td> Status Code </td><td> Description </td><td> Response Headers </td></tr>
1187+
<tr><td> 200 </td><td> UDF completed and the UDF-type specific result is returned </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1188+
<tr><td> 0 </td><td> error response </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1189+
</table>
1190+
*/
1191+
public Object submitGenericUDFObj(String namespace, GenericUDF udf, String acceptEncoding) throws ApiException {
1192+
ApiResponse<Object> localVarResp = submitGenericUDFWithHttpInfoObj(namespace, udf, acceptEncoding);
1193+
return localVarResp.getData();
1194+
}
1195+
1196+
/**
1197+
*
1198+
* submit a generic UDF in the given namespace
1199+
* @param namespace namespace array is in (an organization name or user&#39;s username) (required)
1200+
* @param udf UDF to run (required)
1201+
* @param acceptEncoding Encoding to use (optional)
1202+
* @return ApiResponse&lt;File&gt;
1203+
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
1204+
* @http.response.details
1205+
<table summary="Response Details" border="1">
1206+
<tr><td> Status Code </td><td> Description </td><td> Response Headers </td></tr>
1207+
<tr><td> 200 </td><td> UDF completed and the UDF-type specific result is returned </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1208+
<tr><td> 0 </td><td> error response </td><td> * X-TILEDB-CLOUD-TASK-ID - Task ID for just completed request <br> </td></tr>
1209+
</table>
1210+
*/
1211+
public ApiResponse<Object> submitGenericUDFWithHttpInfoObj(String namespace, GenericUDF udf, String acceptEncoding) throws ApiException {
1212+
okhttp3.Call localVarCall = submitGenericUDFValidateBeforeCall(namespace, udf, acceptEncoding, null);
1213+
Type localVarReturnType = new TypeToken<Object>(){}.getType();
1214+
return localVarApiClient.execute(localVarCall, localVarReturnType);
1215+
}
11351216
/**
11361217
* (asynchronously)
11371218
* submit a generic UDF in the given namespace

0 commit comments

Comments
 (0)