1
+ /* eslint-disable id-length */
1
2
import { Buffer } from 'node:buffer' ;
2
3
import { once } from 'node:events' ;
3
4
import { clearInterval , clearTimeout , setInterval , setTimeout } from 'node:timers' ;
4
5
import { setTimeout as sleep } from 'node:timers/promises' ;
5
6
import { URLSearchParams } from 'node:url' ;
6
7
import { TextDecoder } from 'node:util' ;
7
- import type * as nativeZlib from 'node:zlib' ;
8
+ import { inflate } from 'node:zlib' ;
8
9
import { Collection } from '@discordjs/collection' ;
9
10
import { lazy , shouldUseGlobalFetchAndWebSocket } from '@discordjs/util' ;
10
11
import { AsyncQueue } from '@sapphire/async-queue' ;
@@ -20,20 +21,13 @@ import {
20
21
type GatewaySendPayload ,
21
22
} from 'discord-api-types/v10' ;
22
23
import { WebSocket , type Data } from 'ws' ;
23
- import type * as ZlibSync from 'zlib-sync' ;
24
- import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy' ;
25
- import {
26
- CompressionMethod ,
27
- CompressionParameterMap ,
28
- ImportantGatewayOpcodes ,
29
- getInitialSendRateLimitState ,
30
- } from '../utils/constants.js' ;
24
+ import type { Inflate } from 'zlib-sync' ;
25
+ import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy.js' ;
26
+ import { ImportantGatewayOpcodes , getInitialSendRateLimitState } from '../utils/constants.js' ;
31
27
import type { SessionInfo } from './WebSocketManager.js' ;
32
28
33
- /* eslint-disable promise/prefer-await-to-then */
29
+ // eslint-disable-next-line promise/prefer-await-to-then
34
30
const getZlibSync = lazy ( async ( ) => import ( 'zlib-sync' ) . then ( ( mod ) => mod . default ) . catch ( ( ) => null ) ) ;
35
- const getNativeZlib = lazy ( async ( ) => import ( 'node:zlib' ) . then ( ( mod ) => mod ) . catch ( ( ) => null ) ) ;
36
- /* eslint-enable promise/prefer-await-to-then */
37
31
38
32
export enum WebSocketShardEvents {
39
33
Closed = 'closed' ,
@@ -92,9 +86,9 @@ const WebSocketConstructor: typeof WebSocket = shouldUseGlobalFetchAndWebSocket(
92
86
export class WebSocketShard extends AsyncEventEmitter < WebSocketShardEventsMap > {
93
87
private connection : WebSocket | null = null ;
94
88
95
- private nativeInflate : nativeZlib . Inflate | null = null ;
89
+ private useIdentifyCompress = false ;
96
90
97
- private zLibSyncInflate : ZlibSync . Inflate | null = null ;
91
+ private inflate : Inflate | null = null ;
98
92
99
93
private readonly textDecoder = new TextDecoder ( ) ;
100
94
@@ -126,18 +120,6 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
126
120
127
121
#status: WebSocketShardStatus = WebSocketShardStatus . Idle ;
128
122
129
- private identifyCompressionEnabled = false ;
130
-
131
- /**
132
- * @privateRemarks
133
- *
134
- * This is needed because `this.strategy.options.compression` is not an actual reflection of the compression method
135
- * used, but rather the compression method that the user wants to use. This is because the libraries could just be missing.
136
- */
137
- private get transportCompressionEnabled ( ) {
138
- return this . strategy . options . compression !== null && ( this . nativeInflate ?? this . zLibSyncInflate ) !== null ;
139
- }
140
-
141
123
public get status ( ) : WebSocketShardStatus {
142
124
return this . #status;
143
125
}
@@ -179,63 +161,21 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
179
161
throw new Error ( "Tried to connect a shard that wasn't idle" ) ;
180
162
}
181
163
182
- const { version, encoding, compression, useIdentifyCompression } = this . strategy . options ;
183
- this . identifyCompressionEnabled = useIdentifyCompression ;
184
-
185
- // eslint-disable-next-line id-length
164
+ const { version, encoding, compression } = this . strategy . options ;
186
165
const params = new URLSearchParams ( { v : version , encoding } ) ;
187
- if ( compression !== null ) {
188
- if ( useIdentifyCompression ) {
189
- console . warn ( 'WebSocketShard: transport compression is enabled, disabling identify compression' ) ;
190
- this . identifyCompressionEnabled = false ;
191
- }
192
-
193
- params . append ( 'compress' , CompressionParameterMap [ compression ] ) ;
194
-
195
- switch ( compression ) {
196
- case CompressionMethod . ZlibNative : {
197
- const zlib = await getNativeZlib ( ) ;
198
- if ( zlib ) {
199
- const inflate = zlib . createInflate ( {
200
- chunkSize : 65_535 ,
201
- flush : zlib . constants . Z_SYNC_FLUSH ,
202
- } ) ;
203
-
204
- inflate . on ( 'error' , ( error ) => {
205
- this . emit ( WebSocketShardEvents . Error , { error } ) ;
206
- } ) ;
207
-
208
- this . nativeInflate = inflate ;
209
- } else {
210
- console . warn ( 'WebSocketShard: Compression is set to native but node:zlib is not available.' ) ;
211
- params . delete ( 'compress' ) ;
212
- }
213
-
214
- break ;
215
- }
216
-
217
- case CompressionMethod . ZlibSync : {
218
- const zlib = await getZlibSync ( ) ;
219
- if ( zlib ) {
220
- this . zLibSyncInflate = new zlib . Inflate ( {
221
- chunkSize : 65_535 ,
222
- to : 'string' ,
223
- } ) ;
224
- } else {
225
- console . warn ( 'WebSocketShard: Compression is set to zlib-sync, but it is not installed.' ) ;
226
- params . delete ( 'compress' ) ;
227
- }
228
-
229
- break ;
230
- }
231
- }
232
- }
233
-
234
- if ( this . identifyCompressionEnabled ) {
235
- const zlib = await getNativeZlib ( ) ;
236
- if ( ! zlib ) {
237
- console . warn ( 'WebSocketShard: Identify compression is enabled, but node:zlib is not available.' ) ;
238
- this . identifyCompressionEnabled = false ;
166
+ if ( compression ) {
167
+ const zlib = await getZlibSync ( ) ;
168
+ if ( zlib ) {
169
+ params . append ( 'compress' , compression ) ;
170
+ this . inflate = new zlib . Inflate ( {
171
+ chunkSize : 65_535 ,
172
+ to : 'string' ,
173
+ } ) ;
174
+ } else if ( ! this . useIdentifyCompress ) {
175
+ this . useIdentifyCompress = true ;
176
+ console . warn (
177
+ 'WebSocketShard: Compression is enabled but zlib-sync is not installed, falling back to identify compress' ,
178
+ ) ;
239
179
}
240
180
}
241
181
@@ -511,29 +451,28 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
511
451
`shard id: ${ this . id . toString ( ) } ` ,
512
452
`shard count: ${ this . strategy . options . shardCount } ` ,
513
453
`intents: ${ this . strategy . options . intents } ` ,
514
- `compression: ${ this . transportCompressionEnabled ? CompressionParameterMap [ this . strategy . options . compression ! ] : this . identifyCompressionEnabled ? 'identify' : 'none' } ` ,
454
+ `compression: ${ this . inflate ? 'zlib-stream' : this . useIdentifyCompress ? 'identify' : 'none' } ` ,
515
455
] ) ;
516
456
517
- const data : GatewayIdentifyData = {
457
+ const d : GatewayIdentifyData = {
518
458
token : this . strategy . options . token ,
519
459
properties : this . strategy . options . identifyProperties ,
520
460
intents : this . strategy . options . intents ,
521
- compress : this . identifyCompressionEnabled ,
461
+ compress : this . useIdentifyCompress ,
522
462
shard : [ this . id , this . strategy . options . shardCount ] ,
523
463
} ;
524
464
525
465
if ( this . strategy . options . largeThreshold ) {
526
- data . large_threshold = this . strategy . options . largeThreshold ;
466
+ d . large_threshold = this . strategy . options . largeThreshold ;
527
467
}
528
468
529
469
if ( this . strategy . options . initialPresence ) {
530
- data . presence = this . strategy . options . initialPresence ;
470
+ d . presence = this . strategy . options . initialPresence ;
531
471
}
532
472
533
473
await this . send ( {
534
474
op : GatewayOpcodes . Identify ,
535
- // eslint-disable-next-line id-length
536
- d : data ,
475
+ d,
537
476
} ) ;
538
477
539
478
await this . waitForEvent ( WebSocketShardEvents . Ready , this . strategy . options . readyTimeout ) ;
@@ -551,7 +490,6 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
551
490
this . replayedEvents = 0 ;
552
491
return this . send ( {
553
492
op : GatewayOpcodes . Resume ,
554
- // eslint-disable-next-line id-length
555
493
d : {
556
494
token : this . strategy . options . token ,
557
495
seq : session . sequence ,
@@ -569,22 +507,13 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
569
507
570
508
await this . send ( {
571
509
op : GatewayOpcodes . Heartbeat ,
572
- // eslint-disable-next-line id-length
573
510
d : session ?. sequence ?? null ,
574
511
} ) ;
575
512
576
513
this . lastHeartbeatAt = Date . now ( ) ;
577
514
this . isAck = false ;
578
515
}
579
516
580
- private parseInflateResult ( result : any ) : GatewayReceivePayload | null {
581
- if ( ! result ) {
582
- return null ;
583
- }
584
-
585
- return JSON . parse ( typeof result === 'string' ? result : this . textDecoder . decode ( result ) ) as GatewayReceivePayload ;
586
- }
587
-
588
517
private async unpackMessage ( data : Data , isBinary : boolean ) : Promise < GatewayReceivePayload | null > {
589
518
// Deal with no compression
590
519
if ( ! isBinary ) {
@@ -599,12 +528,10 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
599
528
const decompressable = new Uint8Array ( data as ArrayBuffer ) ;
600
529
601
530
// Deal with identify compress
602
- if ( this . identifyCompressionEnabled ) {
603
- // eslint-disable-next-line no-async-promise-executor
604
- return new Promise ( async ( resolve , reject ) => {
605
- const zlib = ( await getNativeZlib ( ) ) ! ;
531
+ if ( this . useIdentifyCompress ) {
532
+ return new Promise ( ( resolve , reject ) => {
606
533
// eslint-disable-next-line promise/prefer-await-to-callbacks
607
- zlib . inflate ( decompressable , { chunkSize : 65_535 } , ( err , result ) => {
534
+ inflate ( decompressable , { chunkSize : 65_535 } , ( err , result ) => {
608
535
if ( err ) {
609
536
reject ( err ) ;
610
537
return ;
@@ -615,50 +542,42 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
615
542
} ) ;
616
543
}
617
544
618
- // Deal with transport compression
619
- if ( this . transportCompressionEnabled ) {
545
+ // Deal with gw wide zlib-stream compression
546
+ if ( this . inflate ) {
547
+ const l = decompressable . length ;
620
548
const flush =
621
- decompressable . length >= 4 &&
622
- decompressable . at ( - 4 ) === 0x00 &&
623
- decompressable . at ( - 3 ) === 0x00 &&
624
- decompressable . at ( - 2 ) === 0xff &&
625
- decompressable . at ( - 1 ) === 0xff ;
549
+ l >= 4 &&
550
+ decompressable [ l - 4 ] === 0x00 &&
551
+ decompressable [ l - 3 ] === 0x00 &&
552
+ decompressable [ l - 2 ] === 0xff &&
553
+ decompressable [ l - 1 ] === 0xff ;
626
554
627
- if ( this . nativeInflate ) {
628
- this . nativeInflate . write ( decompressable , 'binary' ) ;
555
+ const zlib = ( await getZlibSync ( ) ) ! ;
556
+ this . inflate . push ( Buffer . from ( decompressable ) , flush ? zlib . Z_SYNC_FLUSH : zlib . Z_NO_FLUSH ) ;
629
557
630
- if ( ! flush ) {
631
- return null ;
632
- }
633
-
634
- const [ result ] = await once ( this . nativeInflate , 'data' ) ;
635
- return this . parseInflateResult ( result ) ;
636
- } else if ( this . zLibSyncInflate ) {
637
- const zLibSync = ( await getZlibSync ( ) ) ! ;
638
- this . zLibSyncInflate . push ( Buffer . from ( decompressable ) , flush ? zLibSync . Z_SYNC_FLUSH : zLibSync . Z_NO_FLUSH ) ;
639
-
640
- if ( this . zLibSyncInflate . err ) {
641
- this . emit ( WebSocketShardEvents . Error , {
642
- error : new Error (
643
- `${ this . zLibSyncInflate . err } ${ this . zLibSyncInflate . msg ? `: ${ this . zLibSyncInflate . msg } ` : '' } ` ,
644
- ) ,
645
- } ) ;
646
- }
558
+ if ( this . inflate . err ) {
559
+ this . emit ( WebSocketShardEvents . Error , {
560
+ error : new Error ( `${ this . inflate . err } ${ this . inflate . msg ? `: ${ this . inflate . msg } ` : '' } ` ) ,
561
+ } ) ;
562
+ }
647
563
648
- if ( ! flush ) {
649
- return null ;
650
- }
564
+ if ( ! flush ) {
565
+ return null ;
566
+ }
651
567
652
- const { result } = this . zLibSyncInflate ;
653
- return this . parseInflateResult ( result ) ;
568
+ const { result } = this . inflate ;
569
+ if ( ! result ) {
570
+ return null ;
654
571
}
572
+
573
+ return JSON . parse ( typeof result === 'string' ? result : this . textDecoder . decode ( result ) ) as GatewayReceivePayload ;
655
574
}
656
575
657
576
this . debug ( [
658
577
'Received a message we were unable to decompress' ,
659
578
`isBinary: ${ isBinary . toString ( ) } ` ,
660
- `identifyCompressionEnabled : ${ this . identifyCompressionEnabled . toString ( ) } ` ,
661
- `inflate: ${ this . transportCompressionEnabled ? CompressionMethod [ this . strategy . options . compression ! ] : 'none' } ` ,
579
+ `useIdentifyCompress : ${ this . useIdentifyCompress . toString ( ) } ` ,
580
+ `inflate: ${ Boolean ( this . inflate ) . toString ( ) } ` ,
662
581
] ) ;
663
582
664
583
return null ;
@@ -919,7 +838,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
919
838
messages . length > 1
920
839
? `\n${ messages
921
840
. slice ( 1 )
922
- . map ( ( message ) => ` ${ message } ` )
841
+ . map ( ( m ) => ` ${ m } ` )
923
842
. join ( '\n' ) } `
924
843
: ''
925
844
} `;
0 commit comments