Skip to content

Commit ecb939b

Browse files
authored
x/ref/runtime/internal/rpc: ensure that a proxied server expands to use all available proxies (#186)
Prior to this PR, a server using the 'all' proxy policy would expand the set of proxy instances that it used only when it encountered an error with any of the existing proxy instances. This PR changes the behavior so that servers monitor the set of available proxy instances and expand to use all available ones before any errors are encountered. It also includes a fix for macOS Big Sur, where the error message for a refused connection appears to have changed.
1 parent 399567d commit ecb939b

File tree

4 files changed

+197
-23
lines changed

4 files changed

+197
-23
lines changed

x/ref/runtime/internal/rpc/proxymgr.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ func newProxyManager(s serverProxyAPI, proxyName string, policy rpc.ProxyPolicy,
5959
}
6060

6161
func (pm *proxyManager) selectRandomSubsetLocked(needed, available int) map[int]bool {
62+
if needed == 0 {
63+
return nil
64+
}
6265
selected := map[int]bool{}
6366
for {
6467
candidate := pm.rand.Intn(available)
@@ -160,6 +163,7 @@ func (pm *proxyManager) updateAvailableProxies(ctx *context.T) {
160163
pm.Lock()
161164
defer pm.Unlock()
162165
pm.proxies = updated
166+
163167
}
164168

165169
func (pm *proxyManager) markActive(ep naming.Endpoint) {
@@ -175,8 +179,6 @@ func (pm *proxyManager) markInActive(ep naming.Endpoint) {
175179
}
176180

177181
func (pm *proxyManager) connectToSingleProxy(ctx *context.T, name string, ep naming.Endpoint) {
178-
pm.markActive(ep)
179-
defer pm.markInActive(ep)
180182
for delay := pm.reconnectDelay; ; delay = nextDelay(delay) {
181183
if !pm.isAvailable(ep) {
182184
ctx.Infof("connectToSingleProxy(%q): proxy is no longer available\n", ep)
@@ -212,14 +214,26 @@ func (pm *proxyManager) tryConnections(ctx *context.T, notifyCh chan struct{}) b
212214
return false
213215
}
214216
for _, ep := range idle {
217+
if !pm.canGrow() {
218+
continue
219+
}
220+
pm.markActive(ep)
215221
go func(ep naming.Endpoint) {
216222
pm.connectToSingleProxy(ctx, pm.proxyName, ep)
217-
notifyCh <- struct{}{}
223+
pm.markInActive(ep)
224+
sendNotify(notifyCh)
218225
}(ep)
219226
}
220227
return true
221228
}
222229

230+
func sendNotify(ch chan struct{}) {
231+
select {
232+
case ch <- struct{}{}:
233+
default:
234+
}
235+
}
236+
223237
func drainNotifyChan(ch chan struct{}) {
224238
for {
225239
select {
@@ -230,9 +244,29 @@ func drainNotifyChan(ch chan struct{}) {
230244
}
231245
}
232246

247+
func (pm *proxyManager) watchForChanges(ctx *context.T, ch chan struct{}) {
248+
for {
249+
select {
250+
case <-ctx.Done():
251+
return
252+
case <-time.After(pm.resolveDelay):
253+
pm.updateAvailableProxies(ctx)
254+
if pm.shouldGrow() && pm.canGrow() {
255+
sendNotify(ch)
256+
}
257+
}
258+
}
259+
}
260+
233261
func (pm *proxyManager) manageProxyConnections(ctx *context.T) {
234262
notifyCh := make(chan struct{}, 10)
235263
pm.updateAvailableProxies(ctx)
264+
// Watch for changes in the set of available proxies so that for the
265+
// 'all' policy, the server will connect to new proxies as they appear.
266+
// For other policies reconnection may be a little faster since the
267+
// new set of proxies is already available.
268+
go pm.watchForChanges(ctx, notifyCh)
269+
236270
for {
237271
select {
238272
case <-ctx.Done():
@@ -253,7 +287,7 @@ func (pm *proxyManager) manageProxyConnections(ctx *context.T) {
253287
drainNotifyChan(notifyCh)
254288
}
255289
// Wait for a change in the set of available proxies.
256-
if pm.shouldGrow() {
290+
if pm.shouldGrow() && !pm.canGrow() {
257291
for {
258292
select {
259293
case <-ctx.Done():

x/ref/runtime/internal/rpc/proxymgr_test.go

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,19 @@ func TestSingleProxyConnections(t *testing.T) {
322322
}
323323
}
324324

325+
func waitForExpected(sm *mockServerAPI, proxyName string, expected int) {
326+
// Wait for the expected number of connections.
327+
for {
328+
time.Sleep(100 * time.Millisecond)
329+
sm.Lock()
330+
if len(sm.listening[proxyName]) == expected {
331+
sm.Unlock()
332+
break
333+
}
334+
sm.Unlock()
335+
}
336+
}
337+
325338
func TestMultipleProxyConnections(t *testing.T) {
326339
ctx, shutdown := test.V23Init()
327340
defer shutdown()
@@ -363,13 +376,7 @@ func TestMultipleProxyConnections(t *testing.T) {
363376
time.Sleep(time.Millisecond * 100)
364377
sm.setEndpoints(proxyName, eps...)
365378

366-
// Wait for the expected number of connections.
367-
for {
368-
time.Sleep(100 * time.Millisecond)
369-
if len(sm.listening[proxyName]) == tc.expected {
370-
break
371-
}
372-
}
379+
waitForExpected(sm, proxyName, tc.expected)
373380

374381
// Remove the endpoints and finish the current listeners.
375382
sm.setEndpoints(proxyName)
@@ -394,14 +401,65 @@ func TestMultipleProxyConnections(t *testing.T) {
394401
pm.updateAvailableProxies(ctx)
395402

396403
// Wait for the expected number of connections.
397-
for {
398-
time.Sleep(100 * time.Millisecond)
399-
if len(sm.listening[proxyName]) == tc.expected {
400-
break
401-
}
402-
}
404+
waitForExpected(sm, proxyName, tc.expected)
405+
406+
cancel()
407+
// Should immediately return if the context is already canceled.
408+
pm.manageProxyConnections(cctx)
409+
410+
wg.Wait()
411+
}
412+
413+
}
414+
415+
func TestMultipleProxyConnectionExpansion(t *testing.T) {
416+
ctx, shutdown := test.V23Init()
417+
defer shutdown()
418+
sm := newMockServer()
419+
ep1 := newEndpoint("5000")
420+
ep2 := newEndpoint("5001")
421+
ep3 := newEndpoint("5002")
422+
423+
fpm := newProxyManager(sm, "proxy0", rpc.UseFirstProxy, 1)
424+
rpm := newProxyManager(sm, "proxy1", rpc.UseRandomProxy, 1)
425+
apm := newProxyManager(sm, "proxy2", rpc.UseAllProxies, 3)
426+
427+
for i, tc := range []struct {
428+
pm *proxyManager
429+
initial int
430+
final int
431+
}{
432+
{fpm, 1, 1},
433+
{rpm, 1, 1},
434+
{apm, 1, 3},
435+
} {
436+
cctx, cancel := context.WithCancel(ctx)
437+
pm := tc.pm
438+
// tune down the delays
439+
pm.resolveDelay = time.Millisecond
440+
pm.reconnectDelay = time.Millisecond
441+
proxyName := fmt.Sprintf("proxy%v", i)
442+
ch := make(chan struct{})
443+
sm.setChan(proxyName, ch)
444+
445+
var wg sync.WaitGroup
446+
wg.Add(1)
447+
go func(pm *proxyManager) {
448+
pm.manageProxyConnections(cctx)
449+
wg.Done()
450+
}(pm)
451+
452+
sm.setEndpoints(proxyName, ep1)
453+
454+
waitForExpected(sm, proxyName, tc.initial)
455+
sm.setEndpoints(proxyName, ep1, ep2, ep3)
456+
time.Sleep(100 * time.Millisecond)
457+
458+
// Wait for the expected number of connections.
459+
waitForExpected(sm, proxyName, tc.final)
403460

404461
cancel()
462+
405463
// Should immediately return if the context is already canceled.
406464
pm.manageProxyConnections(cctx)
407465

x/ref/runtime/internal/rpc/test/client_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,15 @@ func TestStartCallErrors(t *testing.T) { //nolint:gocyclo
295295
if !errors.Is(err, verror.ErrNoServers) {
296296
t.Errorf("wrong error: %s", err)
297297
}
298-
if want := "connection refused"; !strings.Contains(verror.DebugString(err), want) {
299-
t.Errorf("wrong error: %s - doesn't contain %q", err, want)
298+
found := false
299+
allowed := []string{"connection reset by peer", "connection refused"}
300+
for _, want := range allowed {
301+
if strings.Contains(verror.DebugString(err), want) {
302+
found = true
303+
}
304+
}
305+
if !found {
306+
t.Errorf("wrong error: %s - doesn't contain one of %q", err, allowed)
300307
}
301308

302309
// This will fail with NoServers, but because there really is no

x/ref/services/xproxy/xproxyd/proxyd_v23_test.go

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const (
4646
responseVar = "RESPONSE" // Name of the variable used by client program to output the first response
4747
responseVar1 = "RESPONSE1" // Name of the variable used by client program to output the second response
4848
downloadSize = 64 * 1024 * 1024
49+
using1OfMProxies = 1 // Use 1 out of the total available set of proxies when using all proxies.
4950
using2OfMProxies = 2 // Use 2 out of the total available set of proxies when using all proxies.
5051
)
5152

@@ -106,7 +107,7 @@ func TestV23MultipleProxyd(t *testing.T) {
106107
firstProxyAddress, firstProxyLog, _, err := startServer(t, sh, serverName, 1, runServer, serverCreds)
107108
assert("first proxy policy server", firstProxyLog)
108109

109-
allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds)
110+
allProxiesAddress, allProxiesLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds)
110111
assert("all proxies policy server", allProxiesLog)
111112

112113
// Run all of the clients.
@@ -279,7 +280,7 @@ func TestV23MultiProxyResilience(t *testing.T) {
279280
assert("first two proxies", logsForProxies(first2)...)
280281

281282
// Start the server.
282-
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxies, serverCreds)
283+
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using2OfMProxies, runServerAllProxiesLimit2, serverCreds)
283284
assert("server", serverLog)
284285

285286
// Run the client.
@@ -343,6 +344,74 @@ func TestV23MultiProxyResilience(t *testing.T) {
343344

344345
}
345346

347+
func TestV23MultiProxyExpansion(t *testing.T) {
348+
349+
v23test.SkipUnlessRunningIntegrationTests(t)
350+
sh := v23test.NewShell(t, nil)
351+
defer sh.Cleanup()
352+
sh.StartRootMountTable()
353+
354+
var (
355+
serverCreds = sh.ForkCredentials("server")
356+
clientCreds = sh.ForkCredentials("client")
357+
err error
358+
)
359+
360+
assert := func(msg string, logs ...*bytes.Buffer) {
361+
assertWithLog(t, err, msg, logs)
362+
}
363+
364+
ns := v23.GetNamespace(sh.Ctx)
365+
ns.CacheCtl(naming.DisableCache(true))
366+
367+
// Start a single proxy.
368+
first, _, firstStatsAddrs := startInitialSetOfProxies(t, sh, 1)
369+
assert("first proxy", logsForProxies(first)...)
370+
371+
// Start the server.
372+
serverAddress, serverLog, _, err := startServer(t, sh, serverNameAll, using1OfMProxies, runServerAllProxiesNoLimit, serverCreds)
373+
assert("server", serverLog)
374+
375+
// Run the client.
376+
err = runSingleClient(sh, runClientAllProxiesServer, clientCreds)
377+
assert("client")
378+
379+
// Gather stats and make sure the server is using the first proxy.
380+
ctx, err := v23.WithPrincipal(sh.Ctx, serverCreds.Principal)
381+
assert("withPrincipal")
382+
383+
requests, _, _, err := gatherStats(ctx, firstStatsAddrs, serverAddress)
384+
assert("gatherStats")
385+
386+
used := proxiesUsedForServer(requests, serverAddress)
387+
if got, want := used, []int{0}; !reflect.DeepEqual(got, want) {
388+
t.Fatalf("got %v, want %v", got, want)
389+
}
390+
391+
// Start two more proxies and wait for the server to notice them.
392+
second := startProxy(t, sh)
393+
third := startProxy(t, sh)
394+
395+
_, err = waitForNMountedServers(t, sh.Ctx, ns, serverNameAll, 3)
396+
assert("second and third proxies and server log", second.log, third.log, serverLog)
397+
398+
_, proxyStatsAddresses, err := waitForNProxies(t, sh.Ctx, ns, 3)
399+
assert("wait for all three proxies to be in the mounttable")
400+
401+
// Run the client.
402+
err = runSingleClient(sh, runClientAllProxiesServer, clientCreds)
403+
assert("client with two proxies again")
404+
405+
requests, _, _, err = gatherStats(ctx, proxyStatsAddresses, serverAddress)
406+
assert("gatherStats")
407+
408+
used = proxiesUsedForServer(requests, serverAddress)
409+
if got, want := len(used), 3; !reflect.DeepEqual(got, want) {
410+
t.Fatalf("got %v, want %v", got, want)
411+
}
412+
413+
}
414+
346415
func TestV23SingleProxyResilience(t *testing.T) {
347416
v23test.SkipUnlessRunningIntegrationTests(t)
348417
sh := v23test.NewShell(t, nil)
@@ -701,11 +770,17 @@ var runServerRandomProxy = gosh.RegisterFunc(
701770
"runServerRandomProxy",
702771
createProxiedServer(serverNameRandom, proxyName, rpc.UseRandomProxy, 0),
703772
)
704-
var runServerAllProxies = gosh.RegisterFunc(
705-
"runServerAllProxies",
773+
774+
var runServerAllProxiesLimit2 = gosh.RegisterFunc(
775+
"runServerAllProxiesLimit2",
706776
createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, using2OfMProxies),
707777
)
708778

779+
var runServerAllProxiesNoLimit = gosh.RegisterFunc(
780+
"runServerAllProxiesNoLimit",
781+
createProxiedServer(serverNameAll, proxyName, rpc.UseAllProxies, 0),
782+
)
783+
709784
func createClient(serverName string, iterations int) func() error {
710785
return func() error {
711786
ctx, shutdown := test.V23Init()

0 commit comments

Comments
 (0)