Skip to content

Commit bf267e3

Browse files
committed
stub: collect/handle split sync messages.
Handle split sync messages in the stub, opaque to the client. Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
1 parent ed78ae9 commit bf267e3

File tree

1 file changed

+43
-2
lines changed

1 file changed

+43
-2
lines changed

pkg/stub/stub.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ type stub struct {
273273
doneC chan struct{}
274274
srvErrC chan error
275275
cfgErrC chan error
276+
syncReq *api.SynchronizeRequest
276277

277278
registrationTimeout time.Duration
278279
requestTimeout time.Duration
@@ -469,6 +470,7 @@ func (stub *stub) close() {
469470

470471
stub.started = false
471472
stub.conn = nil
473+
stub.syncReq = nil
472474
}
473475

474476
// Run the plugin. Start event processing then wait for an error or getting stopped.
@@ -644,11 +646,50 @@ func (stub *stub) Configure(ctx context.Context, req *api.ConfigureRequest) (rpl
644646
func (stub *stub) Synchronize(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
645647
handler := stub.handlers.Synchronize
646648
if handler == nil {
647-
return &api.SynchronizeResponse{}, nil
649+
return &api.SynchronizeResponse{More: req.More}, nil
648650
}
649-
update, err := handler(ctx, req.Pods, req.Containers)
651+
652+
if req.More {
653+
return stub.collectSync(req)
654+
}
655+
656+
return stub.deliverSync(ctx, req)
657+
}
658+
659+
func (stub *stub) collectSync(req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
660+
stub.Lock()
661+
defer stub.Unlock()
662+
663+
log.Debugf(noCtx, "collecting sync req with %d pods, %d containers...",
664+
len(req.Pods), len(req.Containers))
665+
666+
if stub.syncReq == nil {
667+
stub.syncReq = req
668+
} else {
669+
stub.syncReq.Pods = append(stub.syncReq.Pods, req.Pods...)
670+
stub.syncReq.Containers = append(stub.syncReq.Containers, req.Containers...)
671+
}
672+
673+
return &api.SynchronizeResponse{More: req.More}, nil
674+
}
675+
676+
func (stub *stub) deliverSync(ctx context.Context, req *api.SynchronizeRequest) (*api.SynchronizeResponse, error) {
677+
stub.Lock()
678+
syncReq := stub.syncReq
679+
stub.syncReq = nil
680+
stub.Unlock()
681+
682+
if syncReq == nil {
683+
syncReq = req
684+
} else {
685+
syncReq.Pods = append(syncReq.Pods, req.Pods...)
686+
syncReq.Containers = append(syncReq.Containers, req.Containers...)
687+
}
688+
689+
update, err := stub.handlers.Synchronize(ctx, syncReq.Pods, syncReq.Containers)
650690
return &api.SynchronizeResponse{
651691
Update: update,
692+
More: false,
652693
}, err
653694
}
654695

0 commit comments

Comments
 (0)