Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/operator_parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ configuration they are grouped under the `kubernetes` key.
manifest. To keep secrets, set this option to `false`. The default is `true`.

* **enable_persistent_volume_claim_deletion**
By default, the operator deletes PersistentVolumeClaims when removing the
By default, the operator deletes persistent volume claims when removing the
Postgres cluster manifest, no matter if `persistent_volume_claim_retention_policy`
on the statefulset is set to `retain`. To keep PVCs set this option to `false`.
The default is `true`.
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ type kubeResources struct {
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
//PVCs are treated separately
}

// Cluster describes postgresql cluster
Expand Down Expand Up @@ -140,6 +140,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
Endpoints: make(map[PostgresRole]*v1.Endpoints),
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
Streams: make(map[string]*zalandov1.FabricEventStream)},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
Expand Down Expand Up @@ -363,6 +364,11 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("pods are ready")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

// sync volume may already transition volumes to gp3, if iops/throughput or type is specified
if err = c.syncVolumes(); err != nil {
return err
}

// sync resources created by Patroni
if err = c.syncPatroniResources(); err != nil {
c.logger.Warnf("Patroni resources not yet synced: %v", err)
Expand Down
17 changes: 6 additions & 11 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (c *Cluster) listResources() error {
c.logger.Infof("found logical backup job: %q (uid: %q)", util.NameFromMeta(c.LogicalBackupJob.ObjectMeta), c.LogicalBackupJob.UID)
}

for _, secret := range c.Secrets {
c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace)
for uid, secret := range c.Secrets {
c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), uid, secret.ObjectMeta.Namespace)
}

for role, service := range c.Services {
Expand Down Expand Up @@ -70,13 +70,8 @@ func (c *Cluster) listResources() error {
c.logger.Infof("found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
}

pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return fmt.Errorf("could not get the list of PVCs: %v", err)
}

for _, obj := range pvcs {
c.logger.Infof("found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
for uid, pvc := range c.VolumeClaims {
c.logger.Infof("found persistent volume claim: %q (uid: %q)", util.NameFromMeta(pvc.ObjectMeta), uid)
}

for role, poolerObjs := range c.ConnectionPooler {
Expand Down Expand Up @@ -288,10 +283,10 @@ func (c *Cluster) deleteStatefulSet() error {

if c.OpConfig.EnablePersistentVolumeClaimDeletion != nil && *c.OpConfig.EnablePersistentVolumeClaimDeletion {
if err := c.deletePersistentVolumeClaims(); err != nil {
return fmt.Errorf("could not delete PersistentVolumeClaims: %v", err)
return fmt.Errorf("could not delete persistent volume claims: %v", err)
}
} else {
c.logger.Info("not deleting PersistentVolumeClaims because disabled in configuration")
c.logger.Info("not deleting persistent volume claims because disabled in configuration")
}

return nil
Expand Down
56 changes: 36 additions & 20 deletions pkg/cluster/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/filesystems"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/volumes"
)

Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *Cluster) syncVolumeClaims() error {

if c.OpConfig.StorageResizeMode == "off" || c.OpConfig.StorageResizeMode == "ebs" {
ignoreResize = true
c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of PVCs.", c.OpConfig.StorageResizeMode)
c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of persistent volume claims.", c.OpConfig.StorageResizeMode)
}

newSize, err := resource.ParseQuantity(c.Spec.Volume.Size)
Expand All @@ -196,9 +196,10 @@ func (c *Cluster) syncVolumeClaims() error {

pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return fmt.Errorf("could not receive persistent volume claims: %v", err)
return fmt.Errorf("could not list persistent volume claims: %v", err)
}
for _, pvc := range pvcs {
c.VolumeClaims[pvc.UID] = &pvc
needsUpdate := false
currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage])
if !ignoreResize && currentSize != manifestSize {
Expand All @@ -213,9 +214,11 @@ func (c *Cluster) syncVolumeClaims() error {

if needsUpdate {
c.logger.Infof("updating persistent volume claim definition for volume %q", pvc.Name)
if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil {
updatedPvc, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update persistent volume claim: %q", err)
}
c.VolumeClaims[pvc.UID] = updatedPvc
c.logger.Infof("successfully updated persistent volume claim %q", pvc.Name)
} else {
c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name)
Expand All @@ -227,10 +230,11 @@ func (c *Cluster) syncVolumeClaims() error {
if err != nil {
return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err)
}
_, err = c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
patchedPvc, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations of the persistent volume claim for volume %q: %v", pvc.Name, err)
}
c.VolumeClaims[pvc.UID] = patchedPvc
}
}

Expand Down Expand Up @@ -268,28 +272,40 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro

pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(context.TODO(), listOptions)
if err != nil {
return nil, fmt.Errorf("could not list of PersistentVolumeClaims: %v", err)
return nil, fmt.Errorf("could not list of persistent volume claims: %v", err)
}
return pvcs.Items, nil
}

func (c *Cluster) deletePersistentVolumeClaims() error {
c.logger.Debug("deleting PVCs")
pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return err
}
for _, pvc := range pvcs {
c.logger.Debugf("deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta))
if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil {
c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err)
c.setProcessName("deleting persistent volume claims")
errors := make([]string, 0)
for uid := range c.VolumeClaims {
err := c.deletePersistentVolumeClaim(uid)
if err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
}
if len(pvcs) > 0 {
c.logger.Debug("PVCs have been deleted")
} else {
c.logger.Debug("no PVCs to delete")

if len(errors) > 0 {
c.logger.Warningf("could not delete all persistent volume claims: %v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) deletePersistentVolumeClaim(uid types.UID) error {
c.setProcessName("deleting persistent volume claim")
pvc := c.VolumeClaims[uid]
c.logger.Debugf("deleting secret %q", pvc.Name)
err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("persistent volume claim %q has already been deleted", pvc.Name)
} else if err != nil {
return fmt.Errorf("could not delete persistent volume claim %q: %v", pvc.Name, err)
}
c.logger.Infof("persistent volume claim %q has been deleted", pvc.Name)
delete(c.VolumeClaims, uid)

return nil
}
Expand All @@ -299,7 +315,7 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {

pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return nil, fmt.Errorf("could not list cluster's PersistentVolumeClaims: %v", err)
return nil, fmt.Errorf("could not list cluster's persistent volume claims: %v", err)
}

pods, err := c.listPods()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestResizeVolumeClaim(t *testing.T) {

// check if listPersistentVolumeClaims returns only the PVCs matching the filter
if len(pvcs) != len(pvcList.Items)-1 {
t.Errorf("%s: could not find all PVCs, got %v, expected %v", testName, len(pvcs), len(pvcList.Items)-1)
t.Errorf("%s: could not find all persistent volume claims, got %v, expected %v", testName, len(pvcs), len(pvcList.Items)-1)
}

// check if PVCs were correctly resized
Expand Down
Loading