47 changes: 47 additions & 0 deletions pkg/adaptation/adaptation_suite_test.go
@@ -18,6 +18,7 @@ package adaptation_test

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
@@ -38,6 +39,7 @@ import (
	"github.com/containerd/nri/pkg/api"
	"github.com/containerd/nri/pkg/plugin"
	validator "github.com/containerd/nri/plugins/default-validator/builtin"
	"github.com/containerd/ttrpc"
	rspec "github.com/opencontainers/runtime-spec/specs-go"
)

@@ -94,6 +96,51 @@ var _ = Describe("Configuration", func() {
			Expect(plugin.Start(s.dir)).ToNot(Succeed())
		})
	})

	When("early connection loss during plugin startup", func() {
		BeforeEach(func() {
			nri.SetPluginRegistrationTimeout(1 * time.Nanosecond)

			s.Prepare(
				&mockRuntime{},
				&mockPlugin{
					idx:  "00",
					name: "test",
				},
			)
		})

		AfterEach(func() {
			nri.SetPluginRegistrationTimeout(nri.DefaultPluginRegistrationTimeout)
		})

		It("should not cause a plugin to get stuck", func() {
			var (
				runtime = s.runtime
				plugin  = s.plugins[0]
				errCh   = make(chan error, 1)
				err     error
			)

			Expect(runtime.Start(s.dir)).To(Succeed())

			go func() {
				err := plugin.Start(s.dir)
				errCh <- err
			}()

			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			defer cancel()

			select {
			case <-ctx.Done():
				err = ctx.Err()
			case err = <-errCh:
			}

			Expect(errors.Is(err, ttrpc.ErrClosed)).To(BeTrue())
		})
	})
})

var _ = Describe("Adaptation", func() {
11 changes: 10 additions & 1 deletion pkg/stub/stub.go
@@ -584,6 +584,12 @@ func (stub *stub) register(ctx context.Context) error {

// Handle a lost connection.
func (stub *stub) connClosed() {
	select {
	// if our connection gets closed before we get Configure()'d, let Start() know
	case stub.cfgErrC <- ttrpc.ErrClosed:

Member:
Since we are overloading cfgErrC, is there any chance we end up with two writers to the channel in one Start()?

Contributor:
So if connClosed() writes to cfgErrC while Configure() is running, the deferred send in Configure() would block forever and leak a goroutine?

What if we make the send in Configure non-blocking as well? That would ensure whichever function gets there first wins.

defer func() {
	select {
	case stub.cfgErrC <- retErr:
	default:
	}
}()

What do you think?

Member Author (@klihub, Oct 9, 2025):
I am not sure, because I don't know (and did not try to check or test) how racy closing a socket soon after sending a ttrpc message over that same socket can be, with respect to whether ttrpc delivers the message or invokes the onClose() callback first. Therefore I wanted to err on the side of safety: although cfgErrC is a buffered channel with a capacity of 1, I still decided to do an attempted, non-blocking send here with a select, so we can't get stuck. If the send fails, it means there is already a pending, unreceived cfgErr in the channel, which will nudge Start() out of its wait-receive, so no harm is done if the extra ttrpc.ErrClosed we attempt to send here is lost. And if Configure() has not failed yet, or the failure error has not yet been delivered over the channel, the send will succeed because the channel is empty, which again nudges Start() out of the wait-receive.

Member Author (@klihub):
> So if connClosed() writes to cfgErrC while Configure() is running, the deferred send in Configure() would block forever and leak a goroutine?

Hmm, I think it shouldn't. The channel is buffered with a capacity of 1. We always receive exactly one error, sent by either Configure() or connClosed(), whichever comes first, and we try to send at most two. So if we do send two, one stays buffered in the channel, which should not be a problem either: if the stub is ever re-Start()ed (so we go through all of this again), it creates a new cfgErrC channel.

> What if we make the send in Configure non-blocking as well? That would ensure whichever function gets there first wins.
>
> defer func() {
> 	select {
> 	case stub.cfgErrC <- retErr:
> 	default:
> 	}
> }()
>
> What do you think?

Yes, I think that is a good idea. I updated the PR accordingly.

	default:
	}

	stub.Lock()
	stub.close()
	stub.Unlock()
@@ -628,7 +634,10 @@ func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl
	stub.requestTimeout = time.Duration(req.RequestTimeout * int64(time.Millisecond))

	defer func() {
		select {
		case stub.cfgErrC <- retErr:
		default:
		}
	}()

	if handler := stub.handlers.Configure; handler == nil {
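
To make the resolved pattern easier to see in isolation, here is a minimal, runnable sketch; the names errClosed and send are illustrative stand-ins, not the actual stub code. A capacity-1 buffered channel with non-blocking sends from both the Configure path and the connection-closed path guarantees that the Start()-side receiver gets exactly one error and that neither sender can ever block.

// Standalone sketch of the channel pattern, under the assumptions above.
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("connection closed") // stand-in for ttrpc.ErrClosed

func main() {
	cfgErrC := make(chan error, 1) // capacity 1: the first send always succeeds

	// send mirrors the non-blocking sends in Configure() and connClosed().
	send := func(err error) {
		select {
		case cfgErrC <- err: // first sender to arrive buffers its error
		default: // channel already holds an error; drop ours harmlessly
		}
	}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); send(nil) }()       // Configure() completed
	go func() { defer wg.Done(); send(errClosed) }() // connection was lost

	// The Start()-like receiver: exactly one receive, which cannot block
	// forever because at least one of the two sends is guaranteed to succeed.
	fmt.Println("start saw:", <-cfgErrC)

	wg.Wait() // neither sender blocks; at most one error stays buffered
}

As the review thread notes, a leftover buffered error is harmless here: a later re-Start() of the stub allocates a fresh cfgErrC, so the stale value is never observed.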