
Commit c42147e

fix: timeout when pause cluster

Author: vietanhduong (committed)
Signed-off-by: vietanhduong <anh.duong@kyber.network>
1 parent: a41186b

1 file changed: 94 additions, 79 deletions

pkg/gcloud/gke/gke.go

Lines changed: 94 additions & 79 deletions
@@ -37,11 +37,11 @@ func (c *Client) ListClusters(project string) ([]*apis.Cluster, error) {
     }
     defer conn.Close()
 
-    igConn, err := c.newInstanceGroupsConn()
+    migConn, err := c.newManagedInstanceGroupConn()
     if err != nil {
         return nil, errors.Wrap(err, "list clusters")
     }
-    defer igConn.Close()
+    defer migConn.Close()
 
     resp, err := conn.ListClusters(context.TODO(), &containerpb.ListClustersRequest{
         Parent: fmt.Sprintf("projects/%s/locations/-", project),
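
The swap from igConn to migConn means callers now talk to the Compute instanceGroupManagers API instead of the instanceGroups API. As context for the hunks below, a minimal sketch of how such a client is typically constructed, mirroring the newManagedInstanceGroupConn helper referenced here; this is not part of the diff, and the credentialsFile parameter is a stand-in for the client's own options:

// Sketch only: build a Compute InstanceGroupManagers REST client,
// optionally authenticated with a service-account credentials file.
package gkesketch

import (
    "context"

    compute "cloud.google.com/go/compute/apiv1"
    "google.golang.org/api/option"
)

func newMIGClient(ctx context.Context, credentialsFile string) (*compute.InstanceGroupManagersClient, error) {
    var opts []option.ClientOption
    if credentialsFile != "" {
        opts = append(opts, option.WithCredentialsFile(credentialsFile))
    }
    return compute.NewInstanceGroupManagersRESTClient(ctx, opts...)
}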
@@ -64,7 +64,7 @@ func (c *Client) ListClusters(project string) ([]*apis.Cluster, error) {
             InstanceGroups:   p.GetInstanceGroupUrls(),
             Locations:        p.GetLocations(),
             InitialNodeCount: p.GetInitialNodeCount(),
-            CurrentSize:      int32(getNodePoolSize(igConn, project, e.GetName(), p.GetName(), p.GetLocations())),
+            CurrentSize:      int32(getNodePoolSize(migConn, project, e.GetName(), p.GetName(), p.GetLocations())),
             Spot:             p.GetConfig().GetSpot(),
             Preemptible:      p.GetConfig().GetPreemptible(),
         }
@@ -100,11 +100,11 @@ func (c *Client) GetCluster(project, location, name string) (*apis.Cluster, erro
     }
     defer conn.Close()
 
-    igConn, err := c.newInstanceGroupsConn()
+    migConn, err := c.newManagedInstanceGroupConn()
     if err != nil {
         return nil, errors.Wrap(err, "get cluster")
     }
-    defer igConn.Close()
+    defer migConn.Close()
 
     req := &containerpb.GetClusterRequest{
         Name: fmt.Sprintf("projects/%s/locations/%s/clusters/%s", project, location, name),
@@ -129,7 +129,7 @@ func (c *Client) GetCluster(project, location, name string) (*apis.Cluster, erro
             InstanceGroups:   p.GetInstanceGroupUrls(),
             Locations:        p.GetLocations(),
             InitialNodeCount: p.GetInitialNodeCount(),
-            CurrentSize:      int32(getNodePoolSize(igConn, project, cluster.GetName(), p.GetName(), p.GetLocations())),
+            CurrentSize:      int32(getNodePoolSize(migConn, project, cluster.GetName(), p.GetName(), p.GetLocations())),
             Spot:             p.GetConfig().GetSpot(),
             Preemptible:      p.GetConfig().GetPreemptible(),
         }
@@ -158,11 +158,11 @@ func (c *Client) PauseCluster(cluster *apis.Cluster) error {
     }
     defer conn.Close()
 
-    igConn, err := c.newInstanceGroupsConn()
+    migConn, err := c.newManagedInstanceGroupConn()
     if err != nil {
         return errors.Wrap(err, "list clusters")
     }
-    defer igConn.Close()
+    defer migConn.Close()
 
     pause := func(cluster *apis.Cluster, pool *apis.Cluster_NodePool) error {
         if pool.GetAutoscaling() != nil || pool.GetAutoscaling().GetEnabled() {
@@ -174,16 +174,38 @@ func (c *Client) PauseCluster(cluster *apis.Cluster) error {
                 log.Printf("WARN: disable node pool autoscaling for '%s/%s' failed\n", cluster.GetName(), cluster.GetLocation())
                 return errors.Wrapf(err, "pause '%s/%s'", cluster.GetName(), cluster.GetLocation())
             }
-            if err = waitOp(conn, op); err != nil {
+            if err = waitContainerOp(conn, op); err != nil {
                 return errors.Wrapf(err, "pause '%s/%s'", cluster.GetName(), cluster.GetLocation())
             }
             log.Printf("INFO: disabled autoscaling for '%s/%s'\n", cluster.Name, pool.Name)
         }
 
-        if err := resize(conn, igConn, cluster, pool, 0); err != nil {
+        if err := resize(migConn, cluster, pool, 0); err != nil {
             return errors.Wrap(err, "pause cluster")
         }
-        return nil
+
+        interval := time.NewTicker(10 * time.Second)
+        counter := 3
+        for {
+            select {
+            case <-interval.C:
+                // if after 3 times check and the current node is 0, we can mark this pool is scaled down
+                if current := getNodePoolSize(migConn, cluster.GetProject(), cluster.GetName(), pool.GetName(), pool.GetLocations()); current == 0 {
+                    if counter > 0 {
+                        counter--
+                    } else {
+                        return nil
+                    }
+                } else {
+                    // we will reset the counter if the current node return size != 0
+                    log.Printf("INFO: resizing pool '%s/%s', current size is %d...\n", cluster.GetName(), pool.GetName(), current)
+                    counter = 3
+                    _ = resize(migConn, cluster, pool, 0)
+                }
+            case <-context.Background().Done():
+                return nil
+            }
+        }
     }
 
     defer func() {
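
This hunk is the core of the timeout fix: instead of returning as soon as the resize request is issued, PauseCluster now polls the pool size every 10 seconds and only returns once the size has read zero for several consecutive ticks, re-issuing the resize whenever the size bounces back. A standalone sketch of that wait-until-stable pattern, assuming the usual context/time imports; waitDrained, poll and shrink are illustrative names rather than the repo's API, and the exact counter semantics in the commit differ slightly:

// Sketch of the "stable for N consecutive checks" drain wait used above.
// poll returns the current node count; shrink re-issues the scale-to-zero request.
func waitDrained(ctx context.Context, poll func() int, shrink func() error) error {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    const required = 3 // consecutive zero readings before trusting the result
    zeroStreak := 0
    for {
        select {
        case <-ticker.C:
            if poll() == 0 {
                zeroStreak++
                if zeroStreak >= required {
                    return nil
                }
            } else {
                zeroStreak = 0 // size bounced back: reset and resize again
                if err := shrink(); err != nil {
                    return err
                }
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}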
@@ -212,11 +234,11 @@ func (c *Client) UnpauseCluster(cluster *apis.Cluster) error {
     }
     defer conn.Close()
 
-    igConn, err := c.newInstanceGroupsConn()
+    migConn, err := c.newManagedInstanceGroupConn()
     if err != nil {
         return errors.Wrap(err, "list clusters")
     }
-    defer igConn.Close()
+    defer migConn.Close()
     unpause := func(cluster *apis.Cluster, p *apis.Cluster_NodePool) error {
         if err := waitClusterOperation(conn, cluster); err != nil {
             return errors.Wrap(err, "unpaise cluster")
@@ -236,13 +258,13 @@ func (c *Client) UnpauseCluster(cluster *apis.Cluster) error {
             log.Printf("WARN: enable node pool autoscaling for '%s/%s' failed\n", cluster.GetName(), cluster.GetLocation())
             return errors.Wrapf(err, "unpause '%s/%s'", cluster.GetName(), cluster.GetLocation())
         }
-        if err = waitOp(conn, op); err != nil {
+        if err = waitContainerOp(conn, op); err != nil {
             return errors.Wrapf(err, "unpause '%s/%s'", cluster.GetName(), cluster.GetLocation())
         }
         log.Printf("INFO: enabled autoscaling for '%s/%s'\n", cluster.Name, p.Name)
     }
 
-    if err := resize(conn, igConn, cluster, p, int(p.GetCurrentSize())); err != nil {
+    if err := resize(migConn, cluster, p, int(p.GetCurrentSize())); err != nil {
         return errors.Wrap(err, "unpause cluster")
     }
     return nil
@@ -332,51 +354,37 @@ func (c *Client) RefreshCluster(cluster *apis.Cluster) error {
     return eg.Wait()
 }
 
-// resize the input node pool
-func resize(clusterConn *container_v1.ClusterManagerClient, igConn *compute_v1.InstanceGroupsClient, cluster *apis.Cluster, pool *apis.Cluster_NodePool, size int) error {
-    _resize := func() (bool, error) {
-        currentSize := getNodePoolSize(igConn, cluster.GetProject(), cluster.GetName(), pool.GetName(), pool.GetLocations())
-        if size == 0 && size == currentSize {
-            log.Printf("INFO: node pool '%s/%s' has been resized to 0!", cluster.GetName(), pool.GetName())
-            return true, nil
-        }
-        if currentSize >= size && size > 0 {
-            log.Printf("INFO: node pool '%s/%s' has been resized to %d (current=%d)!\n", cluster.GetName(), pool.GetName(), size, currentSize)
-            return true, nil
-        } else {
-            log.Printf("INFO: resizing node pool '%s/%s'! current=%d; expect=%d\n", cluster.GetName(), pool.GetName(), currentSize, size)
-        }
-        if err := waitClusterOperation(clusterConn, cluster); err != nil {
-            return false, errors.Wrap(err, "resize")
-        }
+// resize the input node pool. This function will resize the MIGs in the input pool instead of the GKE
+func resize(conn *compute_v1.InstanceGroupManagersClient, cluster *apis.Cluster, pool *apis.Cluster_NodePool, size int) error {
+    // get all mig of the input pool
+    migs, err := findMIGs(conn, cluster.GetProject(), cluster.GetName(), pool.GetName(), pool.GetLocations())
+    if err != nil {
+        log.Printf("WARN: find MIGs of '%s/%s' got erorr: %v\n", cluster.GetName(), pool.GetName(), err)
+        return err
+    }
 
-        req := &containerpb.SetNodePoolSizeRequest{
-            Name:      fmt.Sprintf("projects/%s/locations/%s/clusters/%s/nodePools/%s", cluster.GetProject(), cluster.GetLocation(), cluster.GetName(), pool.GetName()),
-            NodeCount: int32(size),
+    _resize := func(mig *computepb.InstanceGroupManager) error {
+        req := &computepb.ResizeInstanceGroupManagerRequest{
+            InstanceGroupManager: mig.GetName(),
+            Project:              cluster.GetProject(),
+            Size:                 int32(size),
+            Zone:                 basename(mig.GetZone()),
         }
-        op, err := clusterConn.SetNodePoolSize(context.Background(), req)
+        _, err := conn.Resize(context.Background(), req)
         if err != nil {
-            return false, errors.Wrap(err, "resize")
+            log.Printf("WARN: resize MIG %q got error: %v\n", mig.GetName(), err)
         }
-        return false, waitOp(clusterConn, op)
-    }
-    var stop bool
-    var err error
-    if stop, err = _resize(); err != nil || stop {
         return err
     }
-
-    ticker := time.NewTicker(3 * time.Second)
-    for {
-        select {
-        case <-ticker.C:
-            if stop, err = _resize(); err != nil || stop {
-                return err
-            }
-        case <-context.Background().Done():
-            return nil
-        }
+    var eg errgroup.Group
+    for _, mig := range migs {
+        mig := mig
+        eg.Go(func() error { return _resize(mig) })
     }
+    if err = eg.Wait(); err != nil {
+        return err
+    }
+    return nil
 }
 
 func (c *Client) newClusterConn() (*container_v1.ClusterManagerClient, error) {
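
The rewritten resize talks to the Compute API directly: one Resize call per MIG backing the node pool, instead of the GKE SetNodePoolSize long-running operation that was previously waited on (the source of the timeout). A minimal sketch of a single zonal resize; unlike the commit, which verifies progress by re-reading the pool size from the caller, this version waits on the returned operation. The project/zone/name arguments are placeholders, and the sketch assumes the compute client import from the earlier sketch plus computepb from cloud.google.com/go/compute/apiv1/computepb:

// Sketch only: resize one managed instance group and wait for the zonal
// operation to complete.
func resizeMIG(ctx context.Context, c *compute.InstanceGroupManagersClient,
    project, zone, mig string, size int32) error {
    op, err := c.Resize(ctx, &computepb.ResizeInstanceGroupManagerRequest{
        Project:              project,
        Zone:                 zone,
        InstanceGroupManager: mig,
        Size:                 size,
    })
    if err != nil {
        return err
    }
    // The commit fires the request and checks progress separately;
    // blocking on the operation is the alternative shown here.
    return op.Wait(ctx)
}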
@@ -395,14 +403,6 @@ func (c *Client) newManagedInstanceGroupConn() (*compute_v1.InstanceGroupManager
     return compute_v1.NewInstanceGroupManagersRESTClient(context.Background(), opts...)
 }
 
-func (c *Client) newInstanceGroupsConn() (*compute_v1.InstanceGroupsClient, error) {
-    var opts []option.ClientOption
-    if c.options.Credentials != "" {
-        opts = append(opts, option.WithCredentialsFile(c.options.Credentials))
-    }
-    return compute_v1.NewInstanceGroupsRESTClient(context.Background(), opts...)
-}
-
 // getMangedInstanceNames return the url of managed instances by the input MIG
 func getMangedInstanceNames(conn *compute_v1.InstanceGroupManagersClient, project string, mig *computepb.InstanceGroupManager) ([]string, error) {
     req := &computepb.ListManagedInstancesInstanceGroupManagersRequest{
@@ -425,40 +425,55 @@ func getMangedInstanceNames(conn *compute_v1.InstanceGroupManagersClient, projec
     return names, nil
 }
 
-func getNodePoolSize(conn *compute_v1.InstanceGroupsClient, project, cluster, pool string, zones []string) int {
-    ret := make([]int, len(zones))
-    getSize := func(i int, z string) error {
+func getNodePoolSize(conn *compute_v1.InstanceGroupManagersClient, project, cluster, pool string, zones []string) int {
+    migs, err := findMIGs(conn, project, cluster, pool, zones)
+    if err != nil {
+        log.Printf("WARN: get pool size '%s/%s/%s' got error: %v\n", project, cluster, pool, err)
+        return 0
+    }
+
+    var size int32
+    for _, mig := range migs {
+        size += mig.GetTargetSize()
+    }
+    return int(size)
+}
+
+func findMIGs(conn *compute_v1.InstanceGroupManagersClient, project, cluster, pool string, zones []string) ([]*computepb.InstanceGroupManager, error) {
+    tmp := make([][]*computepb.InstanceGroupManager, len(zones))
+    getMigs := func(i int, z string) error {
         filterQuery := fmt.Sprintf("name:gke-%s-%s-*", cluster, pool)
-        req := &computepb.ListInstanceGroupsRequest{
+        req := &computepb.ListInstanceGroupManagersRequest{
             Project: project,
             Filter:  &filterQuery,
             Zone:    z,
         }
         it := conn.List(context.Background(), req)
         for {
             resp, err := it.Next()
-            if err == nil || err == iterator.Done {
-                ret[i] = int(resp.GetSize())
-                return nil
+            if err == iterator.Done {
+                break
             }
             if err != nil {
-                log.Printf("WARN: get pool size '%s/%s/%s' got error: %v\n", project, cluster, pool, err)
-                return nil
+                return err
             }
+            tmp[i] = append(tmp[i], resp)
         }
+        return nil
     }
     var eg errgroup.Group
     for i, z := range zones {
         i, z := i, z
-        eg.Go(func() error { return getSize(i, z) })
+        eg.Go(func() error { return getMigs(i, z) })
     }
-    _ = eg.Wait()
-
-    var size int
-    for _, val := range ret {
-        size += val
+    if err := eg.Wait(); err != nil {
+        return nil, err
+    }
+    var out []*computepb.InstanceGroupManager
+    for _, l := range tmp {
+        out = append(out, l...)
     }
-    return size
+    return out, nil
 }
 
 func waitClusterOperation(conn *container_v1.ClusterManagerClient, cluster *apis.Cluster) error {
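
Note that the pool size is now derived from the summed targetSize of the MIGs matched by the name:gke-<cluster>-<pool>-* filter, rather than from instance-group membership. A hypothetical call site for the new helper, with placeholder project, cluster, pool and zone names:

// Hypothetical usage only; conn is the same InstanceGroupManagersClient
// the rest of this file works with.
func logPoolTarget(conn *compute_v1.InstanceGroupManagersClient) {
    zones := []string{"us-central1-a", "us-central1-b"}
    size := getNodePoolSize(conn, "my-project", "my-cluster", "default-pool", zones)
    log.Printf("INFO: pool currently targets %d nodes\n", size)
}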
@@ -471,7 +486,7 @@ func waitClusterOperation(conn *container_v1.ClusterManagerClient, cluster *apis
         if !strings.Contains(op.GetTargetLink(), fmt.Sprintf("clusters/%s", cluster.GetName())) || op.GetStatus() == containerpb.Operation_DONE {
             return nil
         }
-        if err := waitOp(conn, op); err != nil {
+        if err := waitContainerOp(conn, op); err != nil {
             return errors.Wrapf(err, "wait operation %q", op.GetName())
         }
         log.Printf("INFO: handle operation '%s/%s' has been completed\n", cluster.GetName(), op.GetName())
@@ -486,7 +501,7 @@ func waitClusterOperation(conn *container_v1.ClusterManagerClient, cluster *apis
     return eg.Wait()
 }
 
-func waitOp(conn *container_v1.ClusterManagerClient, op *containerpb.Operation) error {
+func waitContainerOp(conn *container_v1.ClusterManagerClient, op *containerpb.Operation) error {
     ticker := time.NewTicker(time.Second)
     for {
         select {
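
The diff is truncated here; waitContainerOp is the renamed waitOp, and its body is not shown beyond the ticker/select scaffold. For orientation, a typical GKE operation wait loop polls ClusterManagerClient.GetOperation until the operation reports DONE. The sketch below is illustrative only, not the repo's exact code; the operation name format and the project/location parameters are assumptions:

// Sketch only: poll a GKE operation until it reaches DONE.
func waitGKEOp(ctx context.Context, conn *container_v1.ClusterManagerClient,
    project, location string, op *containerpb.Operation) error {
    name := fmt.Sprintf("projects/%s/locations/%s/operations/%s", project, location, op.GetName())
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            cur, err := conn.GetOperation(ctx, &containerpb.GetOperationRequest{Name: name})
            if err != nil {
                return err
            }
            if cur.GetStatus() == containerpb.Operation_DONE {
                return nil
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}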