Skip to content

Commit 1c7a59f

Browse files
committed
Implement a circuit breaker filter
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent cf80203 commit 1c7a59f

File tree

2 files changed

+72
-0
lines changed

2 files changed

+72
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// A filter that let messages go through, unless too many messages are received within a given period
2+
//
3+
// This filter is configured by the following settings:
4+
// - tick_every_seconds: the frequency at which the sliding window is moved
5+
// - tick_count: size of the time windows
6+
// - too_many: how many messages is too many (received during the last tick_count*tick_every_seconds seconds)
7+
// - back_to_normal: how many messages is okay to reactivate the filter if bellow
8+
// - message_on_too_many: message sent when the upper threshold is crossed
9+
// - message_on_back_to_normal: message sent when the lower threshold is crossed
10+
// - stats_topic: topic for statistic messages
11+
class State {
12+
static open = false
13+
static total = 0
14+
static batch = [0]
15+
}
16+
17+
18+
export function process (timestamp, message, config) {
19+
State.total += 1
20+
State.batch[0] += 1
21+
if (State.open) {
22+
let back_to_normal = config?.back_to_normal || 100
23+
if (State.total < back_to_normal) {
24+
State.open = false
25+
if (config?.message_on_back_to_normal) {
26+
return [config?.message_on_back_to_normal, message]
27+
} else {
28+
return [message]
29+
}
30+
} else {
31+
return []
32+
}
33+
} else {
34+
let too_many = config?.too_many || 1000
35+
if (State.total < too_many) {
36+
return [message]
37+
} else {
38+
State.open = true
39+
if (config?.message_on_too_many) {
40+
return [config?.message_on_too_many]
41+
} else {
42+
return []
43+
}
44+
}
45+
}
46+
}
47+
48+
49+
export function tick(timestamp, config) {
50+
let max_batch_count = config?.tick_count || 10
51+
let new_batch_count = State.batch.unshift(0)
52+
if (new_batch_count > max_batch_count) {
53+
State.total -= State.batch.pop()
54+
}
55+
56+
if (config?.stats_topic) {
57+
return [{
58+
topic: config?.stats_topic,
59+
payload: `{"circuit-breaker-open": ${State.open}, "total": ${State.total}, "batch": ${State.batch}}`
60+
}]
61+
} else {
62+
return []
63+
}
64+
65+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# This pipeline is on purpose looping: the messages are published to the same topic
2+
input_topics = ["loopback/#"]
3+
4+
stages = [
5+
{ filter = "add_timestamp.js" },
6+
{ filter = "circuit-breaker.js", tick_every_seconds = 1, config = { stats_topic = "te/error", too_many = 10000, message_on_too_many = { topic = "te/device/main///a/too-many-messages", payload = "too many messages" }, message_on_back_to_normal = { topic = "te/device/main///a/too-many-messages", payload = "back to normal" } } }
7+
]

0 commit comments

Comments
 (0)