Skip to content

[Node] Undici WebSocket & Diagnostics #621

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nasty-steaks-yell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/node': minor
---

Switch to undici WebSocket for Dispatcher and diagnostics_channel support. This now adds support for the `ALL_PROXY` environment variable by default, as well as `WSS_PROXY` for websocket connections.
5 changes: 5 additions & 0 deletions .changeset/swift-waves-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Preserve more details on websocket errors.
2 changes: 2 additions & 0 deletions demos/example-node/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
BACKEND=http://localhost:6060
SYNC_SERVICE=http://localhost:8080
POWERSYNC_TOKEN=
POWERSYNC_DEBUG=1
5 changes: 3 additions & 2 deletions demos/example-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"scripts": {
"build": "tsc -b",
"watch": "tsc -b -w",
"start": "node --loader ts-node/esm -r dotenv/config src/main.ts"
"start": "node --import ./register.mjs src/main.ts"
},
"dependencies": {
"@powersync/node": "workspace:*",
"dotenv": "^16.4.7"
"dotenv": "^16.4.7",
"undici": "^7.10.0"
},
"devDependencies": {
"ts-node": "^10.9.2",
Expand Down
6 changes: 6 additions & 0 deletions demos/example-node/register.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// For cli usage: node --import ./register.mjs src/main.ts
import { register } from 'node:module';
import { pathToFileURL } from 'node:url';
import 'dotenv/config';

register('ts-node/esm', pathToFileURL('./'));
151 changes: 151 additions & 0 deletions demos/example-node/src/UndiciDiagnostics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import * as diagnostics_channel from 'node:diagnostics_channel';
import type { DiagnosticsChannel } from 'undici';

/**
* Enable Undici diagnostics channel instrumentation for detailed connection and request logging.
*
* This includes fetch requests and websocket connections.
*
* Usage: enableUncidiDiagnostics();
*/
export function enableUncidiDiagnostics() {
new UndiciDiagnostics().enable();
}

class UndiciDiagnostics {
private requestCounter: number = 0;
private activeRequests: WeakMap<any, number> = new WeakMap();

enable() {
// Available events are documented here:
// https://github.com/nodejs/undici/blob/main/docs/docs/api/DiagnosticsChannel.md

diagnostics_channel.subscribe('undici:request:create', (message: DiagnosticsChannel.RequestCreateMessage) => {
const requestId = ++this.requestCounter;
const request = message.request;
this.activeRequests.set(message.request, requestId);

console.log(`πŸ”„ [DIAG-${requestId}] REQUEST CREATE:`, {
host: request.origin,
path: request.path,
method: request.method,
headers: formatHeaders(request.headers),
contentType: (request as any).contentType,
contentLength: (request as any).contentLength
});
});

diagnostics_channel.subscribe('undici:request:bodySent', (message: DiagnosticsChannel.RequestBodySentMessage) => {
const requestId = this.activeRequests.get(message.request);
console.log(`πŸ“€ [DIAG-${requestId}] REQUEST BODY SENT`);
});

diagnostics_channel.subscribe('undici:request:headers', (message: DiagnosticsChannel.RequestHeadersMessage) => {
const requestId = this.activeRequests.get(message.request);
console.log(`πŸ“₯ [DIAG-${requestId}] RESPONSE HEADERS:`, {
statusCode: message.response.statusCode,
statusText: message.response.statusText,
headers: formatHeaders(message.response.headers)
});
});

diagnostics_channel.subscribe('undici:request:trailers', (message: DiagnosticsChannel.RequestTrailersMessage) => {
const requestId = this.activeRequests.get(message.request);
console.log(`🏁 [DIAG-${requestId}] REQUEST TRAILERS:`, {
trailers: message.trailers
});
});

diagnostics_channel.subscribe('undici:request:error', (message: DiagnosticsChannel.RequestErrorMessage) => {
const requestId = this.activeRequests.get(message.request);
console.log(`❌ [DIAG-${requestId}] REQUEST ERROR:`, {
error: message.error
});

// Clean up tracking
this.activeRequests.delete(message.request);
});

// Client connection events
diagnostics_channel.subscribe(
'undici:client:sendHeaders',
(message: DiagnosticsChannel.ClientSendHeadersMessage) => {
console.log(`πŸ“‘ [DIAG] CLIENT SEND HEADERS:`, {
headers: formatHeaders(message.headers)
});
}
);

diagnostics_channel.subscribe(
'undici:client:beforeConnect',
(message: DiagnosticsChannel.ClientBeforeConnectMessage) => {
console.log(`πŸ”Œ [DIAG] CLIENT BEFORE CONNECT:`, {
connectParams: message.connectParams
});
}
);

diagnostics_channel.subscribe('undici:client:connected', (message: DiagnosticsChannel.ClientConnectedMessage) => {
console.log(`βœ… [DIAG] CLIENT CONNECTED:`, {
connectParams: message.connectParams,
connector: message.connector?.name,
socket: {
localAddress: message.socket?.localAddress,
localPort: message.socket?.localPort,
remoteAddress: message.socket?.remoteAddress,
remotePort: message.socket?.remotePort
}
});
});

diagnostics_channel.subscribe(
'undici:client:connectError',
(message: DiagnosticsChannel.ClientConnectErrorMessage) => {
console.log(`❌ [DIAG] CLIENT CONNECT ERROR:`, {
connectParams: message.connectParams,
error: message.error
});
}
);

// WebSocket events
diagnostics_channel.subscribe('undici:websocket:open', (message: any) => {
console.log(`🌐 [DIAG] WEBSOCKET OPEN:`, {
address: message.address,
protocol: message.protocol,
extensions: message.extensions
});
});

diagnostics_channel.subscribe('undici:websocket:close', (message: any) => {
console.log(`🌐 [DIAG] WEBSOCKET CLOSE:`, {
websocket: message.websocket?.url,
code: message.code,
reason: message.reason
});
});

diagnostics_channel.subscribe('undici:websocket:socket_error', (message: any) => {
console.log(`❌ [DIAG] WEBSOCKET SOCKET ERROR:`, {
websocket: message.websocket?.url,
error: message.error
});
});
}
}

function formatHeaders(headers: any[] | string | undefined) {
if (typeof headers === 'string') {
return headers;
}

return headers?.map((header) => {
if (typeof header == 'string') {
return header;
} else if (Buffer.isBuffer(header)) {
return header.toString('utf-8');
} else {
return header;
}
});
}
30 changes: 27 additions & 3 deletions demos/example-node/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ import repl_factory from 'node:repl';
import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
import { exit } from 'node:process';
import { AppSchema, DemoConnector } from './powersync.js';
import { enableUncidiDiagnostics } from './UndiciDiagnostics.js';

const main = async () => {
const baseLogger = createBaseLogger();
const logger = createLogger('PowerSyncDemo');
baseLogger.useDefaults({ defaultLevel: logger.WARN });
const debug = process.env.POWERSYNC_DEBUG == '1';
baseLogger.useDefaults({ defaultLevel: debug ? logger.TRACE : logger.WARN });

if (!('BACKEND' in process.env) || !('SYNC_SERVICE' in process.env)) {
// Enable detailed request/response logging for debugging purposes.
if (debug) {
enableUncidiDiagnostics();
}

if (!('SYNC_SERVICE' in process.env)) {
console.warn(
'Set the BACKEND and SYNC_SERVICE environment variables to point to a sync service and a running demo backend.'
);
Expand All @@ -26,7 +33,24 @@ const main = async () => {
});
console.log(await db.get('SELECT powersync_rs_version();'));

await db.connect(new DemoConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
await db.connect(new DemoConnector(), {
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET
});
// Example using a proxy agent for more control over the connection:
// const proxyAgent = new (await import('undici')).ProxyAgent({
// uri: 'http://localhost:8080',
// requestTls: {
// ca: '<CA for the service>'
// },
// proxyTls: {
// ca: '<CA for the proxy>'
// }
// });
// await db.connect(new DemoConnector(), {
// connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
// dispatcher: proxyAgent
// });

await db.waitForFirstSync();
console.log('First sync complete!');

Expand Down
7 changes: 7 additions & 0 deletions demos/example-node/src/powersync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ import { AbstractPowerSyncDatabase, column, PowerSyncBackendConnector, Schema, T

export class DemoConnector implements PowerSyncBackendConnector {
async fetchCredentials() {
if (process.env.POWERSYNC_TOKEN) {
return {
endpoint: process.env.SYNC_SERVICE!,
token: process.env.POWERSYNC_TOKEN
};
}

const response = await fetch(`${process.env.BACKEND}/api/auth/token`);
if (response.status != 200) {
throw 'Could not fetch token';
Expand Down
19 changes: 5 additions & 14 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,18 +312,12 @@ export abstract class AbstractRemote {
// automatically as a header.
const userAgent = this.getUserAgent();

let socketCreationError: Error | undefined;

const url = this.options.socketUrlTransformer(request.url);
const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url: this.options.socketUrlTransformer(request.url),
url,
wsCreator: (url) => {
const s = this.createSocket(url);
s.addEventListener('error', (e: Event) => {
socketCreationError = new Error('Failed to create connection to websocket: ', (e.target as any).url ?? '');
this.logger.warn('Socket error', e);
});
return s;
return this.createSocket(url);
}
}),
setup: {
Expand All @@ -347,11 +341,8 @@ export abstract class AbstractRemote {
try {
rsocket = await connector.connect();
} catch (ex) {
/**
* On React native the connection exception can be `undefined` this causes issues
* with detecting the exception inside async-mutex
*/
throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex ?? socketCreationError)}`);
this.logger.error(`Failed to connect WebSocket`, ex);
throw ex;
}

const stream = new DataStream({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,17 @@ export class WebsocketClientTransport implements ClientTransport {

const errorListener = (ev: ErrorEvent) => {
removeListeners();
reject(ev.error);
// We add a default error in that case.
if (ev.error != null) {
// undici typically provides an error object
reject(ev.error);
} else if (ev.message != null) {
// React Native typically does not provide an error object, but does provide a message
reject(new Error(`Failed to create websocket connection: ${ev.message}`));
} else {
// Browsers often provide no details at all
reject(new Error(`Failed to create websocket connection to ${this.url}`));
}
};

/**
Expand Down
4 changes: 1 addition & 3 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
"async-lock": "^1.4.0",
"bson": "^6.6.0",
"comlink": "^4.4.2",
"proxy-agent": "^6.5.0",
"undici": "^7.8.0",
"ws": "^8.18.1"
"undici": "^7.10.0"
},
"devDependencies": {
"@powersync/drizzle-driver": "workspace:*",
Expand Down
13 changes: 3 additions & 10 deletions packages/node/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import {
SQLOpenFactory
} from '@powersync/common';

import { NodeRemote } from '../sync/stream/NodeRemote.js';
import { NodeCustomConnectionOptions, NodeRemote } from '../sync/stream/NodeRemote.js';
import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyncImplementation.js';

import { Dispatcher } from 'undici';
import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
import { NodeSQLOpenOptions } from './options.js';

Expand All @@ -30,13 +29,7 @@ export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
remoteOptions?: Partial<AbstractRemoteOptions>;
};

export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & {
/**
* Optional custom dispatcher for HTTP connections (e.g. using undici).
* Only used when the connection method is SyncStreamConnectionMethod.HTTP
*/
dispatcher?: Dispatcher;
};
export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & NodeCustomConnectionOptions;

export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAdditionalConnectionOptions;

Expand Down Expand Up @@ -76,7 +69,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {

connect(
connector: PowerSyncBackendConnector,
options?: PowerSyncConnectionOptions & { dispatcher?: Dispatcher }
options?: PowerSyncConnectionOptions & NodeCustomConnectionOptions
): Promise<void> {
return super.connect(connector, options);
}
Expand Down
Loading