Skip to content

Commit 4acd80b

Browse files
authored
Add a simple postgresql storage provider (turt2live#350)
* Add a simple postgresql storage provider * Fix queries * Add unit test * Run postgres containers concurrently * Log postgres for debugging workflow * Adjust timeouts * Upgrade dependencies? * Adjust timeouts again
1 parent cbbd9f8 commit 4acd80b

File tree

6 files changed

+1455
-845
lines changed

6 files changed

+1455
-845
lines changed

.github/workflows/static_analysis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,6 @@ jobs:
6363
- run: yarn install
6464
- uses: nick-invision/retry@v2
6565
with:
66-
max_attempts: 5
67-
timeout_minutes: 5
66+
max_attempts: 3
67+
timeout_minutes: 15
6868
command: yarn test

package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"docs": "jsdoc -c jsdoc.json -P package.json -u docs/tutorials",
1818
"build": "tsc --listEmittedFiles -p tsconfig-release.json",
1919
"lint": "eslint \"{src,test,examples}/**/*.ts\"",
20+
"lint:fix": "eslint \"{src,test,examples}/**/*.ts\" --fix",
2021
"test": "jest",
2122
"build:examples": "tsc -p tsconfig-examples.json",
2223
"example:bot": "yarn build:examples && node lib/examples/bot.js",
@@ -65,6 +66,7 @@
6566
"lru-cache": "^10.0.1",
6667
"mkdirp": "^3.0.1",
6768
"morgan": "^1.10.0",
69+
"postgres": "^3.4.1",
6870
"request": "^2.88.2",
6971
"request-promise": "^4.2.6",
7072
"sanitize-html": "^2.11.0"
@@ -73,6 +75,7 @@
7375
"@babel/core": "^7.23.2",
7476
"@babel/eslint-parser": "^7.22.15",
7577
"@babel/eslint-plugin": "^7.22.10",
78+
"@testcontainers/postgresql": "^10.2.2",
7679
"@types/async-lock": "^1.4.1",
7780
"@types/jest": "^29.5.6",
7881
"@types/lowdb": "^1.0.14",
@@ -93,6 +96,7 @@
9396
"matrix-mock-request": "^2.6.0",
9497
"simple-mock": "^0.8.0",
9598
"taffydb": "^2.7.3",
99+
"testcontainers": "^10.2.2",
96100
"tmp": "^0.2.1",
97101
"ts-jest": "^29.1.1",
98102
"typescript": "^5.2.2"

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ export * from "./storage/MemoryStorageProvider";
9393
export * from "./storage/SimpleFsStorageProvider";
9494
export * from "./storage/ICryptoStorageProvider";
9595
export * from "./storage/RustSdkCryptoStorageProvider";
96+
export * from "./storage/SimplePostgresStorageProvider";
9697

9798
// Strategies
9899
export * from "./strategies/AppserviceJoinRoomStrategy";
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import * as postgres from "postgres";
2+
3+
import { IStorageProvider } from "./IStorageProvider";
4+
import { IAppserviceStorageProvider } from "./IAppserviceStorageProvider";
5+
import { IFilterInfo } from "../IFilter";
6+
7+
/**
8+
* A barebones postgresql storage provider. It is not efficient, but it does work.
9+
* @category Storage providers
10+
*/
11+
export class SimplePostgresStorageProvider implements IStorageProvider, IAppserviceStorageProvider {
12+
private readonly db: postgres.Sql;
13+
private readonly waitPromise: Promise<void>;
14+
private completedTransactions = [];
15+
16+
/**
17+
* Creates a new simple postgresql storage provider.
18+
* @param connectionString The `postgres://` connection string to use.
19+
* @param trackTransactionsInMemory True (default) to track all received appservice transactions rather than on disk.
20+
* @param maxInMemoryTransactions The maximum number of transactions to hold in memory before rotating the oldest out. Defaults to 20.
21+
*/
22+
constructor(connectionString: string, private trackTransactionsInMemory = true, private maxInMemoryTransactions = 20) {
23+
this.db = postgres(connectionString);
24+
25+
this.waitPromise = Promise.all([
26+
this.db`
27+
CREATE TABLE IF NOT EXISTS bot_metadata (key TEXT NOT NULL PRIMARY KEY, value TEXT);
28+
`,
29+
this.db`
30+
CREATE TABLE IF NOT EXISTS bot_kv (key TEXT NOT NULL PRIMARY KEY, value TEXT);
31+
`,
32+
this.db`
33+
CREATE TABLE IF NOT EXISTS appservice_users (user_id TEXT NOT NULL PRIMARY KEY, registered BOOLEAN NOT NULL);
34+
`,
35+
this.db`
36+
CREATE TABLE IF NOT EXISTS appservice_transactions (txn_id TEXT NOT NULL PRIMARY KEY, completed BOOLEAN NOT NULL);
37+
`,
38+
]).then();
39+
}
40+
41+
public async setSyncToken(token: string | null): Promise<any> {
42+
await this.waitPromise;
43+
return this.db`
44+
INSERT INTO bot_metadata (key, value) VALUES ('syncToken', ${token})
45+
ON CONFLICT (key) DO UPDATE SET value = ${token};
46+
`;
47+
}
48+
49+
public async getSyncToken(): Promise<string | null> {
50+
await this.waitPromise;
51+
return (await this.db`
52+
SELECT value FROM bot_metadata WHERE key = 'syncToken';
53+
`)[0]?.value;
54+
}
55+
56+
public async setFilter(filter: IFilterInfo): Promise<any> {
57+
await this.waitPromise;
58+
const filterStr = filter ? JSON.stringify(filter) : null;
59+
return this.db`
60+
INSERT INTO bot_metadata (key, value) VALUES ('filter', ${filterStr})
61+
ON CONFLICT (key) DO UPDATE SET value = ${filterStr};
62+
`;
63+
}
64+
65+
public async getFilter(): Promise<IFilterInfo> {
66+
await this.waitPromise;
67+
const value = (await this.db`
68+
SELECT value FROM bot_metadata WHERE key = 'filter';
69+
`)[0]?.value;
70+
return typeof value === "string" ? JSON.parse(value) : value;
71+
}
72+
73+
public async addRegisteredUser(userId: string): Promise<any> {
74+
await this.waitPromise;
75+
return this.db`
76+
INSERT INTO appservice_users (user_id, registered) VALUES (${userId}, TRUE)
77+
ON CONFLICT (user_id) DO UPDATE SET registered = TRUE;
78+
`;
79+
}
80+
81+
public async isUserRegistered(userId: string): Promise<boolean> {
82+
await this.waitPromise;
83+
return !!(await this.db`
84+
SELECT registered FROM appservice_users WHERE user_id = ${userId};
85+
`)[0]?.registered;
86+
}
87+
88+
public async setTransactionCompleted(transactionId: string): Promise<any> {
89+
await this.waitPromise;
90+
if (this.trackTransactionsInMemory) {
91+
if (this.completedTransactions.indexOf(transactionId) === -1) {
92+
this.completedTransactions.push(transactionId);
93+
}
94+
if (this.completedTransactions.length > this.maxInMemoryTransactions) {
95+
this.completedTransactions = this.completedTransactions.reverse().slice(0, this.maxInMemoryTransactions).reverse();
96+
}
97+
return;
98+
}
99+
100+
return this.db`
101+
INSERT INTO appservice_transactions (txn_id, completed) VALUES (${transactionId}, TRUE)
102+
ON CONFLICT (txn_id) DO UPDATE SET completed = TRUE;
103+
`;
104+
}
105+
106+
public async isTransactionCompleted(transactionId: string): Promise<boolean> {
107+
await this.waitPromise;
108+
if (this.trackTransactionsInMemory) {
109+
return this.completedTransactions.includes(transactionId);
110+
}
111+
112+
return (await this.db`
113+
SELECT completed FROM appservice_transactions WHERE txn_id = ${transactionId};
114+
`)[0]?.completed;
115+
}
116+
117+
public async readValue(key: string): Promise<string | null | undefined> {
118+
await this.waitPromise;
119+
return (await this.db`
120+
SELECT value FROM bot_kv WHERE key = ${key};
121+
`)[0]?.value;
122+
}
123+
124+
public async storeValue(key: string, value: string): Promise<void> {
125+
await this.waitPromise;
126+
return this.db`
127+
INSERT INTO bot_kv (key, value) VALUES (${key}, ${value})
128+
ON CONFLICT (key) DO UPDATE SET value = ${value};
129+
`.then();
130+
}
131+
132+
public storageForUser(userId: string): IStorageProvider {
133+
return new NamespacedPostgresProvider(userId, this);
134+
}
135+
}
136+
137+
/**
138+
* A namespaced storage provider that uses postgres to store information.
139+
* @category Storage providers
140+
*/
141+
class NamespacedPostgresProvider implements IStorageProvider {
142+
constructor(private prefix: string, private parent: SimplePostgresStorageProvider) {
143+
}
144+
145+
public setFilter(filter: IFilterInfo): Promise<any> | void {
146+
return this.parent.storeValue(`${this.prefix}_internal_filter`, JSON.stringify(filter));
147+
}
148+
149+
public async getFilter(): Promise<IFilterInfo> {
150+
return this.parent.readValue(`${this.prefix}_internal_filter`).then(r => r ? JSON.parse(r) : r);
151+
}
152+
153+
public setSyncToken(token: string | null): Promise<any> | void {
154+
return this.parent.storeValue(`${this.prefix}_internal_syncToken`, token ?? "");
155+
}
156+
157+
public async getSyncToken(): Promise<string> {
158+
return this.parent.readValue(`${this.prefix}_internal_syncToken`).then(r => r === "" ? null : r);
159+
}
160+
161+
public storeValue(key: string, value: string): Promise<any> | void {
162+
return this.parent.storeValue(`${this.prefix}_internal_kv_${key}`, value);
163+
}
164+
165+
public readValue(key: string): string | Promise<string | null | undefined> | null | undefined {
166+
return this.parent.readValue(`${this.prefix}_internal_kv_${key}`);
167+
}
168+
}

0 commit comments

Comments
 (0)