@@ -33,6 +33,8 @@ import (
3333	"github.com/containerd/nri/pkg/net" 
3434	"github.com/containerd/nri/pkg/net/multiplex" 
3535	"github.com/containerd/ttrpc" 
36+ 	"google.golang.org/grpc/codes" 
37+ 	"google.golang.org/grpc/status" 
3638)
3739
3840const  (
@@ -414,19 +416,101 @@ func (p *plugin) synchronize(ctx context.Context, pods []*PodSandbox, containers
414416	ctx , cancel  :=  context .WithTimeout (ctx , getPluginRequestTimeout ())
415417	defer  cancel ()
416418
417- 	req  :=  & SynchronizeRequest {
418- 		Pods :       pods ,
419- 		Containers : containers ,
420- 	}
421- 	rpl , err  :=  p .stub .Synchronize (ctx , req )
422- 	if  err  !=  nil  {
423- 		p .close ()
424- 		return  nil , err 
419+ 	var  (
420+ 		podsToSend  =  pods 
421+ 		ctrsToSend  =  containers 
422+ 		podsPerMsg  =  len (pods )
423+ 		ctrsPerMsg  =  len (containers )
424+ 
425+ 		rpl  * SynchronizeResponse 
426+ 		err  error 
427+ 	)
428+ 
429+ 	for  {
430+ 		req  :=  & SynchronizeRequest {
431+ 			Pods :       podsToSend [:podsPerMsg ],
432+ 			Containers : ctrsToSend [:ctrsPerMsg ],
433+ 			More :       len (podsToSend ) >  podsPerMsg  ||  len (ctrsToSend ) >  ctrsPerMsg ,
434+ 		}
435+ 
436+ 		log .Debugf (ctx , "sending sync message, %d/%d, %d/%d (more: %v)" ,
437+ 			len (req .Pods ), len (podsToSend ), len (req .Containers ), len (ctrsToSend ), req .More )
438+ 
439+ 		rpl , err  =  p .stub .Synchronize (ctx , req )
440+ 		if  err  ==  nil  {
441+ 			if  ! req .More  {
442+ 				break 
443+ 			}
444+ 
445+ 			if  len (rpl .Update ) >  0  ||  rpl .More  !=  req .More  {
446+ 				p .close ()
447+ 				return  nil , fmt .Errorf ("plugin does not handle split sync requests" )
448+ 			}
449+ 
450+ 			podsToSend  =  podsToSend [podsPerMsg :]
451+ 			ctrsToSend  =  ctrsToSend [ctrsPerMsg :]
452+ 
453+ 			if  podsPerMsg  >  len (podsToSend ) {
454+ 				podsPerMsg  =  len (podsToSend )
455+ 			}
456+ 			if  ctrsPerMsg  >  len (ctrsToSend ) {
457+ 				ctrsPerMsg  =  len (ctrsToSend )
458+ 			}
459+ 		} else  {
460+ 			podsPerMsg , ctrsPerMsg , err  =  recalcObjsPerSyncMsg (podsPerMsg , ctrsPerMsg , err )
461+ 			if  err  !=  nil  {
462+ 				p .close ()
463+ 				return  nil , err 
464+ 			}
465+ 
466+ 			log .Debugf (ctx , "oversized message, retrying in smaller chunks" )
467+ 		}
425468	}
426469
427470	return  rpl .Update , nil 
428471}
429472
473+ func  recalcObjsPerSyncMsg (pods , ctrs  int , err  error ) (int , int , error ) {
474+ 	const  (
475+ 		minObjsPerMsg  =  8 
476+ 	)
477+ 
478+ 	if  status .Code (err ) !=  codes .ResourceExhausted  {
479+ 		return  pods , ctrs , err 
480+ 	}
481+ 
482+ 	if  pods + ctrs  <=  minObjsPerMsg  {
483+ 		return  pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
484+ 	}
485+ 
486+ 	var  e  * ttrpc.OversizedMessageErr 
487+ 	if  ! errors .As (err , & e ) {
488+ 		return  pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
489+ 	}
490+ 
491+ 	maxLen  :=  e .MaximumLength ()
492+ 	msgLen  :=  e .RejectedLength ()
493+ 
494+ 	if  msgLen  ==  0  ||  maxLen  ==  0  ||  msgLen  <=  maxLen  {
495+ 		return  pods , ctrs , fmt .Errorf ("failed to synchronize plugin with split messages" )
496+ 	}
497+ 
498+ 	factor  :=  float64 (maxLen ) /  float64 (msgLen )
499+ 	if  factor  >  0.9  {
500+ 		factor  =  0.9 
501+ 	}
502+ 
503+ 	pods  =  int (float64 (pods ) *  factor )
504+ 	ctrs  =  int (float64 (ctrs ) *  factor )
505+ 
506+ 	if  pods + ctrs  <  minObjsPerMsg  {
507+ 		pods  =  minObjsPerMsg  /  2 
508+ 		ctrs  =  minObjsPerMsg  /  2 
509+ 	}
510+ 
511+ 	return  pods , ctrs , nil 
512+ }
513+ 
430514// Relay CreateContainer request to plugin. 
431515func  (p  * plugin ) createContainer (ctx  context.Context , req  * CreateContainerRequest ) (* CreateContainerResponse , error ) {
432516	if  ! p .events .IsSet (Event_CREATE_CONTAINER ) {
0 commit comments