Skip to content

Update pauser to handle exception and run unpause as long as possible #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
92ebb3a
Update pauser to handle exception and run unpause as long as possible
kota2and3kan Apr 25, 2025
07d8325
[WIP] Add several unit tests
kota2and3kan Apr 25, 2025
08312b2
[WIP] Add several unit tests
kota2and3kan Apr 28, 2025
34a03fc
Fix minor things
kota2and3kan Apr 28, 2025
25c10f6
Merge remote-tracking branch 'origin/main' into add-error-handling-wh…
kota2and3kan Apr 30, 2025
4d22a34
Update exception handling based on review feedback
kota2and3kan May 1, 2025
42f1a30
Update pauser logic based on feedback and discussion
kota2and3kan May 2, 2025
1d17af9
Update unit tests
kota2and3kan May 7, 2025
f087077
Update dummy object name variable to static constant
kota2and3kan May 7, 2025
d3b54c0
Add exception handling for getTarget() after pause
kota2and3kan May 8, 2025
55ba29c
Apply suggestions from code review
kota2and3kan May 8, 2025
891beb8
Change variable name from backupOk to isPauseOk
kota2and3kan May 8, 2025
c233242
Update name of target status variable and method
kota2and3kan May 8, 2025
744da27
Return PausedDuration from pauseInternal() method
kota2and3kan May 8, 2025
c413386
Move variable definition in catch block
kota2and3kan May 8, 2025
600e7b1
Make the logic a bit simpler
kota2and3kan May 9, 2025
9e9132d
Update error message generation and definition based on feedback
kota2and3kan May 9, 2025
909e596
Add code comments
kota2and3kan May 9, 2025
f9dc445
Implement buildException() based on feedback
kota2and3kan May 13, 2025
5cca44c
Run targetStatusEquals() only if targetAfterPause is not null
kota2and3kan May 14, 2025
639e132
Apply suggestions from code review
kota2and3kan May 15, 2025
b93c054
Fix code comments based on feedback
kota2and3kan May 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
testImplementation(platform("org.junit:junit-bom:${junitVersion}"))
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation "org.mockito:mockito-core:${mockitoVersion}"
testImplementation "org.mockito:mockito-inline:${mockitoVersion}"
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.scalar.admin.kubernetes;

/**
 * A {@link PauserException} subtype used by {@code Pauser} when looking up the target pods after
 * the pause operation fails.
 */
public class GetTargetAfterPauseFailedException extends PauserException {

  /**
   * Creates the exception with a description and the underlying failure.
   *
   * @param message description of the failure
   * @param cause the exception that made the lookup fail
   */
  public GetTargetAfterPauseFailedException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates the exception with a description only.
   *
   * @param message description of the failure
   */
  public GetTargetAfterPauseFailedException(String message) {
    super(message);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.scalar.admin.kubernetes;

/**
 * A {@link PauserException} subtype used by {@code Pauser} to wrap a failure of the pause
 * operation itself.
 */
public class PauseFailedException extends PauserException {

  /**
   * Creates the exception with a description and the underlying failure.
   *
   * @param message description of the failure
   * @param cause the exception that made the pause operation fail
   */
  public PauseFailedException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates the exception with a description only.
   *
   * @param message description of the failure
   */
  public PauseFailedException(String message) {
    super(message);
  }
}
214 changes: 182 additions & 32 deletions lib/src/main/java/com/scalar/admin/kubernetes/Pauser.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.scalar.admin.kubernetes;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.scalar.admin.RequestCoordinator;
import io.kubernetes.client.openapi.Configuration;
Expand All @@ -13,8 +14,6 @@
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class implements a pause operation for Scalar product pods in a Kubernetes cluster. The
Expand All @@ -34,11 +33,38 @@
@NotThreadSafe
public class Pauser {

private static final int MAX_UNPAUSE_RETRY_COUNT = 3;
@VisibleForTesting static final int MAX_UNPAUSE_RETRY_COUNT = 3;

private final Logger logger = LoggerFactory.getLogger(Pauser.class);
private final TargetSelector targetSelector;

@VisibleForTesting
static final String UNPAUSE_ERROR_MESSAGE =
"Unpause operation failed. Scalar products might still be in a paused state. You"
+ " must restart related pods by using the `kubectl rollout restart deployment"
+ " <DEPLOYMENT_NAME>` command to unpause all pods.";

@VisibleForTesting
static final String PAUSE_ERROR_MESSAGE =
"Pause operation failed. You cannot use the backup that was taken during this pause"
+ " duration. You need to retry the pause operation from the beginning to"
+ " take a backup.";

@VisibleForTesting
static final String GET_TARGET_AFTER_PAUSE_ERROR_MESSAGE =
"Failed to find the target pods to examine if the targets pods were updated during"
+ " paused.";

@VisibleForTesting
static final String STATUS_CHECK_ERROR_MESSAGE =
"Status check failed. You cannot use the backup that was taken during this pause"
+ " duration. You need to retry the pause operation from the beginning to"
+ " take a backup.";

@VisibleForTesting
static final String STATUS_UNMATCHED_ERROR_MESSAGE =
"The target pods were updated during the pause duration. You cannot use the backup that"
+ " was taken during this pause duration.";

/**
* @param namespace The namespace where the pods are deployed.
* @param helmReleaseName The Helm release name used to deploy the pods.
Expand Down Expand Up @@ -77,67 +103,191 @@ public PausedDuration pause(int pauseDuration, @Nullable Long maxPauseWaitTime)
"pauseDuration is required to be greater than 0 millisecond.");
}

TargetSnapshot target;
// Get pods and deployment information before pause.
TargetSnapshot targetBeforePause;
try {
target = targetSelector.select();
targetBeforePause = getTarget();
} catch (Exception e) {
throw new PauserException("Failed to find the target pods to pause.", e);
}

RequestCoordinator coordinator = getRequestCoordinator(target);

coordinator.pause(true, maxPauseWaitTime);

Instant startTime = Instant.now();
// Get RequestCoordinator of Scalar Admin to pause.
RequestCoordinator requestCoordinator;
try {
requestCoordinator = getRequestCoordinator(targetBeforePause);
} catch (Exception e) {
throw new PauserException("Failed to initialize the request coordinator.", e);
}

Uninterruptibles.sleepUninterruptibly(pauseDuration, TimeUnit.MILLISECONDS);
// From here, we cannot throw exceptions right after they occur because we need to take care of
// the unpause operation failure. We will throw the exception after the unpause operation or at
// the end of this method.

Instant endTime = Instant.now();
// Run a pause operation.
PausedDuration pausedDuration = null;
PauseFailedException pauseFailedException = null;
try {
pausedDuration = pauseInternal(requestCoordinator, pauseDuration, maxPauseWaitTime);
} catch (Exception e) {
pauseFailedException = new PauseFailedException(PAUSE_ERROR_MESSAGE, e);
}

unpauseWithRetry(coordinator, MAX_UNPAUSE_RETRY_COUNT, target);
// Run an unpause operation.
UnpauseFailedException unpauseFailedException = null;
try {
unpauseWithRetry(requestCoordinator, MAX_UNPAUSE_RETRY_COUNT);
} catch (Exception e) {
unpauseFailedException = new UnpauseFailedException(UNPAUSE_ERROR_MESSAGE, e);
}

TargetSnapshot targetAfterPause;
// Get pods and deployment information after pause.
TargetSnapshot targetAfterPause = null;
GetTargetAfterPauseFailedException getTargetAfterPauseFailedException = null;
try {
targetAfterPause = targetSelector.select();
targetAfterPause = getTarget();
} catch (Exception e) {
throw new PauserException(
"Failed to find the target pods to examine if the targets pods were updated during"
+ " paused.",
e);
getTargetAfterPauseFailedException =
new GetTargetAfterPauseFailedException(GET_TARGET_AFTER_PAUSE_ERROR_MESSAGE, e);
}

if (!target.getStatus().equals(targetAfterPause.getStatus())) {
throw new PauserException("The target pods were updated during paused. Please retry.");
// Check if pods and deployment information are the same between before pause and after pause.
StatusCheckFailedException statusCheckFailedException = null;
StatusUnmatchedException statusUnmatchedException = null;
if (targetAfterPause != null) {
try {
statusUnmatchedException = targetStatusEquals(targetBeforePause, targetAfterPause);
} catch (Exception e) {
statusCheckFailedException = new StatusCheckFailedException(STATUS_CHECK_ERROR_MESSAGE, e);
}
}

return new PausedDuration(startTime, endTime);
// We use the exceptions as conditions instead of using boolean flags like `isPauseOk`, etc. If
// we use boolean flags, it might cause a bit large number of combinations. For example, if we
// have three flags, they generate 2^3 = 8 combinations. It also might make the nested if
// statement or a lot of branches of the switch statement. That's why we don't use status flags
// for now.
PauserException pauserException =
buildException(
unpauseFailedException,
pauseFailedException,
getTargetAfterPauseFailedException,
statusCheckFailedException,
statusUnmatchedException);

// Return the final result based on each process.
if (pauserException != null) {
// Some operations failed.
throw pauserException;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Should we log a message when pauserException != null? 👀

Looks like we removed a log in this PR (Sorry if I missed some context here 😓)

          logger.warn(
              "Failed to unpause Scalar product. They are still in paused. You must restart related"
                  + " pods by using the `kubectl rollout restart deployment {}`"
                  + " command to unpase all pods.",
              target.getDeployment().getMetadata().getName());
          return;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to output such a log message here.

I decided to include all messages in the generated exception.

Now, we can show users clearer errors by using several kinds of exceptions (e.g., UnpauseFailedException).

Also, I wanted to be able to see what happened from the log, but now we can see all the messages in the exceptions via the stack trace.

So, there is no problem if we don't have a log here.

} else {
// All operations succeeded.
return pausedDuration;
}
}

private void unpauseWithRetry(
RequestCoordinator coordinator, int maxRetryCount, TargetSnapshot target) {
@VisibleForTesting
void unpauseWithRetry(RequestCoordinator coordinator, int maxRetryCount) {
int retryCounter = 0;

while (true) {
try {
coordinator.unpause();
return;
} catch (Exception e) {
if (++retryCounter >= maxRetryCount) {
logger.warn(
"Failed to unpause Scalar product. They are still in paused. You must restart related"
+ " pods by using the `kubectl rollout restart deployment {}`"
+ " command to unpase all pods.",
target.getDeployment().getMetadata().getName());
return;
throw e;
}
}
}
}

/**
 * Takes a snapshot of the current target by delegating to {@code targetSelector.select()}.
 *
 * <p>Kept as a separate, package-visible method so that tests can replace it.
 *
 * @return the snapshot produced by the target selector
 * @throws PauserException if the target selector fails to produce a snapshot
 */
@VisibleForTesting
TargetSnapshot getTarget() throws PauserException {
  final TargetSnapshot snapshot = targetSelector.select();
  return snapshot;
}
Comment on lines +202 to +205
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in another comment, I separated this targetSelector.select() as one method getTarget() for testing.


/**
 * Builds a {@link RequestCoordinator} that addresses every pod in the given snapshot.
 *
 * <p>Each pod contributes one {@link InetSocketAddress} made of the pod IP reported in its status
 * and the admin port of the snapshot.
 *
 * @param target snapshot whose pods and admin port are used to build the addresses
 * @return a coordinator constructed from the list of pod addresses
 */
@VisibleForTesting
RequestCoordinator getRequestCoordinator(TargetSnapshot target) {
  return new RequestCoordinator(
      target.getPods().stream()
          .map(
              pod -> {
                // Read the pod IP first, then the admin port, for every pod.
                String podIp = pod.getStatus().getPodIP();
                return new InetSocketAddress(podIp, target.getAdminPort());
              })
          .collect(Collectors.toList()));
}

/**
 * Sends the pause request and then keeps the calling thread asleep for the requested duration.
 *
 * <p>The returned {@link PausedDuration} is bounded by two timestamps: one taken right after the
 * pause request returns and one taken right after the sleep finishes. This method does not
 * unpause.
 *
 * @param requestCoordinator coordinator used to send the pause request
 * @param pauseDuration how long to sleep after the pause request, in milliseconds
 * @param maxPauseWaitTime value forwarded unchanged to {@code requestCoordinator.pause}; may be
 *     {@code null}
 * @return the interval between the two timestamps described above
 */
@VisibleForTesting
PausedDuration pauseInternal(
    RequestCoordinator requestCoordinator, int pauseDuration, @Nullable Long maxPauseWaitTime) {
  requestCoordinator.pause(true, maxPauseWaitTime);

  // The measured window starts only once the pause request has returned.
  final Instant sleepStartedAt = Instant.now();
  Uninterruptibles.sleepUninterruptibly(pauseDuration, TimeUnit.MILLISECONDS);
  final Instant sleepFinishedAt = Instant.now();

  return new PausedDuration(sleepStartedAt, sleepFinishedAt);
}

@VisibleForTesting
@Nullable
StatusUnmatchedException targetStatusEquals(TargetSnapshot before, TargetSnapshot after) {
if (before.getStatus().equals(after.getStatus())) {
return null;
} else {
return new StatusUnmatchedException(STATUS_UNMATCHED_ERROR_MESSAGE);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] How about adding before and after to the error message just in case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. It might have a lot of information that makes the error message very long, but I will check what is included in before and after, first. And, if the information is not too large, I will add them into the error message.

Thank you for your suggestion!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked what is included in TargetStatus (i.e., before and after). As a result, I think we can add their information to the log, but with the current implementation the output would be a little hard to read.

So, I think it would be better to implement TargetStatus.toString() first. However, I think we need to consider something before implementing it.

Therefore, I will keep the current implementation as is in this PR, and I will fix it in another PR in the future.

}
}

/**
 * Folds the failures collected during a pause run into a single exception.
 *
 * <p>The first non-null argument, in the priority order below, becomes the returned exception.
 * Every later non-null argument is attached to it through {@link Throwable#addSuppressed}.
 *
 * <ol>
 *   <li>Unpause failure: the most critical, because it might cause system unavailability.
 *   <li>Pause failure: might be caused by a configuration mistake.
 *   <li>Failure to get the target after the pause: might be a temporary glitch such as a network
 *       failure.
 *   <li>Status check failure: might be a temporary glitch while comparing target information.
 *   <li>Status mismatch: might be caused by something temporary such as pod restarts.
 * </ol>
 *
 * @param unpauseFailedException failure of the unpause operation, or {@code null}
 * @param pauseFailedException failure of the pause operation, or {@code null}
 * @param getTargetAfterPauseFailedException failure to get the target after the pause, or {@code
 *     null}
 * @param statusCheckFailedException failure while comparing the target status, or {@code null}
 * @param statusUnmatchedException mismatch between the target status before and after the pause,
 *     or {@code null}
 * @return the highest-priority failure carrying the others as suppressed exceptions, or {@code
 *     null} if every argument is {@code null}
 */
@Nullable
@VisibleForTesting
PauserException buildException(
    @Nullable UnpauseFailedException unpauseFailedException,
    @Nullable PauseFailedException pauseFailedException,
    @Nullable GetTargetAfterPauseFailedException getTargetAfterPauseFailedException,
    @Nullable StatusCheckFailedException statusCheckFailedException,
    @Nullable StatusUnmatchedException statusUnmatchedException) {
  // Ordered from the most critical failure to the least critical one.
  PauserException[] failuresByPriority = {
    unpauseFailedException,
    pauseFailedException,
    getTargetAfterPauseFailedException,
    statusCheckFailedException,
    statusUnmatchedException
  };

  PauserException primary = null;
  for (PauserException failure : failuresByPriority) {
    if (failure == null) {
      continue;
    }
    if (primary == null) {
      // The first failure found has the highest priority and becomes the thrown exception.
      primary = failure;
    } else {
      // Lower-priority failures stay visible in the stack trace as suppressed exceptions.
      primary.addSuppressed(failure);
    }
  }
  return primary;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.scalar.admin.kubernetes;

/**
 * A {@link PauserException} subtype used by {@code Pauser} when comparing the target status
 * before and after the pause throws an exception.
 */
public class StatusCheckFailedException extends PauserException {

  /**
   * Creates the exception with a description and the underlying failure.
   *
   * @param message description of the failure
   * @param cause the exception thrown by the status comparison
   */
  public StatusCheckFailedException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates the exception with a description only.
   *
   * @param message description of the failure
   */
  public StatusCheckFailedException(String message) {
    super(message);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.scalar.admin.kubernetes;

/**
 * A {@link PauserException} subtype used by {@code Pauser} when the target status taken before
 * the pause does not equal the one taken after it.
 */
public class StatusUnmatchedException extends PauserException {

  /**
   * Creates the exception with a description and an underlying cause.
   *
   * @param message description of the mismatch
   * @param cause the exception associated with the mismatch
   */
  public StatusUnmatchedException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates the exception with a description only.
   *
   * @param message description of the mismatch
   */
  public StatusUnmatchedException(String message) {
    super(message);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.scalar.admin.kubernetes;

/**
 * A {@link PauserException} subtype used by {@code Pauser} to wrap a failure of the unpause
 * operation.
 */
public class UnpauseFailedException extends PauserException {

  /**
   * Creates the exception with a description and the underlying failure.
   *
   * @param message description of the failure
   * @param cause the exception that made the unpause operation fail
   */
  public UnpauseFailedException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates the exception with a description only.
   *
   * @param message description of the failure
   */
  public UnpauseFailedException(String message) {
    super(message);
  }
}
Loading