Skip to content

Commit f7ef341

Browse files
initial commit
0 parents  commit f7ef341

File tree

10 files changed

+5107
-0
lines changed

10 files changed

+5107
-0
lines changed

.github/workflows/ci.yml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
pull_request:
6+
schedule:
7+
- cron: '0 0 * * 0'
8+
9+
permissions:
10+
contents: read
11+
12+
jobs:
13+
test-node:
14+
runs-on: ubuntu-latest
15+
timeout-minutes: 10
16+
17+
strategy:
18+
matrix:
19+
node-version:
20+
- 20
21+
22+
steps:
23+
- name: Checkout repository
24+
uses: actions/checkout@v4
25+
26+
- name: Use Node.js ${{ matrix.node-version }}
27+
uses: actions/setup-node@v4
28+
with:
29+
node-version: ${{ matrix.node-version }}
30+
31+
- name: Install dependencies
32+
run: npm ci
33+
34+
- name: Run tests
35+
# would require Azure credentials
36+
# run: npm test
37+
run: npm run format:check && npm run compile

.gitignore

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
lib-cov
2+
*.seed
3+
*.log
4+
*.csv
5+
*.dat
6+
*.out
7+
*.pid
8+
*.gz
9+
10+
pids
11+
logs
12+
results
13+
14+
npm-debug.log
15+
node_modules
16+
.idea
17+
.nyc_output/
18+
dist/

LICENSE

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Copyright (c) 2024 The Socket.IO team
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4+
5+
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6+
7+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

README.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Socket.IO Azure Service Bus adapter
2+
3+
The `@socket.io/azure-service-bus-adapter` package allows broadcasting packets between multiple Socket.IO servers.
4+
5+
**Table of contents**
6+
7+
- [Supported features](#supported-features)
8+
- [Installation](#installation)
9+
- [Usage](#usage)
10+
- [Options](#options)
11+
- [License](#license)
12+
13+
## Supported features
14+
15+
| Feature | `socket.io` version | Support |
16+
|---------------------------------|---------------------|------------------------------------------------|
17+
| Socket management | `4.0.0` | :white_check_mark: YES (since version `0.1.0`) |
18+
| Inter-server communication | `4.1.0` | :white_check_mark: YES (since version `0.1.0`) |
19+
| Broadcast with acknowledgements | `4.5.0` | :white_check_mark: YES (since version `0.1.0`) |
20+
| Connection state recovery | `4.6.0` | :x: NO |
21+
22+
## Installation
23+
24+
```
25+
npm install @socket.io/azure-service-bus-adapter
26+
```
27+
28+
## Usage
29+
30+
```js
31+
import { ServiceBusClient, ServiceBusAdministrationClient } from "@azure/service-bus";
32+
import { Server } from "socket.io";
33+
import { createAdapter } from "@socket.io/azure-service-bus-adapter";
34+
35+
const connectionString = "Endpoint=...";
36+
37+
const serviceBusClient = new ServiceBusClient(connectionString);
38+
const serviceBusAdminClient = new ServiceBusAdministrationClient(connectionString);
39+
40+
const io = new Server({
41+
adapter: createAdapter(serviceBusClient, serviceBusAdminClient)
42+
});
43+
44+
// wait for the creation of the pub/sub subscription
45+
await io.of("/").adapter.init();
46+
47+
io.listen(3000);
48+
```
49+
50+
## Options
51+
52+
| Name | Description | Default value |
53+
|----------------------|--------------------------------------------------------------------------------------------------------|---------------|
54+
| `topicName` | The name of the topic. | `socket.io` |
55+
| `topicOptions` | The options used to create the topic. | `-` |
56+
| `subscriptionPrefix` | The prefix of the subscription (one subscription will be created per Socket.IO server in the cluster). | `socket.io` |
57+
| `receiverOptions` | The options used to create the subscription. | `-` |
58+
| `topicOptions` | The options used to create the receiver. | `-` |
59+
| `heartbeatInterval` | The number of ms between two heartbeats. | `5_000` |
60+
| `heartbeatTimeout` | The number of ms without heartbeat before we consider a node down. | `10_000` |
61+
62+
## License
63+
64+
[MIT](LICENSE)

lib/index.ts

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
import { ClusterAdapterWithHeartbeat } from "socket.io-adapter";
2+
import type {
3+
ClusterAdapterOptions,
4+
ClusterMessage,
5+
ClusterResponse,
6+
Offset,
7+
ServerId,
8+
} from "socket.io-adapter";
9+
import { encode, decode } from "@msgpack/msgpack";
10+
import { randomBytes } from "node:crypto";
11+
import type {
12+
CreateSubscriptionOptions,
13+
CreateTopicOptions,
14+
ProcessErrorArgs,
15+
ServiceBusAdministrationClient,
16+
ServiceBusClient,
17+
ServiceBusMessage,
18+
ServiceBusReceivedMessage,
19+
ServiceBusReceiverOptions,
20+
ServiceBusSender,
21+
} from "@azure/service-bus";
22+
23+
const debug = require("debug")("socket.io-azure-service-bus-adapter");
24+
25+
function randomId() {
26+
return randomBytes(8).toString("hex");
27+
}
28+
29+
export interface AdapterOptions extends ClusterAdapterOptions {
30+
/**
31+
* The name of the topic.
32+
* @default "socket.io"
33+
*/
34+
topicName?: string;
35+
/**
36+
* The options used to create the topic.
37+
*/
38+
topicOptions?: CreateTopicOptions;
39+
/**
40+
* The prefix of the subscription (one subscription will be created per Socket.IO server in the cluster).
41+
* @default "socket.io"
42+
*/
43+
subscriptionPrefix?: string;
44+
/**
45+
* The options used to create the subscription.
46+
*/
47+
subscriptionOptions?: CreateSubscriptionOptions;
48+
/**
49+
* The options used to create the receiver.
50+
*/
51+
receiverOptions?: ServiceBusReceiverOptions;
52+
}
53+
54+
async function createSubscription(
55+
adminClient: ServiceBusAdministrationClient,
56+
topicName: string,
57+
subscriptionName: string,
58+
opts: AdapterOptions
59+
) {
60+
try {
61+
await adminClient.getTopic(topicName);
62+
63+
debug("topic [%s] already exists", topicName);
64+
} catch (e) {
65+
debug("topic [%s] does not exist", topicName);
66+
67+
await adminClient.createTopic(topicName, opts.topicOptions);
68+
69+
debug("topic [%s] was successfully created", topicName);
70+
}
71+
72+
debug("creating subscription [%s]", subscriptionName);
73+
74+
await adminClient.createSubscription(
75+
topicName,
76+
subscriptionName,
77+
opts.subscriptionOptions
78+
);
79+
80+
debug("subscription [%s] was successfully created", subscriptionName);
81+
82+
return {
83+
topicName,
84+
subscriptionName,
85+
};
86+
}
87+
88+
/**
89+
* Returns a function that will create a {@link PubSubAdapter} instance.
90+
*
91+
* @param client - a ServiceBusClient instance from the `@azure/service-bus` package
92+
* @param adminClient - a ServiceBusAdministrationClient instance from the `@azure/service-bus` package
93+
* @param opts - additional options
94+
*
95+
* @see https://learn.microsoft.com/en-us/azure/service-bus-messaging
96+
*
97+
* @public
98+
*/
99+
export function createAdapter(
100+
client: ServiceBusClient,
101+
adminClient: ServiceBusAdministrationClient,
102+
opts: AdapterOptions = {}
103+
) {
104+
const namespaceToAdapters = new Map<string, PubSubAdapter>();
105+
106+
const topicName = opts.topicName || "socket.io";
107+
const subscriptionName = `${
108+
opts.subscriptionPrefix || "socket.io"
109+
}-${randomId()}`;
110+
111+
const sender = client.createSender(topicName);
112+
const receiver = client.createReceiver(
113+
topicName,
114+
subscriptionName,
115+
opts.receiverOptions
116+
);
117+
118+
const subscriptionCreation = createSubscription(
119+
adminClient,
120+
topicName,
121+
subscriptionName,
122+
opts
123+
)
124+
.then(() => {
125+
receiver.subscribe({
126+
async processMessage(
127+
message: ServiceBusReceivedMessage
128+
): Promise<void> {
129+
if (
130+
!message.applicationProperties ||
131+
typeof message.applicationProperties["nsp"] !== "string"
132+
) {
133+
debug("ignore malformed message");
134+
return;
135+
}
136+
const namespace = message.applicationProperties["nsp"];
137+
138+
namespaceToAdapters.get(namespace)?.onRawMessage(message);
139+
140+
if (receiver.receiveMode === "peekLock") {
141+
await receiver.completeMessage(message);
142+
}
143+
},
144+
async processError(args: ProcessErrorArgs): Promise<void> {
145+
debug("an error has occurred: %s", args.error.message);
146+
},
147+
});
148+
})
149+
.catch((err) => {
150+
debug(
151+
"an error has occurred while creating the subscription: %s",
152+
err.message
153+
);
154+
});
155+
156+
return function (nsp: any) {
157+
const adapter = new PubSubAdapter(nsp, sender, opts);
158+
159+
namespaceToAdapters.set(nsp.name, adapter);
160+
161+
const defaultInit = adapter.init;
162+
163+
adapter.init = () => {
164+
return subscriptionCreation.then(() => {
165+
defaultInit.call(adapter);
166+
});
167+
};
168+
169+
const defaultClose = adapter.close;
170+
171+
adapter.close = async () => {
172+
defaultClose.call(adapter);
173+
174+
namespaceToAdapters.delete(nsp.name);
175+
176+
if (namespaceToAdapters.size === 0) {
177+
debug("deleting subscription [%s]", subscriptionName);
178+
179+
return Promise.all([
180+
receiver.close(),
181+
sender.close(),
182+
adminClient
183+
.deleteSubscription(topicName, subscriptionName)
184+
.then(() => {
185+
debug(
186+
"subscription [%s] was successfully deleted",
187+
subscriptionName
188+
);
189+
})
190+
.catch((err) => {
191+
debug(
192+
"an error has occurred while deleting the subscription: %s",
193+
err.message
194+
);
195+
}),
196+
]);
197+
}
198+
};
199+
200+
return adapter;
201+
};
202+
}
203+
204+
export class PubSubAdapter extends ClusterAdapterWithHeartbeat {
205+
private readonly sender: ServiceBusSender;
206+
/**
207+
* Adapter constructor.
208+
*
209+
* @param nsp - the namespace
210+
* @param sender - a ServiceBus sender
211+
* @param opts - additional options
212+
*
213+
* @public
214+
*/
215+
constructor(nsp: any, sender: ServiceBusSender, opts: ClusterAdapterOptions) {
216+
super(nsp, opts);
217+
this.sender = sender;
218+
}
219+
220+
protected doPublish(message: ClusterMessage): Promise<Offset> {
221+
return this.sender
222+
.sendMessages({
223+
body: encode(message),
224+
applicationProperties: {
225+
nsp: this.nsp.name,
226+
uid: this.uid,
227+
},
228+
})
229+
.then();
230+
}
231+
232+
protected doPublishResponse(
233+
requesterUid: ServerId,
234+
response: ClusterResponse
235+
): Promise<void> {
236+
return this.sender
237+
.sendMessages({
238+
body: encode(response),
239+
applicationProperties: {
240+
nsp: this.nsp.name,
241+
uid: this.uid,
242+
requesterUid,
243+
},
244+
})
245+
.then();
246+
}
247+
248+
public onRawMessage(rawMessage: ServiceBusMessage) {
249+
if (rawMessage.applicationProperties!["uid"] === this.uid) {
250+
debug("ignore message from self");
251+
return;
252+
}
253+
254+
const requesterUid = rawMessage.applicationProperties!["requesterUid"];
255+
if (requesterUid && requesterUid !== this.uid) {
256+
debug("ignore response for another node");
257+
return;
258+
}
259+
260+
const decoded = decode(rawMessage.body);
261+
debug("received %j", decoded);
262+
263+
if (requesterUid) {
264+
this.onResponse(decoded as ClusterResponse);
265+
} else {
266+
this.onMessage(decoded as ClusterMessage);
267+
}
268+
}
269+
}

0 commit comments

Comments
 (0)