Skip to content

Commit 1b69e9f

Browse files
authored
Merge pull request #196 from VKCOM/e.khalilov/zeromq-upgrade/QA-16136
upgrade ZeroMQ from v5 to v6
2 parents fba2a68 + 10c707d commit 1b69e9f

File tree

5 files changed

+169
-48
lines changed

5 files changed

+169
-48
lines changed

.eslintrc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"accessor-pairs": 2,
2121
"block-scoped-var": 2,
2222
"complexity": 0,
23-
"consistent-return": 1,
23+
"consistent-return": 0,
2424
"curly": 2,
2525
"dot-location": [2, "property"], // defaults to "object"
2626
"dot-notation": 2,
@@ -50,14 +50,13 @@
5050
"no-native-reassign": 2,
5151
"no-new-func": 2,
5252
"no-new-wrappers": 2,
53-
"no-new": 2,
53+
"no-new": 0,
5454
"no-octal-escape": 2,
5555
"no-octal": 1, // TODO: accept until we use ES6 0o755 notation
5656
"no-param-reassign": 2,
5757
"no-process-env": 0, // `2` is recommended
5858
"no-proto": 2,
5959
"no-redeclare": [2, {"builtinGlobals": true}], // `2` is recommended and actually defaults to `[2, {"builtinGlobals": false}]`
60-
"no-return-assign": [1, "except-parens"],
6160
"no-script-url": 2,
6261
"no-self-compare": 2,
6362
"no-sequences": 2,
@@ -154,7 +153,8 @@
154153
"no-path-concat": 2, // `2` is default
155154
"no-process-exit": 0, // `2` is default
156155
"no-restricted-modules": 0, // no default, optionally set `[2, "fs", "os"]`
157-
"no-sync": 1 // `2` is default
156+
"no-sync": 0, // `2` is default
157+
"no-async-promise-executor": 0
158158

159159
// eslint v2
160160
// "keyword-spacing": 2

.github/workflows/api_tests.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ jobs:
2222
run: docker compose -f docker-compose-test.yaml up --exit-code-from devicehub-pytest --abort-on-container-exit devicehub-pytest --remove-orphans
2323

2424
- name: Check generate device logs
25+
if: always()
2526
run: docker compose -f docker-compose-test.yaml logs devicehub-generate-fake-device

lib/util/zmqutil.js

Lines changed: 138 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,138 @@
1-
//
2-
// Copyright © 2022 contains code contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0
3-
//
4-
5-
// ISSUE-100 (https://github.com/openstf/stf/issues/100)
6-
7-
// In some networks TCP Connection dies if kept idle for long.
8-
// Setting TCP_KEEPALIVE option true, to all the zmq sockets
9-
// won't let it die
10-
11-
import zmq from 'zeromq'
12-
import logger from './logger.js'
13-
const log = logger.createLogger('util:zmqutil')
14-
15-
export const socket = function(...args) {
16-
let sock = zmq.socket(...args)
17-
18-
;['ZMQ_TCP_KEEPALIVE', 'ZMQ_TCP_KEEPALIVE_IDLE', 'ZMQ_IPV6'].forEach(function(opt) {
19-
if (process.env[opt]) {
20-
try {
21-
sock.setsockopt(zmq[opt], Number(process.env[opt]))
22-
}
23-
catch (err) {
24-
log.warn('ZeroMQ library too old, no support for %s', opt)
25-
}
26-
}
27-
})
28-
29-
return sock
30-
}
1+
//
2+
// Copyright © 2025 contains code contributed by V Kontakte LLC - Licensed under the Apache license 2.0
3+
//
4+
// This wrapper is designed to make 0MQ v6 backwards compatible with v5
5+
6+
import * as zmq from 'zeromq'
7+
import logger from './logger.js'
8+
import {EventEmitter} from 'events'
9+
const log = logger.createLogger('util:zmqutil')
10+
11+
const socketTypeMap = {
12+
pub: zmq.Publisher
13+
, sub: zmq.Subscriber
14+
, push: zmq.Push
15+
, pull: zmq.Pull
16+
, dealer: zmq.Dealer
17+
, router: zmq.Router
18+
, pair: zmq.Pair
19+
, req: zmq.Request
20+
, reply: zmq.Reply
21+
}
22+
23+
class SocketWrapper extends EventEmitter {
24+
#sendQueue = Promise.resolve()
25+
26+
constructor(type) {
27+
super()
28+
29+
if (!(type in socketTypeMap)) {
30+
throw new Error(`Unsupported socket type: ${type}`)
31+
}
32+
33+
this.type = type
34+
this.isActive = true
35+
this.endpoints = new Set()
36+
37+
const SocketClass = socketTypeMap[type]
38+
this.socket = new SocketClass()
39+
}
40+
41+
bindSync = (address) => this.socket.bindSync(address)
42+
43+
connect(endpoint) {
44+
this.socket.connect(endpoint)
45+
this.endpoints.add(endpoint)
46+
log.verbose('Socket connected to:', endpoint)
47+
48+
return this
49+
}
50+
51+
subscribe(topic) {
52+
if (this.type === 'sub') {
53+
this.socket.subscribe(
54+
typeof topic === 'string' ? Buffer.from(topic) : topic
55+
)
56+
}
57+
58+
return this
59+
}
60+
61+
unsubscribe(topic) {
62+
if (this.type === 'sub') {
63+
this.socket.unsubscribe(
64+
typeof topic === 'string' ? Buffer.from(topic) : topic
65+
)
66+
}
67+
return this
68+
}
69+
70+
async sendAsync(args) {
71+
try {
72+
await this.socket.send(
73+
(Array.isArray(args) ? args : [args])
74+
.map(arg => Buffer.isBuffer(arg) ? arg : Buffer.from(String(arg)))
75+
)
76+
}
77+
catch (err) {
78+
log.error('Error on send:', err && (err.message || err.toString()) || err)
79+
}
80+
}
81+
82+
send(args) {
83+
this.#sendQueue = this.#sendQueue.then(() => this.sendAsync(args))
84+
return this
85+
}
86+
87+
close() {
88+
this.isActive = false
89+
this.socket.close()
90+
91+
return this
92+
}
93+
94+
async startReceiveLoop() {
95+
const isValidType =
96+
this.type === 'sub' ||
97+
this.type === 'pull' ||
98+
this.type === 'dealer' ||
99+
this.type === 'router' ||
100+
this.type === 'reply'
101+
102+
if (!this.isActive || !isValidType) {
103+
return
104+
}
105+
106+
try {
107+
const iterator = this.socket[Symbol.asyncIterator]()
108+
let result
109+
110+
while (this.isActive && !(result = await iterator.next()).done) {
111+
const message = result.value
112+
113+
if (Array.isArray(message) && !!message[0]?.toString) {
114+
super.emit(
115+
'message'
116+
, message[0].toString()
117+
, ...message.slice(1)
118+
)
119+
}
120+
}
121+
}
122+
catch (err) {
123+
log.error('Error in message receive loop:', err && (err.message || err.toString()) || 'Unknown error')
124+
return this.startReceiveLoop()
125+
}
126+
}
127+
}
128+
129+
export const socket = (type) => {
130+
if (!(type in socketTypeMap)) {
131+
throw new Error(`Unsupported socket type: ${type}`)
132+
}
133+
134+
const wrappedSocket = new SocketWrapper(type)
135+
wrappedSocket.startReceiveLoop()
136+
137+
return wrappedSocket
138+
}

package-lock.json

Lines changed: 25 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
"yaml": "2.4.2",
116116
"yargs": "^17.7.2",
117117
"yauzl": "2.10.0",
118-
"zeromq": "^5.0.0"
118+
"zeromq": "6.4.1"
119119
},
120120
"devDependencies": {
121121
"@eslint/eslintrc": "^3.1.0",

0 commit comments

Comments
 (0)