Skip to content

Commit 4b42a7f

Browse files
authored
Merge pull request #21 from oslabs-beta/ms/createStreamAs
Ms/create stream as
2 parents cebacee + a0d3747 commit 4b42a7f

File tree

4 files changed

+5443
-59
lines changed

4 files changed

+5443
-59
lines changed

__tests__/integrationtests.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ describe('--Integration Tests--', () => {
5353
const response = await client.insertStream('TESTJESTSTREAM', [
5454
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
5555
]);
56-
console.log(response);
5756
const data = [];
5857
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
5958
data.push(JSON.parse(chunk));

ksqljs/ksqlJS.js

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ class ksqljs {
155155
* @return {Promise} a promise that completes once the server response is received, and returns a response object.
156156
*/
157157
createStream(name, columnsType, topic, value_format = 'json', partitions = 1, key) {
158-
// console.log(this.ksqldbURL);
159158
if (typeof name !== 'string' || typeof columnsType !== 'object' || typeof topic !== 'string' || typeof partitions !== 'number') {
160159
return console.log("invalid input(s)")
161160
}
@@ -170,6 +169,50 @@ class ksqljs {
170169
});
171170
}
172171

172+
/**
173+
*
174+
* @param {string} streamName
175+
* @param {string[]} selectColumns
176+
* @param {string} sourceStream
177+
* @param {object} propertiesObj
178+
* @param {string} conditions
179+
* @param {string} partitionBy
180+
* @returns {Promise}
181+
*/
182+
createStreamAs = (streamName, selectColumns, sourceStream, propertiesObj, conditions, partitionBy) => {
183+
const propertiesArgs = [];
184+
const selectColStr = selectColumns.reduce((result, current) => result + ', ' + current);
185+
// begin with first consistent portion of query
186+
let builderQuery = 'CREATE STREAM ? ';
187+
188+
// include properties in query if provided
189+
if(Object.keys(propertiesObj).length > 0) {
190+
builderQuery += 'WITH (';
191+
for (const [key, value] of Object.entries(propertiesObj)) {
192+
const justStarted = builderQuery[builderQuery.length - 1] === '(';
193+
194+
if (!justStarted) builderQuery += ', ';
195+
builderQuery += '? = ?';
196+
propertiesArgs.push([key], value);
197+
};
198+
builderQuery += ') ';
199+
}
200+
201+
// continue building the query to be sent to the builder
202+
builderQuery += `AS SELECT ${selectColStr} FROM ? `;
203+
if (conditions.indexOf(';') === -1) builderQuery += `WHERE ${conditions} `;
204+
builderQuery += partitionBy || '';
205+
builderQuery += 'EMIT CHANGES;'
206+
207+
// utilize query with variables to build actual query
208+
const query = builder.build(builderQuery, [streamName], ...propertiesArgs, [sourceStream]);
209+
210+
return axios.post(this.ksqldbURL + '/ksql', { ksql: query })
211+
.then(res => res.data[0].commandStatus.queryId)
212+
.catch(error => console.log(error));
213+
}
214+
215+
//---------------------Create tables-----------------
173216
/**
174217
* Executes a query to create a table.
175218
*

ksqljsTest.js

Lines changed: 32 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,41 +17,19 @@ let metadata;
1717
console.log('this is the result', result);
1818
}
1919
20-
pullTest();
20+
pullTest(); */
2121

2222
//---------------------Test Push Queries-------------------
23-
const pushTest = async () => {
24-
// metadata = await client.push('SELECT * FROM riderlocations EMIT CHANGES LIMIT 1;', (row) => console.log(row));
25-
// console.log('this is the metadata returned ', metadata);
26-
let pushActive = false;
27-
await client.createStream('TESTJESTSTREAM', ['age INTEGER'], 'testJestTopic', 'json', 1);
28-
// await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 1;', (data) => {
29-
// console.log(data);
30-
// console.log('HERE IS DATA ', JSON.parse(data).queryId)
31-
// if(JSON.parse(data).queryId){
32-
// pushActive = true;
33-
// }
34-
// client.ksql(`TERMINATE ${JSON.parse(data).queryId};`)
35-
// client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
36-
// });
37-
// await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
38-
39-
/* const pushTest = async () => {
23+
/* const pushTest = async () => {
4024
try {
4125
metadata = await client.push('SELECT * FROM riderlocations EMIT CHANGES LIMIT 1;', (row) => console.log(row));
4226
console.log('this is the metadata returned ', metadata);
4327
} catch (error) {
4428
console.log(error);
4529
}
46-
47-
metadata = await client.push('SELECT * FROM riderlocations EMIT CHANGES LIMIT 1;', (row) => console.log(row));
48-
console.log('this is the metadata returned ', metadata);
49-
50-
metadata = await client.push('SELECT * FROM riderlocations EMIT CHANGES LIMIT 1;', (row) => console.log(row));
51-
console.log('this is the metadata returned ', metadata);
5230
};
5331
54-
pushTest();
32+
pushTest(); */
5533

5634
//---------------------Test Termination of Queries-------------------
5735
/* const terminateTest = async () => {
@@ -71,11 +49,11 @@ setTimeout(() => terminateTest(metadata), 2000); */
7149
listQueries(); */
7250

7351
//---------------------Test Stream Creation-------------------
74-
// const createStreamTest = () => {
75-
// client.createStream('TestStream', ['name VARCHAR', 'email varchar', 'age INTEGER'], 'testTopic', 'json', 1);
76-
// }
52+
/* const createStreamTest = () => {
53+
client.createStream('TestStream', ['name VARCHAR', 'email varchar', 'age INTEGER'], 'testTopic', 'json', 1);
54+
}
7755
78-
// createStreamTest();
56+
createStreamTest(); */
7957

8058
//---------------------Test Table Creation-------------------
8159
/* const createTableTest = () => {
@@ -85,45 +63,42 @@ listQueries(); */
8563
createTableTest(); */
8664

8765
//---------------------Test Insert Stream-------------------
88-
/* const insertStreamTest = () => {
89-
client.insertStream('TestStream', [
90-
{ "name": "matt", "email": "123@mail.com", "age": 1000 },
91-
{ "name": "jonathan", "email": "234@mail.com", "age": 99 }
92-
]);
93-
/* const insertStreamTest = async () => {
66+
/* const insertStreamTest = async () => {
9467
const test = await client.insertStream('TestStream', [
9568
{ "name": "matt", "email": "123@mail.com", "age": 1000 },
9669
{ "name": "jonathan", "email": "234@mail.com", "age": 99 }
9770
]);
9871
console.log('returned array: ', test);
99-
*/
100-
101-
const insertStreamTest = async () => {
102-
// const test = await client.insertStream('TestStream', [
103-
// { "name": "Scrooge", "email": "mcduck@mail.com", "age": 59 },
104-
// { "name": "jonathan", "email": "234@mail.com", "age": 99 }
105-
// ]);
106-
// console.log('returned array: ', test);
107-
};
108-
109-
// insertStreamTest();
72+
};
73+
insertStreamTest(); */
11074

111-
const pullFromToTest = async () => {
75+
/* const pullFromToTest = async () => {
11276
const data = await client.pullFromTo('TESTSTREAM', 'America/Los_Angeles', ['2022-05-18', '00', '00', '00'], ['2022-05-20', '00', '00', '00']);
11377
// console.log(data);
11478
}
11579
116-
pullFromTo();
117-
// const insertStreamTest = async () => {
118-
// const test = await client.insertStream('TestStream', [
119-
// { "name": "matt", "email": "123@mail.com", "age": 1000 },
120-
// { "name": "jonathan", "email": "234@mail.com", "age": 99 }
121-
// ]);
122-
// // console.log('returned array: ', test);
123-
// };
80+
pullFromTo(); */
81+
82+
//---------------------Test Stream Creation As-------------------
83+
84+
const createStreamAsTest = async () => {
85+
const queryId = await client.createStreamAs('TestAsStream', ['latitude', 'longitude'], 'riderLocations', {
86+
kafka_topic: 'TestAsStream',
87+
value_format: 'json',
88+
partitions: 1
89+
},
90+
'latitude > 37');
91+
console.log('this is the queryId: ', queryId);
92+
};
12493

125-
// insertStreamTest();
94+
createStreamAsTest();
12695

96+
// (streamName, selectColumns, sourceStream, propertiesObj, conditionsObj)
97+
// const defaultProps = {
98+
// kafka_topic: streamName,
99+
// value_format: 'json',
100+
// partitions: 1
101+
// };
127102

128103
//---------------------Test Inspect query status -------------------
129104
// const inspectQueryStatusTest = async () => {
@@ -208,4 +183,4 @@ pullFromTo();
208183
// // should return true
209184
// };
210185

211-
// isValidPropertyTest();
186+
// isValidPropertyTest();

0 commit comments

Comments
 (0)