@@ -2,6 +2,7 @@ const axios = require("axios");
2
2
const https = require ( 'node:https' ) ;
3
3
const http2 = require ( "http2" ) ;
4
4
const { ksqlDBError } = require ( "./customErrors.js" ) ;
5
+ const validateInputs = require ( './validateInputs.js' ) ;
5
6
const queryBuilder = require ( './queryBuilder.js' ) ;
6
7
const builder = new queryBuilder ( ) ;
7
8
@@ -48,10 +49,14 @@ class ksqldb {
48
49
* Example: [{object that contains the metadata}, [data], [data], ...}]
49
50
*/
50
51
pull = ( query ) => {
52
+ validateInputs ( [ query , 'string' , 'query' ] ) ;
53
+
54
+ const validatedQuery = builder . build ( query ) ;
55
+
51
56
return axios
52
57
. post ( this . ksqldbURL + "/query-stream" ,
53
58
{
54
- sql : query ,
59
+ sql : validatedQuery ,
55
60
} ,
56
61
{
57
62
headers :
@@ -65,8 +70,7 @@ class ksqldb {
65
70
} )
66
71
. then ( ( res ) => res . data )
67
72
. catch ( ( error ) => {
68
- console . error ( error ) ;
69
- throw new ksqlDBError ( error ) ;
73
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
70
74
} ) ;
71
75
}
72
76
@@ -85,6 +89,9 @@ class ksqldb {
85
89
* result if successful.
86
90
*/
87
91
push ( query , cb ) {
92
+ validateInputs ( [ query , 'string' , 'query' , true ] , [ cb , 'function' , 'cb' , true ] ) ;
93
+ const validatedQuery = builder . build ( query ) ;
94
+
88
95
return new Promise ( ( resolve , reject ) => {
89
96
let sentQueryId = false ;
90
97
const session = http2 . connect (
@@ -109,7 +116,7 @@ class ksqldb {
109
116
) ;
110
117
111
118
const reqBody = {
112
- sql : query ,
119
+ sql : validatedQuery ,
113
120
Accept : "application/json, application/vnd.ksqlapi.delimited.v1" ,
114
121
} ;
115
122
@@ -118,6 +125,9 @@ class ksqldb {
118
125
req . setEncoding ( "utf8" ) ;
119
126
120
127
req . on ( "data" , ( chunk ) => {
128
+ // check for chunk containing errors
129
+ if ( JSON . parse ( chunk ) [ '@type' ] ?. includes ( 'error' ) ) throw new ksqlDBError ( JSON . parse ( chunk ) ) ;
130
+ // continue if chunk indicates a healthy response
121
131
if ( ! sentQueryId ) {
122
132
sentQueryId = true ;
123
133
cb ( chunk ) ;
@@ -145,6 +155,8 @@ class ksqldb {
145
155
* if the termination was successful.
146
156
*/
147
157
terminate ( queryId ) {
158
+ validateInputs ( [ queryId , 'string' , 'queryId' ] ) ;
159
+
148
160
return axios . post ( this . ksqldbURL + '/ksql' ,
149
161
{
150
162
ksql : `TERMINATE ${ queryId } ;`
@@ -161,8 +173,7 @@ class ksqldb {
161
173
} )
162
174
. then ( res => res . data [ 0 ] )
163
175
. catch ( error => {
164
- console . error ( error ) ;
165
- throw new ksqlDBError ( error ) ;
176
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
166
177
} ) ;
167
178
}
168
179
@@ -177,9 +188,13 @@ class ksqldb {
177
188
* @return {Promise } a promise that completes once the server response is received, and returns the requested data.
178
189
*/
179
190
ksql ( query ) {
191
+ validateInputs ( [ query , 'string' , 'query' ] ) ;
192
+
193
+ const validatedQuery = builder . build ( query ) ;
194
+
180
195
return axios . post ( this . ksqldbURL + '/ksql' ,
181
196
{
182
- ksql : query
197
+ ksql : validatedQuery
183
198
} ,
184
199
{
185
200
headers :
@@ -193,8 +208,7 @@ class ksqldb {
193
208
} )
194
209
. then ( res => res . data [ 0 ] )
195
210
. catch ( error => {
196
- console . error ( error ) ;
197
- throw new ksqlDBError ( error ) ;
211
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
198
212
} ) ;
199
213
}
200
214
@@ -214,9 +228,8 @@ class ksqldb {
214
228
* @return {Promise } a promise that completes once the server response is received, and returns a response object.
215
229
*/
216
230
createStream ( name , columnsType , topic , value_format = 'json' , partitions = 1 , key ) {
217
- if ( typeof name !== 'string' || typeof columnsType !== 'object' || typeof topic !== 'string' || typeof partitions !== 'number' ) {
218
- return console . log ( "invalid input(s)" )
219
- }
231
+ validateInputs ( [ name , 'string' , 'name' , true ] , [ columnsType , 'array' , 'columnsType' , true ] , [ topic , 'string' , 'topic' ] , [ partitions , 'number' , 'partitions' ] ) ;
232
+
220
233
const columnsTypeString = columnsType . reduce ( ( result , currentType ) => result + ', ' + currentType ) ;
221
234
const query = `CREATE STREAM ${ name } (${ columnsTypeString } ) WITH (kafka_topic='${ topic } ', value_format='${ value_format } ', partitions=${ partitions } );` ;
222
235
@@ -230,10 +243,8 @@ class ksqldb {
230
243
{ } ,
231
244
httpsAgent : this . httpsAgentAxios ? this . httpsAgentAxios : null ,
232
245
} )
233
- . then ( res => res )
234
246
. catch ( error => {
235
- console . error ( error ) ;
236
- throw new ksqlDBError ( error ) ;
247
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
237
248
} ) ;
238
249
}
239
250
@@ -248,6 +259,8 @@ class ksqldb {
248
259
* @returns {Promise } - a promise that completes once the server response is received, and returns a query ID
249
260
*/
250
261
createStreamAs = ( streamName , selectColumns , sourceStream , propertiesObj , conditions , partitionBy ) => {
262
+ validateInputs ( [ streamName , 'string' , 'streamName' , true ] , [ selectColumns , 'array' , 'selectColumns' , true ] , [ sourceStream , 'string' , 'sourceStream' , true ] , [ propertiesObj , 'object' , 'propertiesObj' ] , [ conditions , 'string' , 'conditions' ] , [ partitionBy , 'string' , 'partitionBy' ] ) ;
263
+
251
264
const propertiesArgs = [ ] ;
252
265
const selectColStr = selectColumns . reduce ( ( result , current ) => result + ', ' + current ) ;
253
266
// begin with first consistent portion of query
@@ -286,7 +299,9 @@ class ksqldb {
286
299
httpsAgent : this . httpsAgentAxios ? this . httpsAgentAxios : null ,
287
300
} )
288
301
. then ( res => res . data [ 0 ] . commandStatus . queryId )
289
- . catch ( error => console . log ( error ) ) ;
302
+ . catch ( error => {
303
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
304
+ } ) ;
290
305
}
291
306
292
307
//---------------------Create tables-----------------
@@ -305,6 +320,8 @@ class ksqldb {
305
320
* @return {Promise } a promise that completes once the server response is received, and returns a response object.
306
321
*/
307
322
createTable = ( name , columnsType , topic , value_format = 'json' , partitions ) => {
323
+ validateInputs ( [ name , 'string' , 'name' , true ] , [ columnsType , 'array' , 'columnsType' , true ] , [ topic , 'string' , 'topic' , true ] , [ value_format , 'string' , 'value_format' , true ] , [ partitions , 'number' , 'partitions' ] ) ;
324
+
308
325
const columnsTypeString = columnsType . reduce ( ( result , currentType ) => result + ', ' + currentType ) ;
309
326
const query = `CREATE TABLE ${ name } (${ columnsTypeString } ) WITH (kafka_topic='${ topic } ', value_format='${ value_format } ', partitions=${ partitions } );`
310
327
@@ -323,8 +340,7 @@ class ksqldb {
323
340
httpsAgent : this . httpsAgentAxios ? this . httpsAgentAxios : null ,
324
341
} )
325
342
. catch ( error => {
326
- console . error ( error ) ;
327
- throw new ksqlDBError ( error ) ;
343
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
328
344
} ) ;
329
345
}
330
346
@@ -344,6 +360,8 @@ class ksqldb {
344
360
* @returns {Promise } a promise that completes once the server response is received, returning a response object
345
361
*/
346
362
createTableAs = ( tableName , source , selectArray , propertiesObj , conditionsObj ) => {
363
+ validateInputs ( [ tableName , 'string' , 'tableName' , true ] , [ source , 'string' , 'source' , true ] , [ selectArray , 'array' , 'selectArray' , true ] , [ propertiesObj , 'object' , 'propertiesObj' ] , [ conditionsObj , 'object' , 'conditionsObj' ] ) ;
364
+
347
365
let selectColStr = selectArray . reduce ( ( result , current ) => result + ', ' + current ) ;
348
366
349
367
// expect user to input properties object of format {topic: ... , value_format: ..., partitions: ...}
@@ -417,6 +435,8 @@ class ksqldb {
417
435
*/
418
436
//---------------------Insert Rows Into Existing Streams-----------------
419
437
insertStream = ( stream , rows ) => {
438
+ validateInputs ( [ stream , 'string' , 'stream' , true ] , [ rows , 'array' , 'rows' , true ] ) ;
439
+
420
440
return new Promise ( ( resolve , reject ) => {
421
441
const msgOutput = [ ] ;
422
442
@@ -452,6 +472,9 @@ class ksqldb {
452
472
req . setEncoding ( "utf8" ) ;
453
473
454
474
req . on ( "data" , ( chunk ) => {
475
+ // check for chunk containing errors
476
+ if ( JSON . parse ( chunk ) [ '@type' ] ?. includes ( 'error' ) ) throw new ksqlDBError ( JSON . parse ( chunk ) ) ;
477
+ // continue if chunk indicates a healthy response
455
478
msgOutput . push ( JSON . parse ( chunk ) ) ;
456
479
} ) ;
457
480
@@ -480,13 +503,8 @@ class ksqldb {
480
503
* the end of the array that includes the time that the data was inserted into the ksqldb.
481
504
*/
482
505
pullFromTo = async ( streamName , timezone = 'Greenwich' , from = [ undefined , '00' , '00' , '00' ] , to = [ '2200-03-14' , '00' , '00' , '00' ] ) => {
483
- if ( ! streamName || typeof timezone !== 'string' || ! from
484
- || typeof from [ 0 ] !== 'string' || typeof from [ 1 ] !== 'string' || typeof from [ 2 ] !== 'string' || typeof from [ 3 ] !== 'string'
485
- || typeof to [ 0 ] !== 'string' || typeof to [ 1 ] !== 'string' || typeof to [ 2 ] !== 'string' || typeof to [ 3 ] !== 'string'
486
- || from [ 0 ] . length !== 10 || to [ 0 ] . length !== 10 || from [ 1 ] . length !== 2 || to [ 1 ] . length !== 2 || from [ 2 ] . length !== 2 || to [ 2 ] . length !== 2 || from [ 3 ] . length !== 2 || to [ 3 ] . length !== 2
487
- ) {
488
- return new Error ( 'invalid inputs' ) ;
489
- }
506
+ validateInputs ( [ streamName , 'string' , 'streamName' , true ] , [ timezone , 'string' , 'timezone' , true ] , [ from , 'array' , 'from' , true ] , [ to , 'array' , 'to' , true ] ) ;
507
+
490
508
const userFrom = `${ from [ 0 ] } T${ from [ 1 ] } :${ from [ 2 ] } :${ from [ 3 ] } ` ;
491
509
const userTo = `${ to [ 0 ] } T${ to [ 1 ] } :${ to [ 2 ] } :${ to [ 3 ] } ` ;
492
510
const userFromUnix = new Date ( userFrom ) . getTime ( ) ;
@@ -518,11 +536,12 @@ class ksqldb {
518
536
* message (string): Detailed message regarding the status of the execution statement.
519
537
*/
520
538
inspectQueryStatus ( commandId ) {
539
+ validateInputs ( [ commandId , 'string' , 'commandId' , true ] ) ;
540
+
521
541
return axios . get ( this . ksqldbURL + `/status/${ commandId } ` )
522
542
. then ( response => response )
523
543
. catch ( error => {
524
- console . error ( error ) ;
525
- throw new ksqlDBError ( error ) ;
544
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
526
545
} ) ;
527
546
}
528
547
@@ -539,8 +558,7 @@ class ksqldb {
539
558
return axios . get ( this . ksqldbURL + `/info` )
540
559
. then ( response => response )
541
560
. catch ( error => {
542
- console . error ( error ) ;
543
- throw new ksqlDBError ( error ) ;
561
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
544
562
} ) ;
545
563
}
546
564
@@ -557,8 +575,7 @@ class ksqldb {
557
575
return axios . get ( this . ksqldbURL + `/healthcheck` )
558
576
. then ( response => response )
559
577
. catch ( error => {
560
- console . error ( error ) ;
561
- throw new ksqlDBError ( error ) ;
578
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
562
579
} ) ;
563
580
}
564
581
@@ -576,8 +593,7 @@ class ksqldb {
576
593
return axios . get ( this . ksqldbURL + `/clusterStatus` )
577
594
. then ( response => response )
578
595
. catch ( error => {
579
- console . error ( error ) ;
580
- throw new ksqlDBError ( error ) ;
596
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
581
597
} ) ;
582
598
}
583
599
@@ -592,6 +608,8 @@ class ksqldb {
592
608
* @return {Promise } this method returns a promise that returns a response object.
593
609
*/
594
610
terminateCluster ( topicsToDelete = [ ] ) {
611
+ validateInputs ( [ topicsToDelete , 'array' , 'topicsToDelete' , true ] ) ;
612
+
595
613
return axios . post ( this . ksqldbURL + `/ksql/terminate` , {
596
614
"deleteTopicList" : topicsToDelete
597
615
} , {
@@ -604,8 +622,7 @@ class ksqldb {
604
622
} )
605
623
. then ( response => response )
606
624
. catch ( error => {
607
- console . error ( error ) ;
608
- throw new ksqlDBError ( error ) ;
625
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
609
626
} ) ;
610
627
}
611
628
@@ -623,14 +640,16 @@ class ksqldb {
623
640
* "message": "One or more properties overrides set locally are prohibited by the KSQL server (use UNSET to reset their default value): [ksql.service.id]"
624
641
* }
625
642
*
643
+ * @param {string } propertyName - the name of the property to validate
626
644
* @return {Promise } this method returns a promise that resolves to a boolean true if the property is allowed to be changed.
627
645
*/
628
646
isValidProperty ( propertyName ) {
647
+ validateInputs ( [ propertyName , 'string' , 'propertyName' , true ] ) ;
648
+
629
649
return axios . get ( this . ksqldbURL + `/is_valid_property/${ propertyName } ` )
630
650
. then ( response => response )
631
651
. catch ( error => {
632
- console . error ( error ) ;
633
- throw new ksqlDBError ( error ) ;
652
+ throw error . response ?. data [ '@type' ] ? new ksqlDBError ( error . response . data ) : error ;
634
653
} ) ;
635
654
}
636
655
} ;
0 commit comments