Skip to content

Commit e824520

Browse files
Initial Commit that contains about half of the client functionality
0 parents  commit e824520

File tree

263 files changed

+88385
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

263 files changed

+88385
-0
lines changed

docker-compose.yml

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
---
2+
version: "2"
3+
4+
services:
5+
zookeeper:
6+
image: confluentinc/cp-zookeeper:7.0.1
7+
hostname: zookeeper
8+
container_name: zookeeper
9+
ports:
10+
- "2181:2181"
11+
environment:
12+
ZOOKEEPER_CLIENT_PORT: 2181
13+
ZOOKEEPER_TICK_TIME: 2000
14+
15+
broker:
16+
image: confluentinc/cp-kafka:7.0.1
17+
hostname: broker
18+
container_name: broker
19+
depends_on:
20+
- zookeeper
21+
ports:
22+
- "29092:29092"
23+
environment:
24+
KAFKA_BROKER_ID: 1
25+
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
26+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
27+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
28+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
29+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
30+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
31+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
32+
33+
ksqldb-server:
34+
image: confluentinc/ksqldb-server:0.25.1
35+
hostname: ksqldb-server
36+
container_name: ksqldb-server
37+
depends_on:
38+
- broker
39+
ports:
40+
- "8088:8088"
41+
environment:
42+
KSQL_LISTENERS: http://0.0.0.0:8088
43+
KSQL_BOOTSTRAP_SERVERS: broker:9092
44+
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
45+
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
46+
47+
ksqldb-cli:
48+
image: confluentinc/ksqldb-cli:0.25.1
49+
container_name: ksqldb-cli
50+
depends_on:
51+
- broker
52+
- ksqldb-server
53+
entrypoint: /bin/sh
54+
tty: true

ksqljs/ksqlJS.js

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
const axios = require("axios");
2+
const http2 = require("http2");
3+
4+
const ksqljs = (ksqldbURL) => {
5+
return {
6+
//---------------------Pull queries (fetch a single batch of existing rows)-----------------
7+
pull: (query) => {
8+
return axios
9+
.post(ksqldbURL + "/query-stream", {
10+
sql: query,
11+
})
12+
.then((res) => res.data)
13+
.catch((error) => console.log(error));
14+
},
15+
//---------------------Push queries (continue to receive updates to stream-----------------
16+
push: (query, cb) => {
17+
let sentQueryId = false;
18+
let queryMetadata;
19+
const session = http2.connect(ksqldbURL);
20+
21+
session.on("error", (err) => console.error(err));
22+
23+
const req = session.request({
24+
":path": "/query-stream",
25+
":method": "POST",
26+
});
27+
28+
const reqBody = {
29+
sql: query,
30+
Accept: "application/json, application/vnd.ksqlapi.delimited.v1",
31+
};
32+
33+
req.write(JSON.stringify(reqBody), "utf8");
34+
req.end();
35+
req.setEncoding("utf8");
36+
37+
req.on("data", (chunk) => {
38+
if (!sentQueryId) {
39+
sentQueryId = true;
40+
queryMetadata = chunk;
41+
}
42+
cb(chunk);
43+
});
44+
45+
req.on("end", () => session.close());
46+
47+
return new Promise((resolve, reject) => {
48+
setTimeout(() => resolve(JSON.parse(queryMetadata)?.queryId), 1000);
49+
})
50+
},
51+
//---------------------Terminate existing push queries-----------------
52+
terminate: (queryId) => {
53+
return axios.post(ksqldbURL + '/close-query', {queryId: queryId})
54+
.then(res => res)
55+
.catch(error => console.log(error));
56+
},
57+
//---------------------List existing streams, tables, topics, and queries-----------------
58+
ksql: (query) => {
59+
return axios.post(ksqldbURL + '/ksql', {ksql: query})
60+
.then(res => res.data[0])
61+
.catch(error => console.log(error));
62+
},
63+
//---------------------Create streams-----------------
64+
createStream: (name, columnsType, topic, value_format = 'json', partitions=1, key) => {
65+
const columnsTypeString = columnsType.reduce((result, currentType) => result + ', ' + currentType);
66+
const query = `CREATE STREAM ${name} (${columnsTypeString}) WITH (kafka_topic='${topic}', value_format='${value_format}', partitions=${partitions});`;
67+
68+
axios.post(ksqldbURL + '/ksql', {ksql: query})
69+
.catch(error => console.log(error));
70+
},
71+
//---------------------Create tables-----------------
72+
createTable: (name, columnsType, topic, value_format = 'json') => {
73+
74+
}
75+
};
76+
};
77+
78+
module.exports = ksqljs;
79+

ksqljsTest.js

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
const ksqljs = require('./ksqlJS/ksqlJS.js');
2+
3+
const client = ksqljs('http://localhost:8088');
4+
let metadata;
5+
6+
//---------------------Test PUll Queries-------------------
7+
/* const pullTest = async () => {
8+
const result = await client.pull('SELECT * FROM usersStream;');
9+
console.log('this is the result', result);
10+
}
11+
12+
pullTest(); */
13+
14+
//---------------------Test Push Queries-------------------
15+
/* const pushTest = async () => {
16+
metadata = await client.push('SELECT * FROM usersStream EMIT CHANGES LIMIT 1;', (row) => console.log(row));
17+
console.log('this is the metadata returned ', metadata);
18+
};
19+
20+
pushTest(); */
21+
22+
//---------------------Test Termination of Queries-------------------
23+
/* const terminateTest = async () => {
24+
client.terminate(metadata);
25+
};
26+
27+
setTimeout(() => terminateTest(metadata), 2000); */
28+
29+
//---------------------Test List Queries-------------------
30+
/* const listQueries = async () => {
31+
console.log(await client.ksql('LIST QUERIES;'));
32+
console.log(await client.ksql('LIST STREAMS;'));
33+
console.log(await client.ksql('LIST TABLES;'));
34+
console.log(await client.ksql('LIST TOPICS;'));
35+
}
36+
37+
listQueries(); */
38+
39+
//---------------------Test Stream Creation-------------------
40+
/* const createStreamTest = () => {
41+
client.createStream('AnotherTestStream', ['name VARCHAR','email varchar','age INTEGER'], 'testTopic', 'json', 1);
42+
}
43+
44+
createStreamTest(); */
45+
46+
//---------------------Test Table Creation-------------------
47+
/* const createTableTest = () => {
48+
client.createTable();
49+
} */

node_modules/.package-lock.json

Lines changed: 185 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node_modules/asynckit/LICENSE

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)