Skip to content

Commit 4f97333

Browse files
committed
add connection limits
1 parent ef7a615 commit 4f97333

File tree

11 files changed

+341
-73
lines changed

11 files changed

+341
-73
lines changed

README.MD

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ A: `v<openbmclapi 版本>-<go-openbmclapi 构建计数>`, 例: `v1.6.7-60`
7070
debug: false
7171
# 是否打印访问信息, 默认为否 (这个选项对于压缩日志文件十分有用)
7272
record_serve_info: false
73+
# 是否禁用 https
7374
nohttps: false
75+
# 是否仅从主服务器下载文件
76+
noopen: false
7477
# 实际开放的公网主机名, 同 CLUSTER_IP
7578
public_host: example.com
7679
# 实际开放的公网端口, 同 CLUSTER_PUBLIC_PORT
@@ -83,6 +86,14 @@ cluster_id: ${CLUSTER_ID}
8386
cluster_secret: ${CLUSTER_SECRET}
8487
# 同步文件时最多打开的连接数量
8588
download_max_conn: 64
89+
# 服务器上行限制
90+
serve_limit:
91+
# 是否启用上行限制
92+
enable: false
93+
# 最大连接数量
94+
max_conn: 16384
95+
# 上行速率限制 (KiB/s), 0 表示无限制
96+
upload_rate: 0
8697

8798
## 特殊要求: 重定向到 OSS
8899
oss:

cluster.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,15 @@ func (cr *Cluster) Connect(ctx context.Context) bool {
161161
cr.socket.ErrorHandle = func(*Socket) {
162162
connected()
163163
go func() {
164-
logWarn("Reconnecting due to SIO error")
165-
if cr.Disable(ctx) {
164+
if cr.disconnected() {
165+
logWarn("Reconnecting due to SIO error")
166166
if !cr.Connect(ctx) {
167167
logError("Cannot reconnect to server, exit.")
168-
os.Exit(1)
168+
os.Exit(0x08)
169169
}
170170
if err := cr.Enable(ctx); err != nil {
171171
logError("Cannot enable cluster:", err, "; exit.")
172-
os.Exit(1)
172+
os.Exit(0x08)
173173
}
174174
}
175175
}()
@@ -238,8 +238,6 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {
238238
ctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2)
239239
defer cancel()
240240
if !cr.KeepAlive(ctx) {
241-
logError("TODO: Keep alive failed, exit.")
242-
os.Exit(0x80)
243241
if keepaliveCtx.Err() == nil {
244242
logInfo("Reconnecting due to keepalive failed")
245243
cr.Disable(keepaliveCtx)
@@ -282,6 +280,22 @@ func (cr *Cluster) KeepAlive(ctx context.Context) (ok bool) {
282280
return true
283281
}
284282

283+
func (cr *Cluster) disconnected() bool {
284+
cr.mux.Lock()
285+
defer cr.mux.Unlock()
286+
287+
if !cr.enabled.Swap(false) {
288+
return false
289+
}
290+
if cr.cancelKeepalive != nil {
291+
cr.cancelKeepalive()
292+
cr.cancelKeepalive = nil
293+
}
294+
cr.socket.Close()
295+
cr.socket = nil
296+
return true
297+
}
298+
285299
func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
286300
cr.mux.Lock()
287301
defer cr.mux.Unlock()
@@ -290,14 +304,14 @@ func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
290304
logDebug("Extra disable")
291305
return false
292306
}
293-
logInfo("Disabling cluster")
294307
if cr.cancelKeepalive != nil {
295308
cr.cancelKeepalive()
296309
cr.cancelKeepalive = nil
297310
}
298311
if cr.socket == nil {
299312
return false
300313
}
314+
logInfo("Disabling cluster")
301315
{
302316
logInfo("Making keepalive before disable")
303317
tctx, cancel := context.WithTimeout(ctx, time.Second*10)
@@ -326,6 +340,7 @@ func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
326340
logError("Disable failed: ack non true value")
327341
return false
328342
}
343+
logWarn("Cluster disabled")
329344
return true
330345
}
331346

config.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ type OSSItem struct {
3737
working atomic.Bool
3838
}
3939

40+
type ServeLimitConfig struct {
41+
Enable bool `yaml:"enable"`
42+
MaxConn int `yaml:"max_conn"`
43+
UploadRate int `yaml:"upload_rate"`
44+
}
45+
4046
type OSSConfig struct {
4147
Enable bool `yaml:"enable"`
4248
List []*OSSItem `yaml:"list"`
@@ -51,18 +57,19 @@ type HijackConfig struct {
5157
}
5258

5359
type Config struct {
54-
Debug bool `yaml:"debug"`
55-
RecordServeInfo bool `yaml:"record_serve_info"`
56-
Nohttps bool `yaml:"nohttps"`
57-
NoOpen bool `yaml:"noopen"`
58-
PublicHost string `yaml:"public_host"`
59-
PublicPort uint16 `yaml:"public_port"`
60-
Port uint16 `yaml:"port"`
61-
ClusterId string `yaml:"cluster_id"`
62-
ClusterSecret string `yaml:"cluster_secret"`
63-
DownloadMaxConn int `yaml:"download_max_conn"`
64-
Oss OSSConfig `yaml:"oss"`
65-
Hijack HijackConfig `yaml:"hijack_port"`
60+
Debug bool `yaml:"debug"`
61+
RecordServeInfo bool `yaml:"record_serve_info"`
62+
Nohttps bool `yaml:"nohttps"`
63+
NoOpen bool `yaml:"noopen"`
64+
PublicHost string `yaml:"public_host"`
65+
PublicPort uint16 `yaml:"public_port"`
66+
Port uint16 `yaml:"port"`
67+
ClusterId string `yaml:"cluster_id"`
68+
ClusterSecret string `yaml:"cluster_secret"`
69+
DownloadMaxConn int `yaml:"download_max_conn"`
70+
ServeLimit ServeLimitConfig `yaml:"serve_limit"`
71+
Oss OSSConfig `yaml:"oss"`
72+
Hijack HijackConfig `yaml:"hijack_port"`
6673
}
6774

6875
func readConfig() (config Config) {
@@ -78,6 +85,11 @@ func readConfig() (config Config) {
7885
ClusterId: "${CLUSTER_ID}",
7986
ClusterSecret: "${CLUSTER_SECRET}",
8087
DownloadMaxConn: 64,
88+
ServeLimit: ServeLimitConfig{
89+
Enable: false,
90+
MaxConn: 16384,
91+
UploadRate: 1024 * 12, // 12MB
92+
},
8193

8294
Oss: OSSConfig{
8395
Enable: false,

config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
debug: false
22
record_serve_info: false
33
nohttps: false
4+
noopen: false
45
public_host: example.com
56
public_port: 8080
67
port: 4000
78
cluster_id: ${CLUSTER_ID}
89
cluster_secret: ${CLUSTER_SECRET}
910
download_max_conn: 64
11+
serve_limit:
12+
enable: false
13+
max_conn: 0
14+
upload_rate: 0
1015
oss:
1116
enable: false
1217
list:

handler.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,29 @@ func (r *countReader) Read(buf []byte) (n int, err error) {
5252
type statusResponseWriter struct {
5353
http.ResponseWriter
5454
status int
55+
wrote int64
5556
}
5657

5758
func (w *statusResponseWriter) WriteHeader(status int) {
5859
w.status = status
5960
w.ResponseWriter.WriteHeader(status)
6061
}
6162

63+
func (w *statusResponseWriter) Write(buf []byte) (n int, err error) {
64+
n, err = w.ResponseWriter.Write(buf)
65+
w.wrote += (int64)(n)
66+
return
67+
}
68+
6269
func (cr *Cluster) GetHandler() (handler http.Handler) {
6370
cr.handlerAPIv0 = http.StripPrefix("/api/v0", cr.initAPIv0())
6471

6572
handler = cr
6673
{
6774
type record struct {
68-
used float64
69-
ua string
75+
used float64
76+
bytes float64
77+
ua string
7078
}
7179
recordCh := make(chan record, 1024)
7280

@@ -86,7 +94,10 @@ func (cr *Cluster) GetHandler() (handler http.Handler) {
8694
} else if used > time.Second {
8795
used = used.Truncate(time.Microsecond)
8896
}
89-
logInfof("Serve %d | %12v | %-15s | %s | %-4s %s | %q", srw.status, used, addr, req.Proto, req.Method, req.RequestURI, ua)
97+
logInfof("Serve %d | %12v | %7s | %-15s | %s | %-4s %s | %q",
98+
srw.status, used, bytesToUnit((float64)(srw.wrote)),
99+
addr, req.Proto,
100+
req.Method, req.RequestURI, ua)
90101
}
91102
if srw.status < 200 && 400 <= srw.status {
92103
return
@@ -96,6 +107,7 @@ func (cr *Cluster) GetHandler() (handler http.Handler) {
96107
}
97108
var rec record
98109
rec.used = used.Seconds()
110+
rec.bytes = (float64)(srw.wrote)
99111
rec.ua, _ = split(ua, '/')
100112
select {
101113
case recordCh <- rec:
@@ -110,33 +122,35 @@ func (cr *Cluster) GetHandler() (handler http.Handler) {
110122
defer updateTicker.Stop()
111123

112124
var (
113-
total int64
114-
totalUsed float64
115-
uas = make(map[string]int, 10)
125+
total int
126+
totalUsed float64
127+
totalBytes float64
128+
uas = make(map[string]int, 10)
116129
)
117130
for {
118131
select {
119132
case <-updateTicker.C:
120133
cr.stats.mux.Lock()
121-
total = 0
122-
totalUsed = 0
134+
135+
logInfof("Served %d requests, %s, used %.2fs, %s/s", total, bytesToUnit(totalBytes), totalUsed, bytesToUnit(totalBytes/totalUsed))
123136
for ua, v := range uas {
124137
if ua == "" {
125138
ua = "[Unknown]"
126139
}
127140
cr.stats.Accesses[ua] += v
128141
}
142+
143+
total = 0
144+
totalUsed = 0
145+
totalBytes = 0
129146
clear(uas)
147+
130148
cr.stats.mux.Unlock()
131149
case rec := <-recordCh:
132150
total++
133151
totalUsed += rec.used
152+
totalBytes += rec.bytes
134153
uas[rec.ua]++
135-
136-
if total%100 == 0 {
137-
avg := (time.Duration)(totalUsed / (float64)(total) * (float64)(time.Second))
138-
logInfof("Served %d requests, total used %.2fs, avg %v", total, totalUsed, avg)
139-
}
140154
case <-disabled:
141155
return
142156
}

0 commit comments

Comments
 (0)