Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/operator_parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ configuration they are grouped under the `kubernetes` key.
manifest. To keep secrets, set this option to `false`. The default is `true`.

* **enable_persistent_volume_claim_deletion**
By default, the operator deletes PersistentVolumeClaims when removing the
By default, the operator deletes persistent volume claims when removing the
Postgres cluster manifest, no matter if `persistent_volume_claim_retention_policy`
on the statefulset is set to `retain`. To keep PVCs set this option to `false`.
The default is `true`.
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ type kubeResources struct {
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
VolumeClaims map[types.UID]*v1.PersistentVolumeClaim
PodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
//PVCs are treated separately
}

// Cluster describes postgresql cluster
Expand Down Expand Up @@ -140,6 +140,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
Endpoints: make(map[PostgresRole]*v1.Endpoints),
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
VolumeClaims: make(map[types.UID]*v1.PersistentVolumeClaim),
Streams: make(map[string]*zalandov1.FabricEventStream)},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
Expand Down Expand Up @@ -363,6 +364,11 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("pods are ready")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

// sync volume may already transition volumes to gp3, if iops/throughput or type is specified
if err = c.syncVolumes(); err != nil {
return err
}

// sync resources created by Patroni
if err = c.syncPatroniResources(); err != nil {
c.logger.Warnf("Patroni resources not yet synced: %v", err)
Expand Down
17 changes: 6 additions & 11 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func (c *Cluster) listResources() error {
c.logger.Infof("found logical backup job: %q (uid: %q)", util.NameFromMeta(c.LogicalBackupJob.ObjectMeta), c.LogicalBackupJob.UID)
}

for _, secret := range c.Secrets {
c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace)
for uid, secret := range c.Secrets {
c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), uid, secret.ObjectMeta.Namespace)
}

for role, service := range c.Services {
Expand Down Expand Up @@ -70,13 +70,8 @@ func (c *Cluster) listResources() error {
c.logger.Infof("found pod: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
}

pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return fmt.Errorf("could not get the list of PVCs: %v", err)
}

for _, obj := range pvcs {
c.logger.Infof("found PVC: %q (uid: %q)", util.NameFromMeta(obj.ObjectMeta), obj.UID)
for uid, pvc := range c.VolumeClaims {
c.logger.Infof("found persistent volume claim: %q (uid: %q)", util.NameFromMeta(pvc.ObjectMeta), uid)
}

for role, poolerObjs := range c.ConnectionPooler {
Expand Down Expand Up @@ -288,10 +283,10 @@ func (c *Cluster) deleteStatefulSet() error {

if c.OpConfig.EnablePersistentVolumeClaimDeletion != nil && *c.OpConfig.EnablePersistentVolumeClaimDeletion {
if err := c.deletePersistentVolumeClaims(); err != nil {
return fmt.Errorf("could not delete PersistentVolumeClaims: %v", err)
return fmt.Errorf("could not delete persistent volume claims: %v", err)
}
} else {
c.logger.Info("not deleting PersistentVolumeClaims because disabled in configuration")
c.logger.Info("not deleting persistent volume claims because disabled in configuration")
}

return nil
Expand Down
56 changes: 36 additions & 20 deletions pkg/cluster/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/filesystems"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/volumes"
)

Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *Cluster) syncVolumeClaims() error {

if c.OpConfig.StorageResizeMode == "off" || c.OpConfig.StorageResizeMode == "ebs" {
ignoreResize = true
c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of PVCs.", c.OpConfig.StorageResizeMode)
c.logger.Debugf("Storage resize mode is set to %q. Skipping volume size sync of persistent volume claims.", c.OpConfig.StorageResizeMode)
}

newSize, err := resource.ParseQuantity(c.Spec.Volume.Size)
Expand All @@ -196,9 +196,10 @@ func (c *Cluster) syncVolumeClaims() error {

pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return fmt.Errorf("could not receive persistent volume claims: %v", err)
return fmt.Errorf("could not list persistent volume claims: %v", err)
}
for _, pvc := range pvcs {
c.VolumeClaims[pvc.UID] = &pvc
needsUpdate := false
currentSize := quantityToGigabyte(pvc.Spec.Resources.Requests[v1.ResourceStorage])
if !ignoreResize && currentSize != manifestSize {
Expand All @@ -213,9 +214,11 @@ func (c *Cluster) syncVolumeClaims() error {

if needsUpdate {
c.logger.Infof("updating persistent volume claim definition for volume %q", pvc.Name)
if _, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil {
updatedPvc, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update persistent volume claim: %q", err)
}
c.VolumeClaims[pvc.UID] = updatedPvc
c.logger.Infof("successfully updated persistent volume claim %q", pvc.Name)
} else {
c.logger.Debugf("volume claim for volume %q do not require updates", pvc.Name)
Expand All @@ -227,10 +230,11 @@ func (c *Cluster) syncVolumeClaims() error {
if err != nil {
return fmt.Errorf("could not form patch for the persistent volume claim for volume %q: %v", pvc.Name, err)
}
_, err = c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
patchedPvc, err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations of the persistent volume claim for volume %q: %v", pvc.Name, err)
}
c.VolumeClaims[pvc.UID] = patchedPvc
}
}

Expand Down Expand Up @@ -268,28 +272,40 @@ func (c *Cluster) listPersistentVolumeClaims() ([]v1.PersistentVolumeClaim, erro

pvcs, err := c.KubeClient.PersistentVolumeClaims(ns).List(context.TODO(), listOptions)
if err != nil {
return nil, fmt.Errorf("could not list of PersistentVolumeClaims: %v", err)
return nil, fmt.Errorf("could not list of persistent volume claims: %v", err)
}
return pvcs.Items, nil
}

func (c *Cluster) deletePersistentVolumeClaims() error {
c.logger.Debug("deleting PVCs")
pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return err
}
for _, pvc := range pvcs {
c.logger.Debugf("deleting PVC %q", util.NameFromMeta(pvc.ObjectMeta))
if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil {
c.logger.Warningf("could not delete PersistentVolumeClaim: %v", err)
c.setProcessName("deleting persistent volume claims")
errors := make([]string, 0)
for uid := range c.VolumeClaims {
err := c.deletePersistentVolumeClaim(uid)
if err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
}
if len(pvcs) > 0 {
c.logger.Debug("PVCs have been deleted")
} else {
c.logger.Debug("no PVCs to delete")

if len(errors) > 0 {
c.logger.Warningf("could not delete all persistent volume claims: %v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) deletePersistentVolumeClaim(uid types.UID) error {
c.setProcessName("deleting persistent volume claim")
pvc := c.VolumeClaims[uid]
c.logger.Debugf("deleting persistent volume claim %q", pvc.Name)
err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions)
if k8sutil.ResourceNotFound(err) {
c.logger.Debugf("persistent volume claim %q has already been deleted", pvc.Name)
} else if err != nil {
return fmt.Errorf("could not delete persistent volume claim %q: %v", pvc.Name, err)
}
c.logger.Infof("persistent volume claim %q has been deleted", pvc.Name)
delete(c.VolumeClaims, uid)

return nil
}
Expand All @@ -299,7 +315,7 @@ func (c *Cluster) listPersistentVolumes() ([]*v1.PersistentVolume, error) {

pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return nil, fmt.Errorf("could not list cluster's PersistentVolumeClaims: %v", err)
return nil, fmt.Errorf("could not list cluster's persistent volume claims: %v", err)
}

pods, err := c.listPods()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestResizeVolumeClaim(t *testing.T) {

// check if listPersistentVolumeClaims returns only the PVCs matching the filter
if len(pvcs) != len(pvcList.Items)-1 {
t.Errorf("%s: could not find all PVCs, got %v, expected %v", testName, len(pvcs), len(pvcList.Items)-1)
t.Errorf("%s: could not find all persistent volume claims, got %v, expected %v", testName, len(pvcs), len(pvcList.Items)-1)
}

// check if PVCs were correctly resized
Expand Down
Loading