@@ -5,7 +5,21 @@ const { ksqlDBError } = require("./customErrors.js");
5
5
const queryBuilder = require ( './queryBuilder.js' ) ;
6
6
const builder = new queryBuilder ( ) ;
7
7
8
- class ksqljs {
8
+ class ksqldb {
9
+ /**
10
+ * Constructor
11
+ * @param {object } config
12
+ *
13
+ * Config object can have these properties
14
+ *
15
+ * ksqldbURL: Connection URL or address
16
+ *
17
+ * API: Username or API key for basic authentication
18
+ *
19
+ * secret: Password or secret for basic authentication
20
+ *
21
+ * httpsAgent: httpsAgent for setting TLS properties
22
+ */
9
23
constructor ( config ) {
10
24
this . ksqldbURL = config . ksqldbURL ;
11
25
this . API = config . API ? config . API : null ;
@@ -261,7 +275,16 @@ class ksqljs {
261
275
// utilize query with variables to build actual query
262
276
const query = builder . build ( builderQuery , [ streamName ] , ...propertiesArgs , [ sourceStream ] ) ;
263
277
264
- return axios . post ( this . ksqldbURL + '/ksql' , { ksql : query } )
278
+ return axios . post ( this . ksqldbURL + '/ksql' , { ksql : query } , {
279
+ headers :
280
+ this . API && this . secret ?
281
+ {
282
+ "Authorization" : `Basic ${ Buffer . from ( this . API + ":" + this . secret , 'utf8' ) . toString ( 'base64' ) } ` ,
283
+ }
284
+ :
285
+ { } ,
286
+ httpsAgent : this . httpsAgentAxios ? this . httpsAgentAxios : null ,
287
+ } )
265
288
. then ( res => res . data [ 0 ] . commandStatus . queryId )
266
289
. catch ( error => console . log ( error ) ) ;
267
290
}
@@ -331,34 +354,34 @@ class ksqljs {
331
354
partitions : 1
332
355
} ;
333
356
Object . assign ( defaultProps , propertiesObj ) ;
334
-
357
+
335
358
// if there's no properties Obj, assign them all default values
336
359
337
360
// expect user to input a conditions object of format {WHERE: condition, GROUP_BY: condition, HAVING: condition};
338
361
// generate conditions string based on object
339
362
// const builder = new queryBuilder();
340
363
341
364
let conditionQuery = '' ;
342
- if ( conditionsObj ) {
365
+ if ( conditionsObj ) {
343
366
const conditionsArr = [ 'WHERE' , 'GROUP_BY' , 'HAVING' ] ;
344
367
const sqlClauses = [ ] ;
345
-
368
+
346
369
let i = 0 ;
347
- while ( conditionsArr . length ) {
348
- if ( conditionsObj [ conditionsArr [ 0 ] ] ) {
349
- sqlClauses [ i ] = [ conditionsArr [ 0 ] . replace ( '_' , ' ' ) ] ; // clause values are set as arrays for query builder
350
- sqlClauses [ i + 1 ] = [ ' ' + conditionsObj [ conditionsArr [ 0 ] ] + ' ' ] ;
370
+ while ( conditionsArr . length ) {
371
+ if ( conditionsObj [ conditionsArr [ 0 ] ] ) {
372
+ sqlClauses [ i ] = [ conditionsArr [ 0 ] . replace ( '_' , ' ' ) ] ; // clause values are set as arrays for query builder
373
+ sqlClauses [ i + 1 ] = [ ' ' + conditionsObj [ conditionsArr [ 0 ] ] + ' ' ] ;
351
374
}
352
375
else {
353
376
sqlClauses [ i ] = [ '' ] ;
354
- sqlClauses [ i + 1 ] = [ '' ] ;
377
+ sqlClauses [ i + 1 ] = [ '' ] ;
355
378
}
356
- i += 2 ;
379
+ i += 2 ;
357
380
conditionsArr . shift ( )
358
381
}
359
382
conditionQuery = builder . build ( '??????' , sqlClauses [ 0 ] , sqlClauses [ 1 ] , sqlClauses [ 2 ] , sqlClauses [ 3 ] , sqlClauses [ 4 ] , sqlClauses [ 5 ] ) ;
360
383
}
361
-
384
+
362
385
363
386
// reformat for builder
364
387
tableName = [ tableName ] ;
@@ -368,21 +391,30 @@ class ksqljs {
368
391
369
392
370
393
const query = builder . build ( `CREATE TABLE ? WITH (kafka_topic=?, value_format=?, partitions=?) AS SELECT ? FROM ? ?EMIT CHANGES;` , tableName , defaultProps . topic , defaultProps . value_format , defaultProps . partitions , selectColStr , source , conditionQuery )
371
- return axios . post ( this . ksqldbURL + '/ksql' , { ksql : query } )
372
- . catch ( error => console . log ( error ) ) ;
394
+ return axios . post ( this . ksqldbURL + '/ksql' , { ksql : query } , {
395
+ headers :
396
+ this . API && this . secret ?
397
+ {
398
+ "Authorization" : `Basic ${ Buffer . from ( this . API + ":" + this . secret , 'utf8' ) . toString ( 'base64' ) } ` ,
399
+ }
400
+ :
401
+ { } ,
402
+ httpsAgent : this . httpsAgentAxios ? this . httpsAgentAxios : null ,
403
+ } )
404
+ . catch ( error => console . log ( error ) ) ;
373
405
}
374
406
375
- /**
376
- * Inserts rows of data into a stream.
377
- *
378
- * <p>This method may be used to insert new rows of data into a stream.
379
- *
380
- * <p>This method is sql injection protected with the use of queryBuilder.
381
- *
382
- * @param {string } stream the name of the stream to insert data into.
383
- * @param {object } rows an array that contains data that is being inserted into the stream.
384
- * @return {Promise } this method returns a promise that resolves into an array describing the status of the row inserted.
385
- */
407
+ /**
408
+ * Inserts rows of data into a stream.
409
+ *
410
+ * <p>This method may be used to insert new rows of data into a stream.
411
+ *
412
+ * <p>This method is sql injection protected with the use of queryBuilder.
413
+ *
414
+ * @param {string } stream the name of the stream to insert data into.
415
+ * @param {object } rows an array that contains data that is being inserted into the stream.
416
+ * @return {Promise } this method returns a promise that resolves into an array describing the status of the row inserted.
417
+ */
386
418
//---------------------Insert Rows Into Existing Streams-----------------
387
419
insertStream = ( stream , rows ) => {
388
420
return new Promise ( ( resolve , reject ) => {
@@ -603,4 +635,4 @@ class ksqljs {
603
635
}
604
636
} ;
605
637
606
- module . exports = ksqljs ;
638
+ module . exports = ksqldb ;
0 commit comments