Skip to content

Commit 94cc02a

Browse files
refactor: native zlib support (discordjs#10316)
Revert "revert: refactor: native zlib support (discordjs#10314)" This reverts commit 4ea73bb. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
1 parent 17d4c78 commit 94cc02a

File tree

4 files changed

+158
-61
lines changed

4 files changed

+158
-61
lines changed

packages/ws/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ const manager = new WebSocketManager({
5050
intents: 0, // for no intents
5151
rest,
5252
// uncomment if you have zlib-sync installed and want to use compression
53-
// compression: CompressionMethod.ZlibStream,
53+
// compression: CompressionMethod.ZlibSync,
54+
55+
// alternatively, we support compression using node's native `node:zlib` module:
56+
// compression: CompressionMethod.ZlibNative,
5457
});
5558

5659
manager.on(WebSocketShardEvents.Dispatch, (event) => {

packages/ws/src/utils/constants.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@ export enum Encoding {
1818
* Valid compression methods
1919
*/
2020
export enum CompressionMethod {
21-
ZlibStream = 'zlib-stream',
21+
ZlibNative,
22+
ZlibSync,
2223
}
2324

2425
export const DefaultDeviceProperty = `@discordjs/ws [VI]{{inject}}[/VI]` as `@discordjs/ws ${string}`;
2526

2627
const getDefaultSessionStore = lazy(() => new Collection<number, SessionInfo | null>());
2728

29+
export const CompressionParameterMap = {
30+
[CompressionMethod.ZlibNative]: 'zlib-stream',
31+
[CompressionMethod.ZlibSync]: 'zlib-stream',
32+
} as const satisfies Record<CompressionMethod, string>;
33+
2834
/**
2935
* Default options used by the manager
3036
*/
@@ -46,6 +52,7 @@ export const DefaultWebSocketManagerOptions = {
4652
version: APIVersion,
4753
encoding: Encoding.JSON,
4854
compression: null,
55+
useIdentifyCompression: false,
4956
retrieveSessionInfo(shardId) {
5057
const store = getDefaultSessionStore();
5158
return store.get(shardId) ?? null;

packages/ws/src/ws/WebSocketManager.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ export interface OptionalWebSocketManagerOptions {
9696
*/
9797
buildStrategy(manager: WebSocketManager): IShardingStrategy;
9898
/**
99-
* The compression method to use
99+
* The transport compression method to use - mutually exclusive with `useIdentifyCompression`
100100
*
101-
* @defaultValue `null` (no compression)
101+
* @defaultValue `null` (no transport compression)
102102
*/
103103
compression: CompressionMethod | null;
104104
/**
@@ -176,6 +176,12 @@ export interface OptionalWebSocketManagerOptions {
176176
* Function used to store session information for a given shard
177177
*/
178178
updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null): Awaitable<void>;
179+
/**
180+
* Whether to use the `compress` option when identifying
181+
*
182+
* @defaultValue `false`
183+
*/
184+
useIdentifyCompression: boolean;
179185
/**
180186
* The gateway version to use
181187
*

packages/ws/src/ws/WebSocketShard.ts

Lines changed: 138 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
/* eslint-disable id-length */
21
import { Buffer } from 'node:buffer';
32
import { once } from 'node:events';
43
import { clearInterval, clearTimeout, setInterval, setTimeout } from 'node:timers';
54
import { setTimeout as sleep } from 'node:timers/promises';
65
import { URLSearchParams } from 'node:url';
76
import { TextDecoder } from 'node:util';
8-
import { inflate } from 'node:zlib';
7+
import type * as nativeZlib from 'node:zlib';
98
import { Collection } from '@discordjs/collection';
109
import { lazy, shouldUseGlobalFetchAndWebSocket } from '@discordjs/util';
1110
import { AsyncQueue } from '@sapphire/async-queue';
@@ -21,13 +20,20 @@ import {
2120
type GatewaySendPayload,
2221
} from 'discord-api-types/v10';
2322
import { WebSocket, type Data } from 'ws';
24-
import type { Inflate } from 'zlib-sync';
25-
import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy.js';
26-
import { ImportantGatewayOpcodes, getInitialSendRateLimitState } from '../utils/constants.js';
23+
import type * as ZlibSync from 'zlib-sync';
24+
import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy';
25+
import {
26+
CompressionMethod,
27+
CompressionParameterMap,
28+
ImportantGatewayOpcodes,
29+
getInitialSendRateLimitState,
30+
} from '../utils/constants.js';
2731
import type { SessionInfo } from './WebSocketManager.js';
2832

29-
// eslint-disable-next-line promise/prefer-await-to-then
33+
/* eslint-disable promise/prefer-await-to-then */
3034
const getZlibSync = lazy(async () => import('zlib-sync').then((mod) => mod.default).catch(() => null));
35+
const getNativeZlib = lazy(async () => import('node:zlib').then((mod) => mod).catch(() => null));
36+
/* eslint-enable promise/prefer-await-to-then */
3137

3238
export enum WebSocketShardEvents {
3339
Closed = 'closed',
@@ -86,9 +92,9 @@ const WebSocketConstructor: typeof WebSocket = shouldUseGlobalFetchAndWebSocket(
8692
export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
8793
private connection: WebSocket | null = null;
8894

89-
private useIdentifyCompress = false;
95+
private nativeInflate: nativeZlib.Inflate | null = null;
9096

91-
private inflate: Inflate | null = null;
97+
private zLibSyncInflate: ZlibSync.Inflate | null = null;
9298

9399
private readonly textDecoder = new TextDecoder();
94100

@@ -120,6 +126,18 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
120126

121127
#status: WebSocketShardStatus = WebSocketShardStatus.Idle;
122128

129+
private identifyCompressionEnabled = false;
130+
131+
/**
132+
* @privateRemarks
133+
*
134+
* This is needed because `this.strategy.options.compression` is not an actual reflection of the compression method
135+
* used, but rather the compression method that the user wants to use. This is because the libraries could just be missing.
136+
*/
137+
private get transportCompressionEnabled() {
138+
return this.strategy.options.compression !== null && (this.nativeInflate ?? this.zLibSyncInflate) !== null;
139+
}
140+
123141
public get status(): WebSocketShardStatus {
124142
return this.#status;
125143
}
@@ -161,21 +179,63 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
161179
throw new Error("Tried to connect a shard that wasn't idle");
162180
}
163181

164-
const { version, encoding, compression } = this.strategy.options;
182+
const { version, encoding, compression, useIdentifyCompression } = this.strategy.options;
183+
this.identifyCompressionEnabled = useIdentifyCompression;
184+
185+
// eslint-disable-next-line id-length
165186
const params = new URLSearchParams({ v: version, encoding });
166-
if (compression) {
167-
const zlib = await getZlibSync();
168-
if (zlib) {
169-
params.append('compress', compression);
170-
this.inflate = new zlib.Inflate({
171-
chunkSize: 65_535,
172-
to: 'string',
173-
});
174-
} else if (!this.useIdentifyCompress) {
175-
this.useIdentifyCompress = true;
176-
console.warn(
177-
'WebSocketShard: Compression is enabled but zlib-sync is not installed, falling back to identify compress',
178-
);
187+
if (compression !== null) {
188+
if (useIdentifyCompression) {
189+
console.warn('WebSocketShard: transport compression is enabled, disabling identify compression');
190+
this.identifyCompressionEnabled = false;
191+
}
192+
193+
params.append('compress', CompressionParameterMap[compression]);
194+
195+
switch (compression) {
196+
case CompressionMethod.ZlibNative: {
197+
const zlib = await getNativeZlib();
198+
if (zlib) {
199+
const inflate = zlib.createInflate({
200+
chunkSize: 65_535,
201+
flush: zlib.constants.Z_SYNC_FLUSH,
202+
});
203+
204+
inflate.on('error', (error) => {
205+
this.emit(WebSocketShardEvents.Error, { error });
206+
});
207+
208+
this.nativeInflate = inflate;
209+
} else {
210+
console.warn('WebSocketShard: Compression is set to native but node:zlib is not available.');
211+
params.delete('compress');
212+
}
213+
214+
break;
215+
}
216+
217+
case CompressionMethod.ZlibSync: {
218+
const zlib = await getZlibSync();
219+
if (zlib) {
220+
this.zLibSyncInflate = new zlib.Inflate({
221+
chunkSize: 65_535,
222+
to: 'string',
223+
});
224+
} else {
225+
console.warn('WebSocketShard: Compression is set to zlib-sync, but it is not installed.');
226+
params.delete('compress');
227+
}
228+
229+
break;
230+
}
231+
}
232+
}
233+
234+
if (this.identifyCompressionEnabled) {
235+
const zlib = await getNativeZlib();
236+
if (!zlib) {
237+
console.warn('WebSocketShard: Identify compression is enabled, but node:zlib is not available.');
238+
this.identifyCompressionEnabled = false;
179239
}
180240
}
181241

@@ -451,28 +511,29 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
451511
`shard id: ${this.id.toString()}`,
452512
`shard count: ${this.strategy.options.shardCount}`,
453513
`intents: ${this.strategy.options.intents}`,
454-
`compression: ${this.inflate ? 'zlib-stream' : this.useIdentifyCompress ? 'identify' : 'none'}`,
514+
`compression: ${this.transportCompressionEnabled ? CompressionParameterMap[this.strategy.options.compression!] : this.identifyCompressionEnabled ? 'identify' : 'none'}`,
455515
]);
456516

457-
const d: GatewayIdentifyData = {
517+
const data: GatewayIdentifyData = {
458518
token: this.strategy.options.token,
459519
properties: this.strategy.options.identifyProperties,
460520
intents: this.strategy.options.intents,
461-
compress: this.useIdentifyCompress,
521+
compress: this.identifyCompressionEnabled,
462522
shard: [this.id, this.strategy.options.shardCount],
463523
};
464524

465525
if (this.strategy.options.largeThreshold) {
466-
d.large_threshold = this.strategy.options.largeThreshold;
526+
data.large_threshold = this.strategy.options.largeThreshold;
467527
}
468528

469529
if (this.strategy.options.initialPresence) {
470-
d.presence = this.strategy.options.initialPresence;
530+
data.presence = this.strategy.options.initialPresence;
471531
}
472532

473533
await this.send({
474534
op: GatewayOpcodes.Identify,
475-
d,
535+
// eslint-disable-next-line id-length
536+
d: data,
476537
});
477538

478539
await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout);
@@ -490,6 +551,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
490551
this.replayedEvents = 0;
491552
return this.send({
492553
op: GatewayOpcodes.Resume,
554+
// eslint-disable-next-line id-length
493555
d: {
494556
token: this.strategy.options.token,
495557
seq: session.sequence,
@@ -507,13 +569,22 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
507569

508570
await this.send({
509571
op: GatewayOpcodes.Heartbeat,
572+
// eslint-disable-next-line id-length
510573
d: session?.sequence ?? null,
511574
});
512575

513576
this.lastHeartbeatAt = Date.now();
514577
this.isAck = false;
515578
}
516579

580+
private parseInflateResult(result: any): GatewayReceivePayload | null {
581+
if (!result) {
582+
return null;
583+
}
584+
585+
return JSON.parse(typeof result === 'string' ? result : this.textDecoder.decode(result)) as GatewayReceivePayload;
586+
}
587+
517588
private async unpackMessage(data: Data, isBinary: boolean): Promise<GatewayReceivePayload | null> {
518589
// Deal with no compression
519590
if (!isBinary) {
@@ -528,10 +599,12 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
528599
const decompressable = new Uint8Array(data as ArrayBuffer);
529600

530601
// Deal with identify compress
531-
if (this.useIdentifyCompress) {
532-
return new Promise((resolve, reject) => {
602+
if (this.identifyCompressionEnabled) {
603+
// eslint-disable-next-line no-async-promise-executor
604+
return new Promise(async (resolve, reject) => {
605+
const zlib = (await getNativeZlib())!;
533606
// eslint-disable-next-line promise/prefer-await-to-callbacks
534-
inflate(decompressable, { chunkSize: 65_535 }, (err, result) => {
607+
zlib.inflate(decompressable, { chunkSize: 65_535 }, (err, result) => {
535608
if (err) {
536609
reject(err);
537610
return;
@@ -542,42 +615,50 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
542615
});
543616
}
544617

545-
// Deal with gw wide zlib-stream compression
546-
if (this.inflate) {
547-
const l = decompressable.length;
618+
// Deal with transport compression
619+
if (this.transportCompressionEnabled) {
548620
const flush =
549-
l >= 4 &&
550-
decompressable[l - 4] === 0x00 &&
551-
decompressable[l - 3] === 0x00 &&
552-
decompressable[l - 2] === 0xff &&
553-
decompressable[l - 1] === 0xff;
621+
decompressable.length >= 4 &&
622+
decompressable.at(-4) === 0x00 &&
623+
decompressable.at(-3) === 0x00 &&
624+
decompressable.at(-2) === 0xff &&
625+
decompressable.at(-1) === 0xff;
554626

555-
const zlib = (await getZlibSync())!;
556-
this.inflate.push(Buffer.from(decompressable), flush ? zlib.Z_SYNC_FLUSH : zlib.Z_NO_FLUSH);
627+
if (this.nativeInflate) {
628+
this.nativeInflate.write(decompressable, 'binary');
557629

558-
if (this.inflate.err) {
559-
this.emit(WebSocketShardEvents.Error, {
560-
error: new Error(`${this.inflate.err}${this.inflate.msg ? `: ${this.inflate.msg}` : ''}`),
561-
});
562-
}
630+
if (!flush) {
631+
return null;
632+
}
563633

564-
if (!flush) {
565-
return null;
566-
}
634+
const [result] = await once(this.nativeInflate, 'data');
635+
return this.parseInflateResult(result);
636+
} else if (this.zLibSyncInflate) {
637+
const zLibSync = (await getZlibSync())!;
638+
this.zLibSyncInflate.push(Buffer.from(decompressable), flush ? zLibSync.Z_SYNC_FLUSH : zLibSync.Z_NO_FLUSH);
639+
640+
if (this.zLibSyncInflate.err) {
641+
this.emit(WebSocketShardEvents.Error, {
642+
error: new Error(
643+
`${this.zLibSyncInflate.err}${this.zLibSyncInflate.msg ? `: ${this.zLibSyncInflate.msg}` : ''}`,
644+
),
645+
});
646+
}
567647

568-
const { result } = this.inflate;
569-
if (!result) {
570-
return null;
571-
}
648+
if (!flush) {
649+
return null;
650+
}
572651

573-
return JSON.parse(typeof result === 'string' ? result : this.textDecoder.decode(result)) as GatewayReceivePayload;
652+
const { result } = this.zLibSyncInflate;
653+
return this.parseInflateResult(result);
654+
}
574655
}
575656

576657
this.debug([
577658
'Received a message we were unable to decompress',
578659
`isBinary: ${isBinary.toString()}`,
579-
`useIdentifyCompress: ${this.useIdentifyCompress.toString()}`,
580-
`inflate: ${Boolean(this.inflate).toString()}`,
660+
`identifyCompressionEnabled: ${this.identifyCompressionEnabled.toString()}`,
661+
`inflate: ${this.transportCompressionEnabled ? CompressionMethod[this.strategy.options.compression!] : 'none'}`,
581662
]);
582663

583664
return null;
@@ -838,7 +919,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
838919
messages.length > 1
839920
? `\n${messages
840921
.slice(1)
841-
.map((m) => ` ${m}`)
922+
.map((message) => ` ${message}`)
842923
.join('\n')}`
843924
: ''
844925
}`;

0 commit comments

Comments
 (0)