@@ -33,6 +33,8 @@ import (
3333 "github.com/containerd/nri/pkg/net"
3434 "github.com/containerd/nri/pkg/net/multiplex"
3535 "github.com/containerd/ttrpc"
36+ "google.golang.org/grpc/codes"
37+ "google.golang.org/grpc/status"
3638)
3739
3840const (
@@ -414,19 +416,101 @@ func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers
414416 ctx , cancel := context .WithTimeout (ctx , getPluginRequestTimeout ())
415417 defer cancel ()
416418
417- req := & SynchronizeRequest {
418- Pods : pods ,
419- Containers : containers ,
420- }
421- rpl , err := p .stub .Synchronize (ctx , req )
422- if err != nil {
423- p .close ()
424- return nil , err
419+ var (
420+ podsToSend = pods
421+ ctrsToSend = containers
422+ podsPerMsg = len (pods )
423+ ctrsPerMsg = len (containers )
424+
425+ rpl * SynchronizeResponse
426+ err error
427+ )
428+
429+ for {
430+ req := & SynchronizeRequest {
431+ Pods : podsToSend [:podsPerMsg ],
432+ Containers : ctrsToSend [:ctrsPerMsg ],
433+ More : len (podsToSend ) > podsPerMsg || len (ctrsToSend ) > ctrsPerMsg ,
434+ }
435+
436+ log .Debugf (ctx , "sending sync message, %d/%d, %d/%d (more: %v)" ,
437+ len (req .Pods ), len (podsToSend ), len (req .Containers ), len (ctrsToSend ), req .More )
438+
439+ rpl , err = p .stub .Synchronize (ctx , req )
440+ if err == nil {
441+ if ! req .More {
442+ break
443+ }
444+
445+ if len (rpl .Update ) > 0 || rpl .More != req .More {
446+ p .close ()
447+ return nil , fmt .Errorf ("plugin does not handle split sync requests" )
448+ }
449+
450+ podsToSend = podsToSend [podsPerMsg :]
451+ ctrsToSend = ctrsToSend [ctrsPerMsg :]
452+
453+ if podsPerMsg > len (podsToSend ) {
454+ podsPerMsg = len (podsToSend )
455+ }
456+ if ctrsPerMsg > len (ctrsToSend ) {
457+ ctrsPerMsg = len (ctrsToSend )
458+ }
459+ } else {
460+ podsPerMsg , ctrsPerMsg , err = recalcObjsPerSyncMsg (podsPerMsg , ctrsPerMsg , err )
461+ if err != nil {
462+ p .close ()
463+ return nil , err
464+ }
465+
466+ log .Debugf (ctx , "oversized message, retrying in smaller chunks" )
467+ }
425468 }
426469
427470 return rpl .Update , nil
428471}
429472
473+ func recalcObjsPerSyncMsg (pods , ctrs int , err error ) (int , int , error ) {
474+ const (
475+ minObjsPerMsg = 8
476+ )
477+
478+ if status .Code (err ) != codes .ResourceExhausted {
479+ return pods , ctrs , err
480+ }
481+
482+ if pods + ctrs <= minObjsPerMsg {
483+ return pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
484+ }
485+
486+ var e * ttrpc.OversizedMessageErr
487+ if ! errors .As (err , & e ) {
488+ return pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
489+ }
490+
491+ maxLen := e .MaximumLength ()
492+ msgLen := e .RejectedLength ()
493+
494+ if msgLen == 0 || maxLen == 0 || msgLen <= maxLen {
495+ return pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
496+ }
497+
498+ factor := float64 (maxLen ) / float64 (msgLen )
499+ if factor > 0.9 {
500+ factor = 0.9
501+ }
502+
503+ pods = int (float64 (pods ) * factor )
504+ ctrs = int (float64 (ctrs ) * factor )
505+
506+ if pods + ctrs < minObjsPerMsg {
507+ pods = minObjsPerMsg / 2
508+ ctrs = minObjsPerMsg / 2
509+ }
510+
511+ return pods , ctrs , nil
512+ }
513+
430514// Relay CreateContainer request to plugin.
431515func (p * plugin ) createContainer (ctx context.Context , req * CreateContainerRequest ) (* CreateContainerResponse , error ) {
432516 if ! p .events .IsSet (Event_CREATE_CONTAINER ) {
0 commit comments