Skip to content

Commit 8d0648c

Browse files
Jonathan LuuJonathan Luu
authored andcommitted
removed spacecheck from querybuilder, integration test for createtableas in progress
1 parent 7a82add commit 8d0648c

File tree

5 files changed

+91
-234
lines changed

5 files changed

+91
-234
lines changed

__tests__/integrationtests.js

Lines changed: 78 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -12,76 +12,76 @@ server using the command 'docker compose-up'. This will spin up a ksqlDB server
1212
// Once the ksqlDB server is running, tests can be run with terminal line: (npm test)
1313

1414
describe('--Integration Tests--', () => {
15-
/* describe('--Method Tests--', () => {
16-
beforeAll((done) => {
17-
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
18-
done();
19-
});
15+
// describe('--Method Tests--', () => {
16+
// beforeAll((done) => {
17+
// client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
18+
// done();
19+
// });
2020

21-
afterAll(async () => {
22-
await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
23-
})
21+
// afterAll(async () => {
22+
// await client.ksql('DROP STREAM IF EXISTS TESTJESTSTREAM DELETE TOPIC;');
23+
// })
2424

25-
it('.createStream properly creates a stream', async () => {
26-
const result = await client.createStream('TESTJESTSTREAM', ['name VARCHAR', 'email varchar', 'age INTEGER'], 'testJestTopic', 'json', 1);
27-
const streams = await client.ksql('LIST STREAMS;');
28-
const allStreams = streams.streams;
29-
let streamExists = false;
30-
for (let i = 0; i < allStreams.length; i++) {
31-
if (allStreams[i].name === "TESTJESTSTREAM") {
32-
streamExists = true;
33-
break;
34-
}
35-
}
36-
expect(streamExists).toEqual(true);
37-
})
25+
// it('.createStream properly creates a stream', async () => {
26+
// const result = await client.createStream('TESTJESTSTREAM', ['name VARCHAR', 'email varchar', 'age INTEGER'], 'testJestTopic', 'json', 1);
27+
// const streams = await client.ksql('LIST STREAMS;');
28+
// const allStreams = streams.streams;
29+
// let streamExists = false;
30+
// for (let i = 0; i < allStreams.length; i++) {
31+
// if (allStreams[i].name === "TESTJESTSTREAM") {
32+
// streamExists = true;
33+
// break;
34+
// }
35+
// }
36+
// expect(streamExists).toEqual(true);
37+
// })
3838

39-
it('.push properly creates a push query', () => {
40-
let pushActive = false;
41-
client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
42-
if (JSON.parse(data).queryId) {
43-
pushActive = true;
44-
}
45-
expect(pushActive).toEqual(true)
46-
});
47-
})
39+
// it('.push properly creates a push query', () => {
40+
// let pushActive = false;
41+
// client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
42+
// if (JSON.parse(data).queryId) {
43+
// pushActive = true;
44+
// }
45+
// expect(pushActive).toEqual(true)
46+
// });
47+
// })
4848

49-
it('.terminate properly terminates a push query', () => {
50-
client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 3;', async (data) => {
51-
const terminateRes = await client.terminate(JSON.parse(data).queryId);
52-
expect(terminateRes.wasTerminated).toEqual(true);
53-
})
54-
})
49+
// it('.terminate properly terminates a push query', () => {
50+
// client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES LIMIT 3;', async (data) => {
51+
// const terminateRes = await client.terminate(JSON.parse(data).queryId);
52+
// expect(terminateRes.wasTerminated).toEqual(true);
53+
// })
54+
// })
5555

56-
it('.insertStream properly inserts a row into a stream', async () => {
57-
const response = await client.insertStream('TESTJESTSTREAM', [
58-
{ "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
59-
]);
60-
console.log(response);
61-
const data = [];
62-
await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
63-
data.push(JSON.parse(chunk));
64-
if (data[1]) {
65-
client.terminate(data[0].queryId);
66-
expect(data[1]).toEqual(["stab-rabbit", "123@mail.com", 100])
67-
}
68-
});
69-
})
56+
// it('.insertStream properly inserts a row into a stream', async () => {
57+
// const response = await client.insertStream('TESTJESTSTREAM', [
58+
// { "name": "stab-rabbit", "email": "123@mail.com", "age": 100 }
59+
// ]);
60+
// console.log(response);
61+
// const data = [];
62+
// await client.push('SELECT * FROM TESTJESTSTREAM EMIT CHANGES;', async (chunk) => {
63+
// data.push(JSON.parse(chunk));
64+
// if (data[1]) {
65+
// client.terminate(data[0].queryId);
66+
// expect(data[1]).toEqual(["stab-rabbit", "123@mail.com", 100])
67+
// }
68+
// });
69+
// })
7070

71-
it('.pull receives the correct data from a pull query', async () => {
72-
const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
73-
console.log(pullData[1]);
74-
expect(pullData[1]).toEqual(["stab-rabbit", "123@mail.com", 100]);
75-
})
71+
// it('.pull receives the correct data from a pull query', async () => {
72+
// const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
73+
// console.log(pullData[1]);
74+
// expect(pullData[1]).toEqual(["stab-rabbit", "123@mail.com", 100]);
75+
// })
7676

77-
it('.pullFromTo receives all the data', async () => {
78-
const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
79-
const data = await client.pullFromTo('TESTJESTSTREAM', 'America/Los_Angeles', ['2022-01-01', '00', '00', '00']);
80-
const expectPullData = pullData[1];
81-
const expectData = data[0].slice(0, 3);
82-
expect(expectPullData).toEqual(expectData);
83-
})
84-
}) */
77+
// it('.pullFromTo receives all the data', async () => {
78+
// const pullData = await client.pull("SELECT * FROM TESTJESTSTREAM;");
79+
// const data = await client.pullFromTo('TESTJESTSTREAM', 'America/Los_Angeles', ['2022-01-01', '00', '00', '00']);
80+
// const expectPullData = pullData[1];
81+
// const expectData = data[0].slice(0, 3);
82+
// expect(expectPullData).toEqual(expectData);
83+
// })
84+
// })
8585

8686
describe('--Materialized Views Test--', () => {
8787
beforeAll( async () => {
@@ -106,24 +106,23 @@ describe('--Integration Tests--', () => {
106106
expect(tableCheck).toEqual(true);
107107

108108
})
109-
// it('materialized table view updates with source stream', async () => {
110-
// let rowCheck = false;
111-
// // push query for the table
112-
// await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
113-
// if (Array.isArray(JSON.parse(data))){
114-
// console.log(JSON.parse(data))
115-
// if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === "Seattle" && JSON.parse(data)[3] === 130){
116-
// rowCheck = true;
117-
// }
118-
// expect(rowCheck).toEqual(true);
119-
// }
120-
// })
121-
// await client.insertStream('TESTJESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
122-
123-
// })
109+
it('materialized table view updates with source stream', async () => {
110+
let rowCheck = false;
111+
// push query for the table
112+
await client.push('SELECT * FROM TABLEOFSTREAM EMIT CHANGES LIMIT 1;', async (data) => {
113+
if (Array.isArray(JSON.parse(data))){
114+
console.log('PARSED DATA HERE', JSON.parse(data))
115+
if (JSON.parse(data)[0] === "firstTester" && JSON.parse(data)[1] === 25 && JSON.parse(data)[2] === "Seattle" && JSON.parse(data)[3] === 130){
116+
rowCheck = true;
117+
}
118+
}
119+
})
120+
await client.insertStream('TESTJESTSTREAM', [{"NAME":"firstTester", "AGE":25, "LOCATION": "Seattle", "WEIGHT": 130}]);
121+
await waitFor(() => expect(rowCheck).toEqual(true));
122+
})
124123
})
125124

126-
/* describe('--Health Tests--', () => {
125+
/* describe('--Health Tests--', () => {
127126
beforeAll((done) => {
128127
client = new ksqljs({ ksqldbURL: 'http://localhost:8088' });
129128
done();

ksqljs/ksqlJS.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ class ksqljs {
317317
partitions: 1
318318
};
319319
Object.assign(defaultProps, propertiesObj);
320+
320321
// if there's no properties Obj, assign them all default values
321322

322323
// expect user to input a conditions object of format {WHERE: condition, GROUP_BY: condition, HAVING: condition};

ksqljs/queryBuilder.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,7 @@ class queryBuilder {
3333
return param;
3434
case "object":
3535
if (Array.isArray(param)) {
36-
//check if spaces
37-
if (param[0].includes(" ")) {
38-
throw new InappropriateStringParamError("string params not wrapped in quotes should not include spaces");
39-
}
40-
else if (param[0].includes(";")) {
36+
if (param[0].includes(";")) {
4137
throw new InappropriateStringParamError("string params not wrapped in quotes should not include semi-colons");
4238
}
4339
return `${param[0].replaceAll("'", "''")}`

ksqljsTest.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,19 @@ createTableTest(); */
8787
//---------------------Test Table Create As-------------------
8888
const createTableAsTest = async () => {
8989

90-
await client.ksql('CREATE STREAM RIDERLOCATIONS (PROFILEID VARCHAR, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (KAFKA_TOPIC=\'locations\', value_format=\'json\', partitions=1);')
91-
await client.createTableAs('currentlocation', 'riderlocations', ['profileid','LATEST_BY_OFFSET(latitude) AS la', 'LATEST_BY_OFFSET(longitude) AS lo'], {}, {GROUP_BY: 'profileId'})
90+
// await client.ksql('CREATE STREAM RIDERLOCATIONS (PROFILEID VARCHAR, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (KAFKA_TOPIC=\'locations\', value_format=\'json\', partitions=1);')
91+
// await client.createTableAs('currentlocation', 'riderlocations', ['profileid','LATEST_BY_OFFSET(latitude) AS la', 'LATEST_BY_OFFSET(longitude) AS lo'], {}, {GROUP_BY: 'profileId'})
9292
// let x;
93-
// await client.push('SELECT * FROM CURRENTLOCATION EMIT CHANGES LIMIT 1;', async (data) => {
94-
// console.log('push query data', JSON.parse(data))
93+
await client.push('SELECT * FROM CURRENTLOCATION EMIT CHANGES LIMIT 3;', async (data) => {
94+
console.log('push query data', JSON.parse(data))
95+
})
9596
// // x = await client.ksql('LIST QUERIES;');
9697
// // console.log('querylist', x.queries)
9798
// })
9899

99100
// console.log(await client.ksql('LIST QUERIES;'));
100101
// console.log('first x', x)
101-
await client.insertStream('RIDERLOCATIONS', [{'PROFILEID':'abc123abcddzzz', 'latitude':9999999,'longitude':999999}])
102+
// await client.insertStream('RIDERLOCATIONS', [{'PROFILEID':'abc123abcddzzz', 'latitude':9999999,'longitude':999999}])
102103
// console.log('second x', x)
103104
// console.log(await client.pull('SELECT * FROM TABLEOFSTREAM;'));
104105
}

0 commit comments

Comments
 (0)