@@ -10,7 +10,9 @@ import {
10
10
SASProtocol ,
11
11
generateBlobSASQueryParameters ,
12
12
} from '@azure/storage-blob' ;
13
- import { DriverCapabilities , UnloadOptions , } from '@cubejs-backend/base-driver' ;
13
+ import {
14
+ DownloadTableCSVData ,
15
+ } from '@cubejs-backend/base-driver' ;
14
16
import {
15
17
JDBCDriver ,
16
18
JDBCDriverConfiguration ,
@@ -192,23 +194,6 @@ export class DatabricksDriver extends JDBCDriver {
192
194
return result ;
193
195
}
194
196
195
- private async queryColumnTypes ( sql : string , params : unknown [ ] ) {
196
- const result = [ ] ;
197
- // eslint-disable-next-line camelcase
198
- const response = await this . query < { col_name : string ; data_type : string } > ( `DESCRIBE QUERY ${ sql } ` , params ) ;
199
-
200
- for ( const column of response ) {
201
- // Databricks describe additional info by default after empty line.
202
- if ( column . col_name === '' ) {
203
- break ;
204
- }
205
-
206
- result . push ( { name : column . col_name , type : this . toGenericType ( column . data_type ) } ) ;
207
- }
208
-
209
- return result ;
210
- }
211
-
212
197
public async getTablesQuery ( schemaName : string ) {
213
198
const response = await this . query ( `SHOW TABLES IN ${ this . quoteIdentifier ( schemaName ) } ` , [ ] ) ;
214
199
@@ -263,22 +248,21 @@ export class DatabricksDriver extends JDBCDriver {
263
248
return this . config . exportBucket !== undefined ;
264
249
}
265
250
266
- public async unload ( tableName : string , options : UnloadOptions ) {
267
- if ( ! [ 'azure' , 's3' ] . includes ( this . config . bucketType as string ) ) {
268
- throw new Error ( `Unsupported export bucket type: ${
269
- this . config . bucketType
270
- } `) ;
271
- }
272
-
273
- const types = options . query ?
274
- await this . unloadWithSql ( tableName , options . query . sql , options . query . params ) :
275
- await this . unloadWithTable ( tableName ) ;
276
-
251
+ /**
252
+ * Saves pre-aggs table to the bucket and returns links to download
253
+ * results.
254
+ */
255
+ public async unload (
256
+ tableName : string ,
257
+ ) : Promise < DownloadTableCSVData > {
258
+ const types = await this . tableColumnTypes ( tableName ) ;
259
+ const columns = types . map ( t => t . name ) . join ( ', ' ) ;
277
260
const pathname = `${ this . config . exportBucket } /${ tableName } .csv` ;
278
261
const csvFile = await this . getCsvFiles (
262
+ tableName ,
263
+ columns ,
279
264
pathname ,
280
265
) ;
281
-
282
266
return {
283
267
csvFile,
284
268
types,
@@ -287,41 +271,21 @@ export class DatabricksDriver extends JDBCDriver {
287
271
}
288
272
289
273
/**
290
- * Create table with query and unload it to bucket
291
- */
292
- private async unloadWithSql ( tableName : string , sql : string , params : unknown [ ] ) {
293
- const types = await this . queryColumnTypes ( sql , params ) ;
294
-
295
- await this . createExternalTableFromSql ( tableName , sql , params ) ;
296
-
297
- return types ;
298
- }
299
-
300
- /**
301
- * Create table from preaggregation table with location and unload it to bucket
302
- */
303
- private async unloadWithTable ( tableName : string ) {
304
- const types = await this . tableColumnTypes ( tableName ) ;
305
- const columns = types . map ( t => t . name ) . join ( ', ' ) ;
306
-
307
- await this . createExternalTableFromTable ( tableName , columns ) ;
308
-
309
- return types ;
310
- }
311
-
312
- /**
313
- * return csv files signed URLs array.
274
+ * Unload table to bucket using Databricks JDBC query and returns (async)
275
+ * csv files signed URLs array.
314
276
*/
315
277
private async getCsvFiles (
278
+ table : string ,
279
+ columns : string ,
316
280
pathname : string ,
317
281
) : Promise < string [ ] > {
318
282
let res ;
319
283
switch ( this . config . bucketType ) {
320
284
case 'azure' :
321
- res = await this . getSignedAzureUrls ( pathname ) ;
285
+ res = await this . getAzureCsvFiles ( table , columns , pathname ) ;
322
286
break ;
323
287
case 's3' :
324
- res = await this . getSignedS3Urls ( pathname ) ;
288
+ res = await this . getS3CsvFiles ( table , columns , pathname ) ;
325
289
break ;
326
290
default :
327
291
throw new Error ( `Unsupported export bucket type: ${
@@ -331,6 +295,19 @@ export class DatabricksDriver extends JDBCDriver {
331
295
return res ;
332
296
}
333
297
298
+ /**
299
+ * Saves specified table to the Azure blob storage and returns (async)
300
+ * csv files signed URLs array.
301
+ */
302
+ private async getAzureCsvFiles (
303
+ table : string ,
304
+ columns : string ,
305
+ pathname : string ,
306
+ ) : Promise < string [ ] > {
307
+ await this . createExternalTable ( table , columns ) ;
308
+ return this . getSignedAzureUrls ( pathname ) ;
309
+ }
310
+
334
311
/**
335
312
* Returns Azure signed URLs of unloaded scv files.
336
313
*/
@@ -383,6 +360,19 @@ export class DatabricksDriver extends JDBCDriver {
383
360
return csvFile ;
384
361
}
385
362
363
+ /**
364
+ * Saves specified table to the S3 bucket and returns (async) csv files
365
+ * signed URLs array.
366
+ */
367
+ private async getS3CsvFiles (
368
+ table : string ,
369
+ columns : string ,
370
+ pathname : string ,
371
+ ) : Promise < string [ ] > {
372
+ await this . createExternalTable ( table , columns ) ;
373
+ return this . getSignedS3Urls ( pathname ) ;
374
+ }
375
+
386
376
/**
387
377
* Returns S3 signed URLs of unloaded scv files.
388
378
*/
@@ -439,19 +429,7 @@ export class DatabricksDriver extends JDBCDriver {
439
429
* `fs.s3a.access.key <aws-access-key>`
440
430
* `fs.s3a.secret.key <aws-secret-key>`
441
431
*/
442
- private async createExternalTableFromSql ( table : string , sql : string , params : unknown [ ] ) {
443
- await this . query (
444
- `
445
- CREATE TABLE ${ table } _csv_export
446
- USING CSV LOCATION '${ this . config . exportBucketMountDir || this . config . exportBucket } /${ table } .csv'
447
- OPTIONS (escape = '"')
448
- AS (${ sql } )
449
- ` ,
450
- params ,
451
- ) ;
452
- }
453
-
454
- private async createExternalTableFromTable ( table : string , columns : string ) {
432
+ private async createExternalTable ( table : string , columns : string , ) {
455
433
await this . query (
456
434
`
457
435
CREATE TABLE ${ table } _csv_export
@@ -462,8 +440,4 @@ export class DatabricksDriver extends JDBCDriver {
462
440
[ ] ,
463
441
) ;
464
442
}
465
-
466
- public capabilities ( ) : DriverCapabilities {
467
- return { unloadWithoutTempTable : true } ;
468
- }
469
443
}
0 commit comments