Skip to content

Commit 8e8f500

Browse files
authored
feat: strategy without creating temp table (cube-js#5299)
* wip * fix * wip * fix * wip * wip * wip * added types * wip * wip * refactor * one more refactor * refactor databricks * fix * update types * capabilities fix * fix * fix * refactor * add ESCAPING
1 parent f4150cc commit 8e8f500

File tree

6 files changed

+298
-113
lines changed

6 files changed

+298
-113
lines changed

packages/cubejs-base-driver/src/BaseDriver.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,25 @@ import fs from 'fs';
33
import { getEnv, isFilePath, isSslKey, isSslCert } from '@cubejs-backend/shared';
44

55
import { cancelCombinator } from './utils';
6-
import { CreateTableIndex, DownloadQueryResultsOptions, DownloadQueryResultsResult, DownloadTableCSVData, DownloadTableData, DownloadTableMemoryData, DriverInterface, ExternalDriverCompatibilities, IndexesSQL, isDownloadTableMemoryData, QueryOptions, Row, Rows, TableColumn, TableColumnQueryResult, TableQueryResult, TableStructure } from './driver.interface';
6+
import {
7+
CreateTableIndex,
8+
DownloadQueryResultsOptions,
9+
DownloadQueryResultsResult,
10+
DownloadTableCSVData,
11+
DownloadTableData,
12+
DownloadTableMemoryData,
13+
DriverInterface,
14+
ExternalDriverCompatibilities,
15+
IndexesSQL,
16+
isDownloadTableMemoryData,
17+
QueryOptions,
18+
Row,
19+
TableColumn,
20+
TableColumnQueryResult,
21+
TableQueryResult,
22+
TableStructure,
23+
DriverCapabilities
24+
} from './driver.interface';
725

826
const sortByKeys = (unordered) => {
927
const ordered = {};
@@ -365,7 +383,7 @@ export abstract class BaseDriver implements DriverInterface {
365383
// override, if it's needed
366384
}
367385

368-
public capabilities(): ExternalDriverCompatibilities {
386+
public capabilities(): DriverCapabilities {
369387
return {};
370388
}
371389

packages/cubejs-base-driver/src/driver.interface.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ export interface ExternalDriverCompatibilities {
9797
csvImport?: true,
9898
streamImport?: true,
9999
}
100+
101+
/**
 * Capability flags returned by a driver's `capabilities()` method.
 * Inherits every flag of `ExternalDriverCompatibilities` and adds one more
 * optional flag.
 */
export interface DriverCapabilities extends ExternalDriverCompatibilities {
102+
// Optional; when present the only allowed value is `true`.
// NOTE(review): presumably signals that the driver can unload a query
// result without first creating a temp table — confirm against callers.
unloadWithoutTempTable?: true,
103+
}
104+
100105
export type StreamOptions = {
101106
highWaterMark: number;
102107
};
@@ -117,8 +122,14 @@ export type CreateTableIndex = {
117122
columns: string[]
118123
};
119124

125+
/**
 * A SQL statement together with its parameter values.
 * Used as the type of the optional `query` field of `UnloadOptions`.
 */
type UnloadQuery = {
126+
// Statement text.
sql: string,
127+
// Values that accompany the statement; element types are not constrained here.
params: unknown[]
128+
};
129+
120130
/**
 * Options accepted by a driver's `unload()` implementation.
 */
export type UnloadOptions = {
121131
// NOTE(review): unit (bytes / MB) is not visible from this declaration — TODO confirm.
maxFileSize: number,
132+
// Optional SQL + params. When omitted, the unload has only the table name to work with.
query?: UnloadQuery;
122133
};
123134

124135
export type QueryOptions = {
@@ -159,5 +170,5 @@ export interface DriverInterface {
159170
// Shutdown the driver
160171
release(): Promise<void>
161172

162-
capabilities(): ExternalDriverCompatibilities;
173+
capabilities(): DriverCapabilities;
163174
}

packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import {
88
BaseDriver,
99
DownloadTableCSVData,
1010
DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex,
11-
StreamTableData, ExternalDriverCompatibilities,
12-
StreamingSourceTableData, QueryOptions,
11+
StreamTableData,
12+
DriverCapabilities,
13+
StreamingSourceTableData,
14+
QueryOptions,
1315
} from '@cubejs-backend/base-driver';
1416
import { getEnv } from '@cubejs-backend/shared';
1517
import { format as formatSql } from 'sqlstring';
@@ -364,7 +366,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
364366
return CubeStoreQuery;
365367
}
366368

367-
public capabilities(): ExternalDriverCompatibilities {
369+
public capabilities(): DriverCapabilities {
368370
return {
369371
csvImport: true,
370372
streamImport: true,

packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts

Lines changed: 73 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ import {
1010
SASProtocol,
1111
generateBlobSASQueryParameters,
1212
} from '@azure/storage-blob';
13-
import {
14-
DownloadTableCSVData,
15-
} from '@cubejs-backend/base-driver';
13+
import { DriverCapabilities, UnloadOptions, } from '@cubejs-backend/base-driver';
1614
import {
1715
JDBCDriver,
1816
JDBCDriverConfiguration,
@@ -194,6 +192,23 @@ export class DatabricksDriver extends JDBCDriver {
194192
return result;
195193
}
196194

195+
/**
 * Resolves the result columns of a SQL query by running `DESCRIBE QUERY`
 * against it.
 *
 * @param sql - query text; it is appended as-is after `DESCRIBE QUERY`
 * @param params - values forwarded to `this.query` together with the statement
 * @returns a list of `{ name, type }` entries where `type` is the described
 * data type mapped through `this.toGenericType`
 */
private async queryColumnTypes(sql: string, params: unknown[]) {
196+
const result = [];
197+
// eslint-disable-next-line camelcase
198+
const response = await this.query<{col_name: string; data_type: string}>(`DESCRIBE QUERY ${sql}`, params);
199+
200+
for (const column of response) {
201+
// By default Databricks appends additional info after an empty row,
// so stop at the first row whose column name is empty.
202+
if (column.col_name === '') {
203+
break;
204+
}
205+
206+
result.push({ name: column.col_name, type: this.toGenericType(column.data_type) });
207+
}
208+
209+
return result;
210+
}
211+
197212
public async getTablesQuery(schemaName: string) {
198213
const response = await this.query(`SHOW TABLES IN ${this.quoteIdentifier(schemaName)}`, []);
199214

@@ -248,21 +263,22 @@ export class DatabricksDriver extends JDBCDriver {
248263
return this.config.exportBucket !== undefined;
249264
}
250265

251-
/**
252-
* Saves pre-aggs table to the bucket and returns links to download
253-
* results.
254-
*/
255-
public async unload(
256-
tableName: string,
257-
): Promise<DownloadTableCSVData> {
258-
const types = await this.tableColumnTypes(tableName);
259-
const columns = types.map(t => t.name).join(', ');
266+
public async unload(tableName: string, options: UnloadOptions) {
267+
if (!['azure', 's3'].includes(this.config.bucketType as string)) {
268+
throw new Error(`Unsupported export bucket type: ${
269+
this.config.bucketType
270+
}`);
271+
}
272+
273+
const types = options.query ?
274+
await this.unloadWithSql(tableName, options.query.sql, options.query.params) :
275+
await this.unloadWithTable(tableName);
276+
260277
const pathname = `${this.config.exportBucket}/${tableName}.csv`;
261278
const csvFile = await this.getCsvFiles(
262-
tableName,
263-
columns,
264279
pathname,
265280
);
281+
266282
return {
267283
csvFile,
268284
types,
@@ -271,21 +287,41 @@ export class DatabricksDriver extends JDBCDriver {
271287
}
272288

273289
/**
274-
* Unload table to bucket using Databricks JDBC query and returns (async)
275-
* csv files signed URLs array.
290+
* Create table with query and unload it to bucket
291+
*/
292+
// Resolves the column types of `sql` via `queryColumnTypes`, then creates the
// export table for `tableName` from the same query and params.
// Returns the resolved column types.
private async unloadWithSql(tableName: string, sql: string, params: unknown[]) {
293+
// The types come from describing the query itself, not from the created table.
const types = await this.queryColumnTypes(sql, params);
294+
295+
await this.createExternalTableFromSql(tableName, sql, params);
296+
297+
return types;
298+
}
299+
300+
/**
301+
* Creates the export table (with a location) from an existing pre-aggregation table and unloads it to the bucket
302+
*/
303+
// Reads the column types of the existing table `tableName`, then creates the
// export table from that table using the comma-separated column names.
// Returns the column types that were read.
private async unloadWithTable(tableName: string) {
304+
const types = await this.tableColumnTypes(tableName);
305+
// Column names joined with ", " are passed on as the column list.
const columns = types.map(t => t.name).join(', ');
306+
307+
await this.createExternalTableFromTable(tableName, columns);
308+
309+
return types;
310+
}
311+
312+
/**
313+
* Returns an array of signed URLs for the unloaded csv files.
276314
*/
277315
private async getCsvFiles(
278-
table: string,
279-
columns: string,
280316
pathname: string,
281317
): Promise<string[]> {
282318
let res;
283319
switch (this.config.bucketType) {
284320
case 'azure':
285-
res = await this.getAzureCsvFiles(table, columns, pathname);
321+
res = await this.getSignedAzureUrls(pathname);
286322
break;
287323
case 's3':
288-
res = await this.getS3CsvFiles(table, columns, pathname);
324+
res = await this.getSignedS3Urls(pathname);
289325
break;
290326
default:
291327
throw new Error(`Unsupported export bucket type: ${
@@ -295,19 +331,6 @@ export class DatabricksDriver extends JDBCDriver {
295331
return res;
296332
}
297333

298-
/**
299-
* Saves specified table to the Azure blob storage and returns (async)
300-
* csv files signed URLs array.
301-
*/
302-
private async getAzureCsvFiles(
303-
table: string,
304-
columns: string,
305-
pathname: string,
306-
): Promise<string[]> {
307-
await this.createExternalTable(table, columns);
308-
return this.getSignedAzureUrls(pathname);
309-
}
310-
311334
/**
312335
* Returns Azure signed URLs of unloaded csv files.
313336
*/
@@ -360,19 +383,6 @@ export class DatabricksDriver extends JDBCDriver {
360383
return csvFile;
361384
}
362385

363-
/**
364-
* Saves specified table to the S3 bucket and returns (async) csv files
365-
* signed URLs array.
366-
*/
367-
private async getS3CsvFiles(
368-
table: string,
369-
columns: string,
370-
pathname: string,
371-
): Promise<string[]> {
372-
await this.createExternalTable(table, columns);
373-
return this.getSignedS3Urls(pathname);
374-
}
375-
376386
/**
377387
* Returns S3 signed URLs of unloaded csv files.
378388
*/
@@ -429,7 +439,19 @@ export class DatabricksDriver extends JDBCDriver {
429439
* `fs.s3a.access.key <aws-access-key>`
430440
* `fs.s3a.secret.key <aws-secret-key>`
431441
*/
432-
private async createExternalTable(table: string, columns: string,) {
442+
// Runs a single `CREATE TABLE <table>_csv_export USING CSV ... AS (<sql>)`
// statement through `this.query`, forwarding `params` with it.
// The table location is `<exportBucketMountDir or exportBucket>/<table>.csv`;
// `exportBucket` is used only when `exportBucketMountDir` is falsy.
// NOTE(review): `table` and `sql` are interpolated directly into the statement
// text (only `params` are passed separately) — confirm callers supply trusted values.
private async createExternalTableFromSql(table: string, sql: string, params: unknown[]) {
443+
await this.query(
444+
`
445+
CREATE TABLE ${table}_csv_export
446+
USING CSV LOCATION '${this.config.exportBucketMountDir || this.config.exportBucket}/${table}.csv'
447+
OPTIONS (escape = '"')
448+
AS (${sql})
449+
`,
450+
params,
451+
);
452+
}
453+
454+
private async createExternalTableFromTable(table: string, columns: string) {
433455
await this.query(
434456
`
435457
CREATE TABLE ${table}_csv_export
@@ -440,4 +462,8 @@ export class DatabricksDriver extends JDBCDriver {
440462
[],
441463
);
442464
}
465+
466+
/**
 * Reports this driver's capability flags.
 *
 * @returns an object with `unloadWithoutTempTable` set to `true`
 */
public capabilities(): DriverCapabilities {
467+
return { unloadWithoutTempTable: true };
468+
}
443469
}

0 commit comments

Comments
 (0)