
Commit db5edc6

Implement Consistent Hashing with Bounded Loads
1 parent 5628f76 commit db5edc6

File tree

10 files changed: +953 -8 lines changed


internal/ingress/annotations/upstreamhashby/main.go

Lines changed: 6 additions & 4 deletions
@@ -29,9 +29,10 @@ type upstreamhashby struct {
 
 // Config contains the Consistent hash configuration to be used in the Ingress
 type Config struct {
-    UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
-    UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
-    UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashBy              string  `json:"upstream-hash-by,omitempty"`
+    UpstreamHashBySubset        bool    `json:"upstream-hash-by-subset,omitempty"`
+    UpstreamHashBySubsetSize    int     `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
 }
 
 // NewParser creates a new UpstreamHashBy annotation parser
@@ -44,10 +45,11 @@ func (a upstreamhashby) Parse(ing *networking.Ingress) (interface{}, error) {
     upstreamHashBy, _ := parser.GetStringAnnotation("upstream-hash-by", ing)
     upstreamHashBySubset, _ := parser.GetBoolAnnotation("upstream-hash-by-subset", ing)
     upstreamHashbySubsetSize, _ := parser.GetIntAnnotation("upstream-hash-by-subset-size", ing)
+    upstreamHashByBalanceFactor, _ := parser.GetFloatAnnotation("upstream-hash-by-balance-factor", ing)
 
     if upstreamHashbySubsetSize == 0 {
         upstreamHashbySubsetSize = 3
     }
 
-    return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize}, nil
+    return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize, upstreamHashByBalanceFactor}, nil
 }

internal/ingress/controller/controller.go

Lines changed: 1 addition & 0 deletions
@@ -1023,6 +1023,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
         upstreams[name].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
         upstreams[name].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
         upstreams[name].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
+        upstreams[name].UpstreamHashBy.UpstreamHashByBalanceFactor = anns.UpstreamHashBy.UpstreamHashByBalanceFactor
 
         upstreams[name].LoadBalancing = anns.LoadBalancing
         if upstreams[name].LoadBalancing == "" {

internal/ingress/defaults/main.go

Lines changed: 12 additions & 0 deletions
@@ -132,6 +132,18 @@ type Backend struct {
     // Default 3
     UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size"`
 
+    // Configures the percentage of average cluster load to bound per upstream host.
+    // For example, with a value of 1.5 no upstream host will get a load more than 1.5 times
+    // the average load of all the hosts in the cluster.
+    //
+    // This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350
+    // This is an O(N) algorithm, unlike other load balancers.
+    // Using a lower hash_balance_factor results in more hosts being probed,
+    // so use a higher value if you require better performance.
+    //
+    // Defaults to 2 (meaning a host might be overloaded 2x compared to average)
+    UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor"`
+
     // Let's us choose a load balancing algorithm per ingress
     LoadBalancing string `json:"load-balance"`
 
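To make the new balance-factor bound concrete, here is a minimal standalone Lua sketch of the capacity check described in the comment above; the endpoint and request counts are invented for illustration, and the real balancer (chashboundedloads.lua below) applies the same ceiling per request:

-- Illustrative only: compute the per-host request cap for a given balance factor.
local math_ceil = math.ceil

local balance_factor = 1.5   -- hypothetical upstream-hash-by-balance-factor
local total_endpoints = 10   -- hypothetical number of endpoints in the backend
local total_requests = 100   -- hypothetical in-flight requests across the backend

-- No host may serve more than ceil((total_requests + 1) * balance_factor / total_endpoints):
local allowed = math_ceil((total_requests + 1) * balance_factor / total_endpoints)
print(allowed)  -- 16, i.e. roughly 1.5x the average load of 10 requests per host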

pkg/apis/ingress/types.go

Lines changed: 4 additions & 3 deletions
@@ -168,9 +168,10 @@ type CookieSessionAffinity struct {
 
 // UpstreamHashByConfig described setting from the upstream-hash-by* annotations.
 type UpstreamHashByConfig struct {
-    UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
-    UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
-    UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashBy              string  `json:"upstream-hash-by,omitempty"`
+    UpstreamHashBySubset        bool    `json:"upstream-hash-by-subset,omitempty"`
+    UpstreamHashBySubsetSize    int     `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
 }
 
 // Endpoint describes a kubernetes endpoint in a backend
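This struct is serialized via the JSON tags above into the backend configuration consumed by the Lua balancer (see balancer.lua below, which reads backend["upstreamHashByConfig"]). A rough sketch of such a backend fragment as a Lua table, with invented values:

-- Hypothetical backend fragment as seen from Lua; the field names follow the JSON tags above,
-- the values are made up for illustration.
local backend = {
  name = "default-example-8080",
  upstreamHashByConfig = {
    ["upstream-hash-by"] = "$http_x_tenant_id",
    ["upstream-hash-by-subset"] = false,
    ["upstream-hash-by-subset-size"] = 3,
    ["upstream-hash-by-balance-factor"] = 2.5,
  },
}
print(backend.upstreamHashByConfig["upstream-hash-by-balance-factor"])  -- 2.5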

rootfs/etc/nginx/lua/balancer.lua

Lines changed: 5 additions & 1 deletion
@@ -6,6 +6,7 @@ local configuration = require("configuration")
 local round_robin = require("balancer.round_robin")
 local chash = require("balancer.chash")
 local chashsubset = require("balancer.chashsubset")
+local chashboundedloads = require("balancer.chashboundedloads")
 local sticky_balanced = require("balancer.sticky_balanced")
 local sticky_persistent = require("balancer.sticky_persistent")
 local ewma = require("balancer.ewma")
@@ -29,6 +30,7 @@ local IMPLEMENTATIONS = {
   round_robin = round_robin,
   chash = chash,
   chashsubset = chashsubset,
+  chashboundedloads = chashboundedloads,
   sticky_balanced = sticky_balanced,
   sticky_persistent = sticky_persistent,
   ewma = ewma,
@@ -55,7 +57,9 @@ local function get_implementation(backend)
 
   elseif backend["upstreamHashByConfig"] and
       backend["upstreamHashByConfig"]["upstream-hash-by"] then
-    if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
+    if backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"] then
+      name = "chashboundedloads"
+    elseif backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
       name = "chashsubset"
     else
       name = "chash"
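In short, the presence of the balance-factor key now takes precedence over the subset flag when an upstream-hash-by backend picks its implementation. A simplified standalone sketch of that precedence (not the actual get_implementation, which also selects the sticky, EWMA and round-robin implementations listed above):

-- Simplified sketch of the new selection order; cfg stands in for backend["upstreamHashByConfig"].
local function pick_hash_implementation(cfg)
  if cfg["upstream-hash-by-balance-factor"] then
    return "chashboundedloads"   -- consistent hashing with bounded loads (added by this commit)
  elseif cfg["upstream-hash-by-subset"] then
    return "chashsubset"
  else
    return "chash"
  end
end

print(pick_hash_implementation({
  ["upstream-hash-by"] = "$request_uri",
  ["upstream-hash-by-balance-factor"] = 2,
}))  -- chashboundedloads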
rootfs/etc/nginx/lua/balancer/chashboundedloads.lua

Lines changed: 257 additions & 0 deletions
@@ -0,0 +1,257 @@
-- Implements Consistent Hashing with Bounded Loads based on the paper [1].
-- For the specified hash-balance-factor, requests to any upstream host are capped
-- at hash_balance_factor times the average number of requests across the cluster.
-- When a request arrives for an upstream host that is currently serving at its max capacity,
-- linear probing is used to identify the next eligible host.
--
-- This is an O(N) algorithm, unlike other load balancers. Using a lower hash-balance-factor
-- results in more hosts being probed, so use a higher value if you require better performance.
--
-- [1]: https://arxiv.org/abs/1608.01350

local resty_roundrobin = require("resty.roundrobin")
local resty_chash = require("resty.chash")
local setmetatable = setmetatable
local lrucache = require("resty.lrucache")

local util = require("util")
local split = require("util.split")
local reverse_table = util.reverse_table

local string_format = string.format
local INFO = ngx.INFO
local ngx_ERR = ngx.ERR
local ngx_WARN = ngx.WARN
local ngx_log = ngx.log
local math_ceil = math.ceil
local ipairs = ipairs
local ngx = ngx

local DEFAULT_HASH_BALANCE_FACTOR = 2

local HOST_SEED = util.get_host_seed()

-- Controls how many "tenants" we'll keep track of
-- to avoid routing them to alternative_backends,
-- as they were already consistently routed to some endpoint.
-- Lowering this value will increase the chances of more
-- tenants being routed to alternative_backends.
-- Similarly, increasing this value will keep more tenants
-- consistently routed to the same endpoint in the main backend.
local SEEN_LRU_SIZE = 1000

local _M = {}

local function incr_req_stats(self, endpoint)
  if not self.requests_by_endpoint[endpoint] then
    self.requests_by_endpoint[endpoint] = 1
  else
    self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] + 1
  end
  self.total_requests = self.total_requests + 1
end

local function decr_req_stats(self, endpoint)
  if self.requests_by_endpoint[endpoint] then
    self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] - 1
    if self.requests_by_endpoint[endpoint] == 0 then
      self.requests_by_endpoint[endpoint] = nil
    end
  end
  self.total_requests = self.total_requests - 1
end

local function get_hash_by_value(self)
  if not ngx.ctx.chash_hash_by_value then
    ngx.ctx.chash_hash_by_value = util.generate_var_value(self.hash_by)
  end

  local v = ngx.ctx.chash_hash_by_value
  if v == "" then
    return nil
  end
  return v
end

local function endpoint_eligible(self, endpoint)
  -- (num_requests * hash-balance-factor / num_servers)
  local allowed = math_ceil(
    (self.total_requests + 1) * self.balance_factor / self.total_endpoints)
  local current = self.requests_by_endpoint[endpoint]
  if current == nil then
    return true, 0, allowed
  else
    return current < allowed, current, allowed
  end
end

local function update_balance_factor(self, backend)
  local balance_factor = backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"]
  if balance_factor and balance_factor <= 1 then
    ngx_log(ngx_WARN,
      "upstream-hash-by-balance-factor must be > 1. Forcing it to the default value of ",
      DEFAULT_HASH_BALANCE_FACTOR)
    balance_factor = DEFAULT_HASH_BALANCE_FACTOR
  end
  self.balance_factor = balance_factor or DEFAULT_HASH_BALANCE_FACTOR
end

local function normalize_endpoints(endpoints)
  local b = {}
  for i, endpoint in ipairs(endpoints) do
    b[i] = string_format("%s:%s", endpoint.address, endpoint.port)
  end
  return b
end

local function update_endpoints(self, endpoints)
  self.endpoints = endpoints
  self.endpoints_reverse = reverse_table(endpoints)
  self.total_endpoints = #endpoints
  self.ring_seed = util.array_mod(HOST_SEED, self.total_endpoints)
end

function _M.is_affinitized(self)
  -- alternative_backends might contain a canary backend that gets a percentage of traffic.
  -- If a tenant has already been consistently routed to an endpoint, we want to stick to that
  -- to keep a higher cache ratio, rather than routing it to an alternative backend.
  -- This means that alternative backends (== canary) would mostly be seeing "new" tenants.

  if not self.alternative_backends or not self.alternative_backends[1] then
    return false
  end

  local hash_by_value = get_hash_by_value(self)
  if not hash_by_value then
    return false
  end

  return self.seen_hash_by_values:get(hash_by_value) ~= nil
end

function _M.new(self, backend)
  local nodes = util.get_nodes(backend.endpoints)

  local complex_val, err =
    util.parse_complex_value(backend["upstreamHashByConfig"]["upstream-hash-by"])
  if err ~= nil then
    ngx_log(ngx_ERR, "could not parse the value of the upstream-hash-by: ", err)
  end

  local o = {
    name = "chashboundedloads",

    chash = resty_chash:new(nodes),
    roundrobin = resty_roundrobin:new(nodes),
    alternative_backends = backend.alternativeBackends,
    hash_by = complex_val,

    requests_by_endpoint = {},
    total_requests = 0,
    seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)
  }

  update_endpoints(o, normalize_endpoints(backend.endpoints))
  update_balance_factor(o, backend)

  setmetatable(o, self)
  self.__index = self
  return o
end

function _M.sync(self, backend)
  self.alternative_backends = backend.alternativeBackends

  update_balance_factor(self, backend)

  local new_endpoints = normalize_endpoints(backend.endpoints)

  if util.deep_compare(self.endpoints, new_endpoints) then
    ngx_log(INFO, "endpoints did not change for backend ", backend.name)
    return
  end

  ngx_log(INFO, string_format("[%s] endpoints have changed for backend %s",
    self.name, backend.name))

  update_endpoints(self, new_endpoints)

  local nodes = util.get_nodes(backend.endpoints)
  self.chash:reinit(nodes)
  self.roundrobin:reinit(nodes)

  self.seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)

  ngx_log(INFO, string_format("[%s] nodes have changed for backend %s", self.name, backend.name))
end

function _M.balance(self)
  local hash_by_value = get_hash_by_value(self)

  -- Tenant key not available, falling back to round-robin
  if not hash_by_value then
    local endpoint = self.roundrobin:find()
    ngx.var.chashbl_debug = "fallback_round_robin"
    return endpoint
  end

  self.seen_hash_by_values:set(hash_by_value, true)

  local tried_endpoints
  if not ngx.ctx.balancer_chashbl_tried_endpoints then
    tried_endpoints = {}
    ngx.ctx.balancer_chashbl_tried_endpoints = tried_endpoints
  else
    tried_endpoints = ngx.ctx.balancer_chashbl_tried_endpoints
  end

  local first_endpoint = self.chash:find(hash_by_value)
  local index = self.endpoints_reverse[first_endpoint]

  -- By design, resty.chash always points to the same element of the ring,
  -- regardless of the environment. In this algorithm, we want the consistency
  -- to be "seeded" based on the host where it's running.
  -- That's how both Envoy and Haproxy implement this.
  -- For convenience, we keep resty.chash but manually introduce the seed.
  index = util.array_mod(index + self.ring_seed, self.total_endpoints)

  for i = 0, self.total_endpoints - 1 do
    local j = util.array_mod(index + i, self.total_endpoints)
    local endpoint = self.endpoints[j]

    if not tried_endpoints[endpoint] then
      local eligible, current, allowed = endpoint_eligible(self, endpoint)

      if eligible then
        ngx.var.chashbl_debug = string_format(
          "attempt=%d score=%d allowed=%d total_requests=%d hash_by_value=%s",
          i, current, allowed, self.total_requests, hash_by_value)

        incr_req_stats(self, endpoint)
        tried_endpoints[endpoint] = true
        return endpoint
      end
    end
  end

  -- Normally, this case should never be reached, because with balance_factor > 1
  -- there should always be an eligible endpoint.
  -- It would be reached only if the number of endpoints is less than or equal to
  -- the max Nginx retries and tried_endpoints already contains all endpoints.
  incr_req_stats(self, first_endpoint)
  ngx.var.chashbl_debug = "fallback_first_endpoint"
  return first_endpoint
end

function _M.after_balance(self)
  local tried_upstreams = split.split_upstream_var(ngx.var.upstream_addr)
  if (not tried_upstreams) or (not get_hash_by_value(self)) then
    return
  end

  for _, addr in ipairs(tried_upstreams) do
    decr_req_stats(self, addr)
  end
end

return _M
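To see the bounded-load behaviour end to end, here is a self-contained simulation of the probe loop above in plain Lua, with invented endpoint addresses and no ngx/resty dependencies; unlike the real module it never decrements the counters, so it models a burst of concurrent requests for a single hot hash key:

-- Illustrative simulation of bounded-load probing; the real balancer uses resty.chash for the
-- starting index and incr_req_stats/decr_req_stats to track in-flight requests.
local math_ceil = math.ceil

local endpoints = { "10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080" }  -- hypothetical
local balance_factor = 2
local requests_by_endpoint = {}
local total_requests = 0

local function pick(start_index)
  local allowed = math_ceil((total_requests + 1) * balance_factor / #endpoints)
  for i = 0, #endpoints - 1 do
    local endpoint = endpoints[(start_index - 1 + i) % #endpoints + 1]
    if (requests_by_endpoint[endpoint] or 0) < allowed then
      requests_by_endpoint[endpoint] = (requests_by_endpoint[endpoint] or 0) + 1
      total_requests = total_requests + 1
      return endpoint, allowed
    end
  end
end

-- Every request hashes to the same ring position (index 1), as for one very hot tenant.
for n = 1, 6 do
  local endpoint, allowed = pick(1)
  print(n, endpoint, "allowed=" .. allowed)
end
-- The first endpoint absorbs requests only up to the bound; once it is at capacity the hot key
-- spills over to the next endpoint instead of overloading a single host.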
