Skip to content

Commit afe3066

Browse files
Integrated custom ksqlDB error
1 parent b1392a8 commit afe3066

File tree

4 files changed

+128
-187
lines changed

4 files changed

+128
-187
lines changed

.gitignore

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ dist
103103
# TernJS port file
104104
.tern-port
105105

106-
<<<<<<< HEAD
107106
# Local
108107
ksqljsTest.js
109108
package-lock.json
@@ -113,7 +112,3 @@ local_ignore/
113112
# KSQLDB docker server settings
114113
ksqldb_server_config/
115114

116-
=======
117-
# Local files
118-
./ksqljs.js
119-
>>>>>>> jonTest

ksqljs/customErrors.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ class ksqlDBError extends Error {
1010

1111
// Ensure the name of this error is the same as the class name
1212
this.name = this.constructor.name
13-
13+
1414
// capturing the stack trace keeps the reference to your error class
1515
Error.captureStackTrace(this, this.constructor);
16-
16+
1717
// you may also assign additional properties to your error
1818
//this.status = 404
19+
Object.keys(error).forEach(property => {this[property] = error[property]});
1920
}
2021
}
2122

ksqljs/ksqlJS.js

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ class ksqljs {
5151
})
5252
.then((res) => res.data)
5353
.catch((error) => {
54-
console.error(error);
55-
throw new ksqlDBError(error);
54+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
5655
});
5756
}
5857

@@ -104,6 +103,9 @@ class ksqljs {
104103
req.setEncoding("utf8");
105104

106105
req.on("data", (chunk) => {
106+
// check for chunk containing errors
107+
if (JSON.parse(chunk)['@type']?.includes('error')) throw new ksqlDBError(JSON.parse(chunk));
108+
// continue if chunk indicates a healthy response
107109
if (!sentQueryId) {
108110
sentQueryId = true;
109111
cb(chunk);
@@ -147,8 +149,7 @@ class ksqljs {
147149
})
148150
.then(res => res.data[0])
149151
.catch(error => {
150-
console.error(error);
151-
throw new ksqlDBError(error);
152+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
152153
});
153154
}
154155

@@ -179,8 +180,7 @@ class ksqljs {
179180
})
180181
.then(res => res.data[0])
181182
.catch(error => {
182-
console.error(error);
183-
throw new ksqlDBError(error);
183+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
184184
});
185185
}
186186

@@ -218,8 +218,7 @@ class ksqljs {
218218
})
219219
.then(res => res)
220220
.catch(error => {
221-
console.error(error);
222-
throw new ksqlDBError(error);
221+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
223222
});
224223
}
225224

@@ -263,7 +262,9 @@ class ksqljs {
263262

264263
return axios.post(this.ksqldbURL + '/ksql', { ksql: query })
265264
.then(res => res.data[0].commandStatus.queryId)
266-
.catch(error => console.log(error));
265+
.catch(error => {
266+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
267+
});
267268
}
268269

269270
//---------------------Create tables-----------------
@@ -300,8 +301,7 @@ class ksqljs {
300301
httpsAgent: this.httpsAgentAxios ? this.httpsAgentAxios : null,
301302
})
302303
.catch(error => {
303-
console.error(error);
304-
throw new ksqlDBError(error);
304+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
305305
});
306306
}
307307

@@ -369,7 +369,9 @@ class ksqljs {
369369

370370
const query = builder.build(`CREATE TABLE ? WITH (kafka_topic=?, value_format=?, partitions=?) AS SELECT ? FROM ? ?EMIT CHANGES;`, tableName, defaultProps.topic, defaultProps.value_format, defaultProps.partitions, selectColStr, source, conditionQuery)
371371
return axios.post(this.ksqldbURL + '/ksql', { ksql: query })
372-
.catch(error => console.log(error));
372+
.catch(error =>{
373+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
374+
});
373375
}
374376

375377
/**
@@ -420,6 +422,9 @@ class ksqljs {
420422
req.setEncoding("utf8");
421423

422424
req.on("data", (chunk) => {
425+
// check for chunk containing errors
426+
if (JSON.parse(chunk)['@type']?.includes('error')) throw new ksqlDBError(JSON.parse(chunk));
427+
// continue if chunk indicates a healthy response
423428
msgOutput.push(JSON.parse(chunk));
424429
});
425430

@@ -489,8 +494,7 @@ class ksqljs {
489494
return axios.get(this.ksqldbURL + `/status/${commandId}`)
490495
.then(response => response)
491496
.catch(error => {
492-
console.error(error);
493-
throw new ksqlDBError(error);
497+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
494498
});
495499
}
496500

@@ -507,8 +511,7 @@ class ksqljs {
507511
return axios.get(this.ksqldbURL + `/info`)
508512
.then(response => response)
509513
.catch(error => {
510-
console.error(error);
511-
throw new ksqlDBError(error);
514+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
512515
});
513516
}
514517

@@ -525,8 +528,7 @@ class ksqljs {
525528
return axios.get(this.ksqldbURL + `/healthcheck`)
526529
.then(response => response)
527530
.catch(error => {
528-
console.error(error);
529-
throw new ksqlDBError(error);
531+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
530532
});
531533
}
532534

@@ -544,8 +546,7 @@ class ksqljs {
544546
return axios.get(this.ksqldbURL + `/clusterStatus`)
545547
.then(response => response)
546548
.catch(error => {
547-
console.error(error);
548-
throw new ksqlDBError(error);
549+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
549550
});
550551
}
551552

@@ -572,8 +573,7 @@ class ksqljs {
572573
})
573574
.then(response => response)
574575
.catch(error => {
575-
console.error(error);
576-
throw new ksqlDBError(error);
576+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
577577
});
578578
}
579579

@@ -597,8 +597,7 @@ class ksqljs {
597597
return axios.get(this.ksqldbURL + `/is_valid_property/${propertyName}`)
598598
.then(response => response)
599599
.catch(error => {
600-
console.error(error);
601-
throw new ksqlDBError(error);
600+
throw error.response?.data['@type'] ? new ksqlDBError(error.response.data) : error;
602601
});
603602
}
604603
};

0 commit comments

Comments
 (0)