Skip to content

Commit d8515b0

Browse files
committed
stub: cancel Start() for early lost connection.
If our connection gets closed before we had a chance to get Configure()'d by the runtime, cancel Start()'s wait for the result by letting it know about the failure. Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
1 parent 9b8befa commit d8515b0

File tree

3 files changed

+77
-4
lines changed

3 files changed

+77
-4
lines changed

pkg/adaptation/adaptation_suite_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package adaptation_test
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"os"
2324
"path/filepath"
@@ -38,6 +39,7 @@ import (
3839
"github.com/containerd/nri/pkg/api"
3940
"github.com/containerd/nri/pkg/plugin"
4041
validator "github.com/containerd/nri/plugins/default-validator/builtin"
42+
"github.com/containerd/ttrpc"
4143
rspec "github.com/opencontainers/runtime-spec/specs-go"
4244
)
4345

@@ -94,6 +96,57 @@ var _ = Describe("Configuration", func() {
9496
Expect(plugin.Start(s.dir)).ToNot(Succeed())
9597
})
9698
})
99+
100+
When("early connection loss during plugin startup", func() {
101+
BeforeEach(func() {
102+
nri.SetPluginRegistrationTimeout(1 * time.Second)
103+
nri.SetPluginRequestTimeout(1 * time.Second)
104+
105+
s.Prepare(
106+
&mockRuntime{},
107+
&mockPlugin{
108+
idx: "00",
109+
name: "test",
110+
configure: func(_ *mockPlugin, _, _, _ string) (api.EventMask, error) {
111+
time.Sleep(3 * time.Second)
112+
return 0, nil
113+
},
114+
},
115+
)
116+
})
117+
118+
AfterEach(func() {
119+
nri.SetPluginRegistrationTimeout(nri.DefaultPluginRegistrationTimeout)
120+
nri.SetPluginRequestTimeout(nri.DefaultPluginRequestTimeout)
121+
})
122+
123+
It("should not cause a plugin to get stuck", func() {
124+
var (
125+
runtime = s.runtime
126+
plugin = s.plugins[0]
127+
errCh = make(chan error, 1)
128+
err error
129+
)
130+
131+
Expect(runtime.Start(s.dir)).To(Succeed())
132+
133+
go func() {
134+
err := plugin.Start(s.dir)
135+
errCh <- err
136+
}()
137+
138+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
139+
defer cancel()
140+
141+
select {
142+
case <-ctx.Done():
143+
err = ctx.Err()
144+
case err = <-errCh:
145+
}
146+
147+
Expect(errors.Is(err, ttrpc.ErrClosed)).To(BeTrue())
148+
})
149+
})
97150
})
98151

99152
var _ = Describe("Adaptation", func() {

pkg/adaptation/suite_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ type mockPlugin struct {
378378
pods map[string]*api.PodSandbox
379379
ctrs map[string]*api.Container
380380

381+
configure func(*mockPlugin, string, string, string) (stub.EventMask, error)
381382
runPodSandbox func(*mockPlugin, *api.PodSandbox, *api.Container) error
382383
updatePodSandbox func(*mockPlugin, *api.PodSandbox, *api.LinuxResources, *api.LinuxResources) error
383384
postUpdatePodSandbox func(*mockPlugin, *api.PodSandbox, *api.Container) error
@@ -559,18 +560,28 @@ func (m *mockPlugin) onClose() {
559560
}
560561
}
561562

562-
func (m *mockPlugin) Configure(_ context.Context, _, runtime, version string) (stub.EventMask, error) {
563+
func (m *mockPlugin) Configure(_ context.Context, cfg, runtime, version string) (stub.EventMask, error) {
564+
var (
565+
events stub.EventMask
566+
err error
567+
)
568+
563569
m.q.Add(PluginConfigured)
564570

565571
m.runtime = runtime
566572
m.version = version
567573

568-
events := m.mask
574+
if m.configure != nil {
575+
events, err = m.configure(m, cfg, runtime, version)
576+
} else {
577+
events, err = m.mask, nil
578+
}
579+
569580
if m.validateAdjustment == nil {
570581
events.Clear(api.Event_VALIDATE_CONTAINER_ADJUSTMENT)
571582
}
572583

573-
return events, nil
584+
return events, err
574585
}
575586

576587
func (m *mockPlugin) Synchronize(_ context.Context, pods []*api.PodSandbox, ctrs []*api.Container) ([]*api.ContainerUpdate, error) {

pkg/stub/stub.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,12 @@ func (stub *stub) register(ctx context.Context) error {
584584

585585
// Handle a lost connection.
586586
func (stub *stub) connClosed() {
587+
select {
588+
// if our connection gets closed before we get Configure()'d, let Start() know
589+
case stub.cfgErrC <- ttrpc.ErrClosed:
590+
default:
591+
}
592+
587593
stub.Lock()
588594
stub.close()
589595
stub.Unlock()
@@ -628,7 +634,10 @@ func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl
628634
stub.requestTimeout = time.Duration(req.RequestTimeout * int64(time.Millisecond))
629635

630636
defer func() {
631-
stub.cfgErrC <- retErr
637+
select {
638+
case stub.cfgErrC <- retErr:
639+
default:
640+
}
632641
}()
633642

634643
if handler := stub.handlers.Configure; handler == nil {

0 commit comments

Comments
 (0)