Skip to content

Commit 759c0da

Browse files
committed
update createtableas method to allow for string in a where clause
1 parent 16f69e0 commit 759c0da

File tree

1 file changed

+11
-12
lines changed

1 file changed

+11
-12
lines changed

ksqljs/ksqldb/ksqldb.js

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class ksqldb {
5252
validateInputs([query, 'string', 'query']);
5353

5454
const validatedQuery = builder.build(query);
55-
55+
5656
return axios
5757
.post(this.ksqldbURL + "/query-stream",
5858
{
@@ -70,7 +70,7 @@ class ksqldb {
7070
})
7171
.then((res) => res.data)
7272
.catch((error) => {
73-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
73+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
7474
});
7575
}
7676

@@ -173,7 +173,7 @@ class ksqldb {
173173
})
174174
.then(res => res.data[0])
175175
.catch(error => {
176-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
176+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
177177
});
178178
}
179179

@@ -191,7 +191,7 @@ class ksqldb {
191191
validateInputs([query, 'string', 'query']);
192192

193193
const validatedQuery = builder.build(query);
194-
194+
195195
return axios.post(this.ksqldbURL + '/ksql',
196196
{
197197
ksql: validatedQuery
@@ -208,7 +208,7 @@ class ksqldb {
208208
})
209209
.then(res => res.data[0])
210210
.catch(error => {
211-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
211+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
212212
});
213213
}
214214

@@ -244,7 +244,7 @@ class ksqldb {
244244
httpsAgent: this.httpsAgentAxios ? this.httpsAgentAxios : null,
245245
})
246246
.catch(error => {
247-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
247+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
248248
});
249249
}
250250

@@ -300,7 +300,7 @@ class ksqldb {
300300
})
301301
.then(res => res.data[0].commandStatus.queryId)
302302
.catch(error => {
303-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
303+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
304304
});
305305
}
306306

@@ -340,7 +340,7 @@ class ksqldb {
340340
httpsAgent: this.httpsAgentAxios ? this.httpsAgentAxios : null,
341341
})
342342
.catch(error => {
343-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
343+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
344344
});
345345
}
346346

@@ -397,18 +397,17 @@ class ksqldb {
397397
i += 2;
398398
conditionsArr.shift()
399399
}
400-
conditionQuery = builder.build('??????', sqlClauses[0], sqlClauses[1], sqlClauses[2], sqlClauses[3], sqlClauses[4], sqlClauses[5]);
400+
conditionQuery = builder.build(`${sqlClauses[0][0]}${sqlClauses[1][0]}????`, sqlClauses[2], sqlClauses[3], sqlClauses[4], sqlClauses[5]);
401401
}
402402

403-
404403
// reformat for builder
405404
tableName = [tableName];
406405
selectColStr = [selectColStr];
407406
source = [source];
408407
conditionQuery = [conditionQuery]
409408

410409

411-
const query = builder.build(`CREATE TABLE ? WITH (kafka_topic=?, value_format=?, partitions=?) AS SELECT ? FROM ? ?EMIT CHANGES;`, tableName, defaultProps.topic, defaultProps.value_format, defaultProps.partitions, selectColStr, source, conditionQuery)
410+
const query = builder.build(`CREATE TABLE ? WITH (kafka_topic=?, value_format=?, partitions=?) AS SELECT ? FROM ? ${conditionQuery} EMIT CHANGES;`, tableName, defaultProps.topic, defaultProps.value_format, defaultProps.partitions, selectColStr, source)
412411
return axios.post(this.ksqldbURL + '/ksql', { ksql: query }, {
413412
headers:
414413
this.API && this.secret ?
@@ -541,7 +540,7 @@ class ksqldb {
541540
return axios.get(this.ksqldbURL + `/status/${commandId}`)
542541
.then(response => response)
543542
.catch(error => {
544-
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
543+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
545544
});
546545
}
547546

0 commit comments

Comments
 (0)