/**
 * Immutable description of one participant of a leader election: the id of its
 * coordination session, the payload it attached, and whether it was reported as leader.
 *
 * <p>The payload array is copied when the object is created and again every time it is
 * handed out, so the state used by {@link #equals(Object)} and {@link #hashCode()} cannot be
 * changed from outside.
 */
public class ElectionParticipant {
    private final long sessionId;
    private final byte[] data;
    private final boolean isLeader;

    /**
     * @param id       coordination session id of the participant
     * @param data     payload attached by the participant; may be {@code null}
     * @param isLeader {@code true} if the participant is the leader
     */
    public ElectionParticipant(long id, byte[] data, boolean isLeader) {
        this.sessionId = id;
        this.data = copyOf(data);
        this.isLeader = isLeader;
    }

    /** @return coordination session id of the participant */
    public long getSessionId() {
        return sessionId;
    }

    /** @return a fresh copy of the participant payload, or {@code null} if none was given */
    public byte[] getData() {
        return copyOf(data);
    }

    /** @return {@code true} if the participant was reported as leader */
    public boolean isLeader() {
        return isLeader;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ElectionParticipant that = (ElectionParticipant) o;
        return sessionId == that.sessionId
                && isLeader == that.isLeader
                && Arrays.equals(data, that.data);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionId, Arrays.hashCode(data), isLeader);
    }

    @Override
    public String toString() {
        return "ElectionParticipant{"
                + "sessionId=" + sessionId
                + ", data=" + Arrays.toString(data)
                + ", isLeader=" + isLeader
                + '}';
    }

    /** Null-tolerant array copy used to keep the payload private to this object. */
    private static byte[] copyOf(byte[] source) {
        return source == null ? null : source.clone();
    }
}
tech.ydb.coordination.settings.DescribeSemaphoreMode; +import tech.ydb.coordination.settings.WatchSemaphoreMode; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; + +/** + * A distributed leader election implementation using coordination services. + * This class provides a mechanism for multiple instances to compete for leadership + * of a named resource, with exactly one instance becoming the leader at any time. + * + *

The election process uses a semaphore-based approach where: + *

+ * + *

Thread safety: This class is thread-safe. All public methods can be called + * from multiple threads concurrently. + */ +public class LeaderElection implements Closeable, SessionListenableProvider { + private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class); + private static final long MAX_LEASE = 1L; + + private final LeaderElectionListener leaderElectionListener; + private final String coordinationNodePath; + private final String electionName; + private final byte[] data; + private final RetryPolicy retryPolicy; + + private final ScheduledExecutorService scheduledExecutor; + private final ExecutorService blockingExecutor; + private final CoordinationSession coordinationSession; + private final ListenableContainer sessionListenable; + private final LockInternals lock; + private final SemaphoreObserver semaphoreObserver; + + private final CountDownLatch startingLatch = new CountDownLatch(1); + private final AtomicReference state = new AtomicReference<>(State.INITIAL); + private final AtomicReference> initializingTask = new AtomicReference<>(null); + private Future electionTask = null; + private volatile boolean autoRequeue = false; + private volatile boolean isLeader = false; + + /** + * Internal state + */ + private enum State { + INITIAL, + STARTING, + STARTED, + FAILED, + CLOSED + } + + /** + * Creates a new LeaderElection instance with default settings. 
+ * + * @param client the coordination client to use + * @param coordinationNodePath path to the coordination node + * @param electionName name of the election (must be unique per coordination node) + * @param data optional data to associate with the leader (visible to all participants) + * @param leaderElectionListener callback for leadership events + */ + public LeaderElection( + CoordinationClient client, + String coordinationNodePath, + String electionName, + byte[] data, + LeaderElectionListener leaderElectionListener + ) { + this( + client, + coordinationNodePath, + electionName, + data, + leaderElectionListener, + LeaderElectionSettings.newBuilder() + .build() + ); + } + + /** + * Creates a new LeaderElection instance with custom settings. + * + * @param client the coordination client to use + * @param coordinationNodePath path to the coordination node + * @param electionName name of the election (must be unique per coordination node) + * @param data optional data to associate with the leader (visible to all participants) + * @param leaderElectionListener callback for leadership events + * @param settings configuration settings for the election process + * @throws NullPointerException if any required parameter is null + */ + public LeaderElection( + CoordinationClient client, + String coordinationNodePath, + String electionName, + byte[] data, + LeaderElectionListener leaderElectionListener, + LeaderElectionSettings settings + ) { + Preconditions.checkNotNull(client, "CoordinationClient cannot be null"); + Preconditions.checkNotNull(coordinationNodePath, "Coordination node path cannot be null"); + Preconditions.checkNotNull(electionName, "Election name cannot be null"); + Preconditions.checkNotNull(leaderElectionListener, "LeaderElectionListener cannot be null"); + Preconditions.checkNotNull(settings, "LeaderElectionSettings cannot be null"); + + this.coordinationNodePath = coordinationNodePath; + this.electionName = electionName; + this.data = data; + 
this.leaderElectionListener = leaderElectionListener; + this.scheduledExecutor = settings.getScheduledExecutor(); + this.blockingExecutor = Executors.newSingleThreadExecutor(); + this.retryPolicy = settings.getRetryPolicy(); + + this.coordinationSession = client.createSession(coordinationNodePath); + this.sessionListenable = new ListenableContainer<>(); + coordinationSession.addStateListener(sessionState -> { + if (!state.get().equals(State.CLOSED) && (sessionState == CoordinationSession.State.LOST || + sessionState == CoordinationSession.State.CLOSED)) { + logger.error("Coordination session unexpectedly changed to {} state, marking election as FAILED", + sessionState); + stopInternal(State.FAILED); + } + sessionListenable.notifyListeners(sessionState); + }); + this.lock = new LockInternals( + coordinationSession, + electionName, + MAX_LEASE + ); + this.semaphoreObserver = new SemaphoreObserver( + coordinationSession, + electionName, + WatchSemaphoreMode.WATCH_OWNERS, + DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS, + settings.getRetryPolicy(), + settings.getScheduledExecutor() + ); + } + + /** + * Starts the leader election process. 
+ * + * @throws IllegalStateException if the election is already started or closed + */ + public void start() { + Preconditions.checkState( + state.compareAndSet(State.INITIAL, State.STARTING), + "Leader election may be started only once" + ); + + logger.debug("Starting leader election '{}' initialization", electionName); + + CompletableFuture connectionTask = executeWithRetry(coordinationSession::connect); + CompletableFuture semaphoreCreateTask = executeWithRetry( + () -> coordinationSession.createSemaphore(electionName, MAX_LEASE) + .thenCompose(status -> { + if (status.getCode() == StatusCode.ALREADY_EXISTS) { + return CompletableFuture.completedFuture(Status.SUCCESS); + } + return CompletableFuture.completedFuture(status); + }) + ); + + CompletableFuture initializingRetriedTask = connectionTask + .thenCompose(connectionStatus -> { + connectionStatus.expectSuccess("Unable to establish session"); + return semaphoreCreateTask; + }) + .thenApply(semaphoreStatus -> { + if (semaphoreStatus.isSuccess()) { + state.set(State.STARTED); + semaphoreObserver.start(); + startingLatch.countDown(); + } + semaphoreStatus.expectSuccess("Unable to create semaphore"); + return semaphoreStatus; + }).exceptionally(ex -> { + logger.error("Leader election initializing task failed", ex); + stopInternal(State.FAILED); + return Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + }); + + initializingTask.set(initializingRetriedTask); + + if (autoRequeue) { + enqueueElection(); + } + } + + private CompletableFuture executeWithRetry(Supplier> taskSupplier) { + return new RetryableTask("leaderElectionInitialize", taskSupplier, scheduledExecutor, retryPolicy).execute(); + } + + /** + * Enables automatic requeueing when leadership is lost. + * If called before start election will be started immediately. + */ + public void autoRequeue() { + autoRequeue = true; + } + + /** + * Checks if this instance is currently the leader. 
+ * + * @return true if this instance is the leader, false otherwise + */ + public boolean isLeader() { + return isLeader; + } + + /** + * Re-queue an attempt for leadership. If this instance is already queued, nothing + * happens and false is returned. If the instance was not queued, it is re-queued and true + * is returned. + * + * @return true if reenqueue was successful + * @throws IllegalStateException if the election is not in STARTED or STARTING state + */ + public boolean requeue() { + State localState = state.get(); + Preconditions.checkState( + localState == State.STARTED || localState == State.STARTING, + "Unexpected state: " + localState.name() + ); + + return enqueueElection(); + } + + /** + * Interrupts the current leadership attempt if one is in progress. + * + * @return true if leadership was interrupted, false if no attempt was in progress + */ + public synchronized boolean interruptLeadership() { + Future localTask = electionTask; + if (localTask != null) { + localTask.cancel(true); + electionTask = null; + return true; + } + return false; + } + + private synchronized boolean enqueueElection() { + State localState = state.get(); + if (!isQueued() && (localState == State.STARTED || localState == State.STARTING)) { + electionTask = blockingExecutor.submit(new Callable() { + @Override + public Void call() throws Exception { + try { + doWork(); + } finally { + finishTask(); + } + return null; + } + }); + return true; + } + + return false; + } + + /** + * Main work loop for leadership acquisition and maintenance. 
+ * + * @throws Exception if the leadership attempt fails + */ + private void doWork() throws Exception { + isLeader = false; + + try { + waitStartedStateOrFail(); + lock.tryAcquire( + null, + true, + data + ); + isLeader = true; + try { + leaderElectionListener.takeLeadership(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } catch (Throwable e) { + logger.error("Unexpected error in takeLeadership", e); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } finally { + if (isLeader) { + isLeader = false; + boolean wasInterrupted = Thread.interrupted(); + try { + lock.release(); + } catch (Exception e) { + logger.error("Lock release exception for: " + coordinationNodePath); + } finally { + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } + } + } + } + } + + private void waitStartedStateOrFail() throws InterruptedException { + State localState = state.get(); + if (localState == State.STARTING) { + startingLatch.await(); + localState = state.get(); + } + + if (localState != State.STARTED) { + throw new IllegalStateException("Unexpected state: " + localState.name()); + } + } + + private synchronized void finishTask() { + electionTask = null; + State localState = state.get(); + if (autoRequeue && localState != State.CLOSED && localState != State.FAILED) { + enqueueElection(); + } + } + + private boolean isQueued() { + return electionTask != null; + } + + /** + * Gets all participants in the election. + * Note: Due to observer limitations, waiters may be visible only eventually (after lease changes). 
+ * + * @return list of election participants (owners and visible waiters) + */ + public List getParticipants() { + SemaphoreDescription semaphoreDescription = semaphoreObserver.getCachedData(); + if (semaphoreDescription == null) { + return Collections.emptyList(); + } + + return Stream.concat( + semaphoreDescription.getOwnersList().stream() + .map(session -> mapParticipant(session, true)), + semaphoreDescription.getWaitersList().stream() + .map(session -> mapParticipant(session, false)) + ).collect(Collectors.toList()); + } + + /** + * Gets the current leader if one exists. + * + * @return Optional containing the current leader, or empty if no leader exists + */ + public Optional getCurrentLeader() { + SemaphoreDescription semaphoreDescription = semaphoreObserver.getCachedData(); + if (semaphoreDescription == null) { + return Optional.empty(); + } + + return semaphoreDescription.getOwnersList().stream().findFirst() + .map(session -> mapParticipant(session, true)); + } + + private static ElectionParticipant mapParticipant(SemaphoreDescription.Session session, boolean owner) { + return new ElectionParticipant( + session.getId(), + session.getData(), + owner + ); + } + + @Override + public Listenable getSessionListenable() { + return sessionListenable; + } + + /** + * Closes the leader election and releases all resources. + * After closing, the instance cannot be reused. + */ + @Override + public synchronized void close() { + stopInternal(State.CLOSED); + } + + /** + * Internal method to stop the election with the specified termination state. 
+ * + * @param terminationState the state to transition to (FAILED or CLOSED) + * @return true if the state was changed, false if already terminated + */ + private synchronized boolean stopInternal(State terminationState) { + State localState = state.get(); + if (localState == State.FAILED || localState == State.CLOSED) { + logger.warn("Already stopped leader election {} with status: {}", electionName, localState); + return false; + } + logger.debug("Transitioning leader election {} from {} to {}", electionName, localState, terminationState); + + // change state + state.set(terminationState); + + // unblock starting latch if not yet + startingLatch.countDown(); + + // stop tasks + Future localInitializingTask = initializingTask.get(); + if (localInitializingTask != null) { + localInitializingTask.cancel(true); + initializingTask.set(null); + } + Future localTask = electionTask; + if (localTask != null) { + localTask.cancel(true); + electionTask = null; + } + + // Clean up resources + try { + semaphoreObserver.close(); + } catch (Exception e) { + logger.warn("Error closing semaphore observer for {}: {}", electionName, e.getMessage()); + } + try { + blockingExecutor.shutdown(); + } catch (Exception e) { + logger.warn("Error shutting down executor for {}: {}", electionName, e.getMessage()); + } + try { + coordinationSession.close(); + } catch (Exception e) { + logger.warn("Error closing session for {}: {}", electionName, e.getMessage()); + } + return true; + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/election/LeaderElectionListener.java b/coordination/src/main/java/tech/ydb/coordination/recipes/election/LeaderElectionListener.java new file mode 100644 index 000000000..20210c2b7 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/election/LeaderElectionListener.java @@ -0,0 +1,66 @@ +package tech.ydb.coordination.recipes.election; + +/** + * A listener interface for receiving leadership election events in a 
/**
 * A listener interface for receiving leadership election events in a distributed system.
 *
 * <p>Implementations of this interface are notified when the current process becomes
 * the leader in a leader election scenario.</p>
 *
 * <p><b>Leadership Lifecycle:</b></p>
 * <ol>
 *   <li><b>Election:</b> The distributed system selects a leader</li>
 *   <li><b>Takeover:</b> {@code takeLeadership()} is invoked on the elected leader</li>
 *   <li><b>Execution:</b> The leader performs its duties while maintaining leadership</li>
 *   <li><b>Termination:</b> When {@code takeLeadership()} completes (either normally or exceptionally),
 *       the leadership is automatically relinquished and new elections begin</li>
 * </ol>
 *
 * <p><b>Usage Example:</b></p>
 * <pre>{@code
 * LeaderElectionListener listener = new LeaderElectionListener() {
 *     public void takeLeadership() throws Exception {
 *         startServices();
 *
 *         // Main leadership work
 *         while (shouldContinueLeadership()) {
 *             performLeaderDuties();
 *         }
 *
 *         // Cleanup will trigger automatically when method exits
 *     }
 * };
 * }</pre>
 *
 * <p><b>Important Implementation Notes:</b></p>
 * <ul>
 *   <li>{@code LeaderElection} calls this method on its own single worker thread, so the
 *       method is expected to block for as long as leadership should be held.</li>
 *   <li>{@code LeaderElection#interruptLeadership()} and closing the election cancel the
 *       running attempt with a thread interrupt; implementations should stop promptly when
 *       interrupted.</li>
 * </ul>
 *
 * <p><b>Error Handling:</b> If the implementation throws an exception, the leadership will be
 * released and new elections will be triggered, just as with normal completion.</p>
 */
public interface LeaderElectionListener {
    /**
     * Called when leadership is acquired by the current process.
     *
     * <p>The leadership period lasts exactly as long as this method's execution. When the method
     * returns (either normally or exceptionally), the leadership is automatically relinquished
     * and new elections begin immediately.
     *
     * <p>For continuous leadership, implementations should:
     * <ul>
     *   <li>stay inside this method (for example in a work loop) while leadership is wanted;</li>
     *   <li>let {@link InterruptedException} propagate instead of swallowing it.</li>
     * </ul>
     *
     * @throws Exception if leadership cannot be maintained or should be terminated early.
     *                   The leadership will be released and new elections will begin when any
     *                   exception is thrown.
     */
    void takeLeadership() throws Exception;
}
builder.scheduledExecutor : + Executors.newSingleThreadScheduledExecutor(); + this.retryPolicy = builder.retryPolicy; + } + + public ScheduledExecutorService getScheduledExecutor() { + return scheduledExecutor; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private ScheduledExecutorService scheduledExecutor; + private RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY; + + public Builder withScheduledExecutor(ScheduledExecutorService executorService) { + this.scheduledExecutor = executorService; + return this; + } + + public Builder withRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + public LeaderElectionSettings build() { + return new LeaderElectionSettings(this); + } + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/group/GroupMember.java b/coordination/src/main/java/tech/ydb/coordination/recipes/group/GroupMember.java new file mode 100644 index 000000000..9a7cbbd93 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/group/GroupMember.java @@ -0,0 +1,47 @@ +package tech.ydb.coordination.recipes.group; + +import java.util.Arrays; +import java.util.Objects; + +public class GroupMember { + private final long sessionId; + private final byte[] data; + + public GroupMember(long sessionId, byte[] data) { + this.sessionId = sessionId; + this.data = data; + } + + public long getSessionId() { + return sessionId; + } + + public byte[] getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GroupMember that = (GroupMember) o; + return sessionId == that.sessionId && Objects.deepEquals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(sessionId, Arrays.hashCode(data)); + } + + @Override + 
/**
 * Immutable description of one group member: the id of its coordination session and the
 * payload it attached when joining.
 *
 * <p>The payload array is copied on construction and on every read, so the state used by
 * {@link #equals(Object)} and {@link #hashCode()} cannot be modified from outside.
 */
public class GroupMember {
    private final long sessionId;
    private final byte[] data;

    /**
     * @param sessionId coordination session id of the member
     * @param data      payload attached by the member; may be {@code null}
     */
    public GroupMember(long sessionId, byte[] data) {
        this.sessionId = sessionId;
        this.data = copyOf(data);
    }

    /** @return coordination session id of the member */
    public long getSessionId() {
        return sessionId;
    }

    /** @return a fresh copy of the member payload, or {@code null} if none was given */
    public byte[] getData() {
        return copyOf(data);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        GroupMember that = (GroupMember) o;
        return sessionId == that.sessionId && Arrays.equals(data, that.data);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionId, Arrays.hashCode(data));
    }

    @Override
    public String toString() {
        return "GroupMember{"
                + "sessionId=" + sessionId
                + ", data=" + Arrays.toString(data)
                + '}';
    }

    /** Null-tolerant array copy used to keep the payload private to this object. */
    private static byte[] copyOf(byte[] source) {
        return source == null ? null : source.clone();
    }
}
to manage membership and track group members. + * + *

This class provides mechanisms to: + *

+ * + *

The implementation uses a semaphore with watch capabilities to track membership. + */ +public class GroupMembership implements Closeable, SessionListenableProvider { + private static final Logger logger = LoggerFactory.getLogger(GroupMembership.class); + private static final long MAX_GROUP_SIZE = Long.MAX_VALUE; + private static final Duration ACQUIRE_TIMEOUT = Duration.ofSeconds(30); + + private final String groupName; + private final byte[] data; + private final RetryPolicy retryPolicy; + private final ScheduledExecutorService scheduledExecutor; + + private final CoordinationSession coordinationSession; + private final SemaphoreObserver semaphoreObserver; + private final ListenableContainer sessionListenable; + private final ListenableContainer> groupMembersListenable; + + private final AtomicReference state = new AtomicReference<>(State.INITIAL); + private final AtomicReference> initializingTask = new AtomicReference<>(null); + private Future acquireTask = null; + + /** + * Internal state + */ + private enum State { + /** Initial state before starting */ + INITIAL, + /** When start() has been called but initialization isn't complete */ + STARTING, + /** Fully operational state */ + STARTED, + /** Failed terminated state */ + FAILED, + /** Closed terminated state */ + CLOSED + } + + /** + * Creates a new GroupMembership with default settings. + * + * @param coordinationClient the coordination service client + * @param coordinationNodePath path to the coordination node + * @param groupName name of the group to join + * @throws IllegalArgumentException if any argument is invalid + */ + public GroupMembership( + CoordinationClient coordinationClient, + String coordinationNodePath, + String groupName, + byte[] data + ) { + this( + coordinationClient, + coordinationNodePath, + groupName, + data, + GroupMembershipSettings.newBuilder() + .build() + ); + } + + /** + * Creates a new GroupMembership with custom settings. 
+ * + * @param coordinationClient the coordination service client + * @param coordinationNodePath path to the coordination node + * @param groupName name of the group to join + * @param settings configuration settings + * @throws IllegalArgumentException if any argument is invalid + * @throws NullPointerException if any required argument is null + */ + public GroupMembership( + CoordinationClient coordinationClient, + String coordinationNodePath, + String groupName, + byte[] data, + GroupMembershipSettings settings + ) { + validateConstructorArgs(coordinationClient, coordinationNodePath, groupName, settings); + + this.groupName = groupName; + this.data = data; + this.retryPolicy = settings.getRetryPolicy(); + this.scheduledExecutor = settings.getScheduledExecutor(); + + this.coordinationSession = coordinationClient.createSession( + coordinationNodePath, + CoordinationSessionSettings.newBuilder() + .withRetryPolicy(retryPolicy) + .build() + ); + this.sessionListenable = new ListenableContainer<>(); + coordinationSession.addStateListener(sessionState -> { + if (!state.get().equals(State.CLOSED) && (sessionState == CoordinationSession.State.LOST || + sessionState == CoordinationSession.State.CLOSED)) { + logger.error("Coordination session unexpectedly changed to {} state, group membership went FAILED", + sessionState); + stopInternal(State.FAILED); + } + if (sessionState == CoordinationSession.State.RECONNECTED) { + reconnect(); + } + sessionListenable.notifyListeners(sessionState); + }); + + this.semaphoreObserver = new SemaphoreObserver( + coordinationSession, + groupName, + WatchSemaphoreMode.WATCH_OWNERS, + DescribeSemaphoreMode.WITH_OWNERS, + settings.getRetryPolicy(), + settings.getScheduledExecutor() + ); + this.groupMembersListenable = new ListenableContainer<>(); + semaphoreObserver.getWatchDataListenable().addListener(description -> { + List groupMembers = mapSemaphoreDescriptionToMembersList(description); + 
groupMembersListenable.notifyListeners(groupMembers); + }); + } + + + private void validateConstructorArgs( + CoordinationClient coordinationClient, + String coordinationNodePath, + String groupName, + GroupMembershipSettings settings + ) { + Objects.requireNonNull(coordinationClient, "CoordinationClient cannot be null"); + Objects.requireNonNull(coordinationNodePath, "Coordination node path cannot be null"); + Objects.requireNonNull(groupName, "Group name cannot be null"); + Objects.requireNonNull(settings, "Settings cannot be null"); + Objects.requireNonNull(settings.getRetryPolicy(), "Retry policy cannot be null"); + Objects.requireNonNull(settings.getScheduledExecutor(), "Scheduled executor cannot be null"); + + if (groupName.isEmpty()) { + throw new IllegalArgumentException("Group name cannot be empty"); + } + if (coordinationNodePath.isEmpty()) { + throw new IllegalArgumentException("Coordination node path cannot be empty"); + } + } + + private void reconnect() { + logger.info("Attempting to reconnect group membership for group '{}'", groupName); + tryEnqueueAcquire(); + } + + /** + * Starts the group membership service. + * + *

This begins the process of joining the group and starts tracking membership. + * + * @throws IllegalStateException if already started + */ + public void start() { + Preconditions.checkState( + state.compareAndSet(State.INITIAL, State.STARTING), + "Group membership may be started only once" + ); + + logger.info("Starting group membership initialization for group '{}'", groupName); + + CompletableFuture sessionConnectTask = getSessionConnectRetryableTask(); + CompletableFuture semaphoreCreateTask = getSemaphoreCreateTask(); + + CompletableFuture initializingRetriedTask = sessionConnectTask + .thenCompose(connectionStatus -> { + connectionStatus.expectSuccess("Unable to establish session"); + logger.debug("Successfully connected session for group '{}'", groupName); + return semaphoreCreateTask; + }) + .thenApply(semaphoreStatus -> { + if (semaphoreStatus.isSuccess()) { + logger.info("Successfully initialized semaphore for group '{}'", groupName); + onInitializeSuccess(); + } else { + logger.error("Failed to create semaphore for group '{}': {}", groupName, semaphoreStatus); + } + semaphoreStatus.expectSuccess("Unable to create semaphore"); + return semaphoreStatus; + }).exceptionally(ex -> { + logger.error("Group membership initialization failed for group '{}'", groupName, ex); + stopInternal(State.FAILED); + return Status.of(StatusCode.CLIENT_INTERNAL_ERROR); + }); + + initializingTask.set(initializingRetriedTask); + } + + private CompletableFuture getSessionConnectRetryableTask() { + return new RetryableTask( + "groupMembership-sessionConnect-" + groupName, + coordinationSession::connect, + scheduledExecutor, + retryPolicy + ).execute(); + } + + private CompletableFuture getSemaphoreCreateTask() { + Supplier> semaphoreCreateTaskSupplier = () -> + coordinationSession.createSemaphore(groupName, MAX_GROUP_SIZE) + .thenCompose(status -> { + if (status.getCode() == StatusCode.ALREADY_EXISTS) { + return CompletableFuture.completedFuture(Status.SUCCESS); + } + return 
CompletableFuture.completedFuture(status); + }); + return new RetryableTask( + "groupMembership-semaphoreCreate-" + groupName, + semaphoreCreateTaskSupplier, + scheduledExecutor, + retryPolicy + ).execute(); + } + + private void onInitializeSuccess() { + logger.info("Group membership initialization completed successfully for group '{}'", groupName); + state.set(State.STARTED); + semaphoreObserver.start(); + tryEnqueueAcquire(); + } + + /** + * Enqueues task if no current working task + */ + private synchronized boolean tryEnqueueAcquire() { + if (acquireTask != null) { + logger.warn("Acquire task already in progress for group '{}', skipping", groupName); + return false; + } + + logger.debug("Enqueuing new acquire task for group '{}'", groupName); + CompletableFuture acquireRetryableTask = new RetryableTask( + "groupMembership-acquireSemaphoreTask-" + groupName, + () -> coordinationSession.acquireSemaphore(groupName, 1, data, ACQUIRE_TIMEOUT) + .thenApply(Result::getStatus), + scheduledExecutor, + retryPolicy + ).execute(); + + acquireTask = acquireRetryableTask.whenComplete(this::finishAcquireTask); + return true; + } + + private synchronized void finishAcquireTask(Status status, @Nullable Throwable throwable) { + acquireTask = null; + + if (throwable != null) { + logger.error("Acquire task failed with exception for group '{}'", groupName, throwable); + tryEnqueueAcquire(); + return; + } + + if (status.isSuccess()) { + logger.info("Successfully acquired semaphore for group '{}'", groupName); + return; + } + + logger.warn("Failed to acquire semaphore for group '{}' with status: '{}'", groupName, status); + tryEnqueueAcquire(); + } + + /** + * Gets the current list of group members. 
+ * + * @return list of current group members, or null if not available + */ + public @Nullable List getCurrentMembers() { + SemaphoreDescription cachedDescription = semaphoreObserver.getCachedData(); + return mapSemaphoreDescriptionToMembersList(cachedDescription); + } + + private static @Nullable List mapSemaphoreDescriptionToMembersList(SemaphoreDescription description) { + if (description == null) { + return null; + } + + List ownersList = description.getOwnersList(); + return ownersList.stream().map(GroupMembership::mapSessionToGroupMember).collect(Collectors.toList()); + } + + private static GroupMember mapSessionToGroupMember(SemaphoreDescription.Session session) { + return new GroupMember( + session.getId(), + session.getData() + ); + } + + /** + * Gets a listenable for session state changes. + * + * @return observable for coordination session state changes + */ + @Override + public Listenable getSessionListenable() { + return sessionListenable; + } + + /** + * Gets a listenable for group membership changes. + * + * @return observable for group membership changes + */ + public Listenable> getMembersListenable() { + return groupMembersListenable; + } + + /** + * Closes the group membership and releases all resources. + * + *

After closing, the instance cannot be reused. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + logger.info("Closing group membership for group '{}'", groupName); + stopInternal(State.CLOSED); + } + + /** + * Stops the service and transitions to specified termination state. + * + * @param terminationState either FAILED or CLOSED + * @return true if state was changed, false if already terminated + */ + private synchronized boolean stopInternal(State terminationState) { + State localState = state.get(); + if (localState == State.FAILED || localState == State.CLOSED) { + logger.warn("Attempted to stop already stopped group membership '{}' (current state: {})", + groupName, localState); + return false; + } + logger.info("Stopping group membership '{}' (current state: {}, target state: {})", + groupName, localState, terminationState); + + // change state + state.set(terminationState); + + // stop tasks + Future localInitializingTask = initializingTask.get(); + if (localInitializingTask != null) { + localInitializingTask.cancel(true); + initializingTask.set(null); + } + Future localAcquireTask = acquireTask; + if (localAcquireTask != null) { + localAcquireTask.cancel(true); + acquireTask = null; + } + + // Clean up resources + try { + semaphoreObserver.close(); + } catch (Exception e) { + logger.warn("Error closing semaphore observer for {}: {}", groupName, e.getMessage()); + } + try { + coordinationSession.close(); + } catch (Exception e) { + logger.warn("Error closing session for {}: {}", groupName, e.getMessage()); + } + + return true; + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/group/GroupMembershipSettings.java b/coordination/src/main/java/tech/ydb/coordination/recipes/group/GroupMembershipSettings.java new file mode 100644 index 000000000..c5c932200 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/group/GroupMembershipSettings.java @@ -0,0 
+1,56 @@ +package tech.ydb.coordination.recipes.group; + +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import tech.ydb.common.retry.RetryPolicy; +import tech.ydb.common.retry.RetryUntilElapsed; + +public class GroupMembershipSettings { + public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(5); + public static final RetryUntilElapsed DEFAULT_RETRY_POLICY = new RetryUntilElapsed( + DEFAULT_CONNECT_TIMEOUT.toMillis(), 250, 5 + ); + + private final ScheduledExecutorService scheduledExecutor; + private final RetryPolicy retryPolicy; + + public GroupMembershipSettings(Builder builder) { + this.scheduledExecutor = builder.scheduledExecutor != null ? builder.scheduledExecutor : + Executors.newSingleThreadScheduledExecutor(); + this.retryPolicy = builder.retryPolicy; + } + + public RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + public ScheduledExecutorService getScheduledExecutor() { + return scheduledExecutor; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY; + private ScheduledExecutorService scheduledExecutor; + + public Builder withRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + public Builder withScheduledExecutor(ScheduledExecutorService scheduledExecutor) { + this.scheduledExecutor = scheduledExecutor; + return this; + } + + public GroupMembershipSettings build() { + return new GroupMembershipSettings(this); + } + } +} + diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/InterProcessLock.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/InterProcessLock.java new file mode 100644 index 000000000..d32508462 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/InterProcessLock.java @@ -0,0 +1,52 @@ +package 
/**
 * Contract of a distributed lock shared between processes.
 *
 * <p>Besides acquire/release operations the lock exposes the session listenable inherited from
 * {@link SessionListenableProvider}. The lock-specific exceptions listed in the {@code throws}
 * clauses appear to be unchecked (subclasses of {@code LockException}) and are listed for
 * documentation purposes - TODO confirm for every listed type.
 */
public interface InterProcessLock extends SessionListenableProvider {
    /**
     * Acquires the distributed lock, blocking until it is obtained.
     *
     * @throws Exception                    if an unexpected error occurs
     * @throws LockAlreadyAcquiredException if the lock is already acquired by this process
     * @throws LockAcquireFailedException   if the lock acquisition fails
     * @throws LockStateException           if the lock is in invalid state for acquisition
     */
    void acquire() throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException, LockStateException;

    /**
     * Attempts to acquire the lock within the given waiting time.
     *
     * @param waitDuration maximum time to wait for the lock
     * @return true if the lock was acquired, false if the waiting time elapsed
     * @throws Exception                    if an unexpected error occurs
     * @throws LockAlreadyAcquiredException if the lock is already acquired by this process
     * @throws LockAcquireFailedException   if the lock acquisition fails
     * @throws LockStateException           if the lock is in invalid state for acquisition
     */
    boolean acquire(Duration waitDuration) throws Exception, LockAlreadyAcquiredException, LockAcquireFailedException,
            LockStateException;

    /**
     * Releases the lock if it is held by this process.
     *
     * @return false if there was nothing to release
     * @throws Exception                  if an unexpected error occurs
     * @throws LockReleaseFailedException if the lock release fails
     * @throws LockStateException         if the lock is in invalid state for release
     */
    boolean release() throws Exception, LockReleaseFailedException, LockStateException;


    /**
     * Checks if the lock is currently acquired by this process.
     *
     * @return true if the lock is acquired by this process
     */
    boolean isAcquiredInThisProcess();
}
+ */ +public class InterProcessMutex implements InterProcessLock, Closeable { + private static final Logger logger = LoggerFactory.getLogger(InterProcessMutex.class); + + private final String lockName; + private final CoordinationSession coordinationSession; + private final Future sessionConnectionTask; + private final LockInternals lockInternals; + private final ListenableContainer sessionListenable; + + private final AtomicReference state = new AtomicReference<>(State.INITIAL); + + /** + * Internal state machine states + */ + private enum State { + INITIAL, + STARTING, + STARTED, + FAILED, + CLOSED + } + + /** + * Creates a new distributed mutex instance with default settings. + * + * @param client coordination client + * @param coordinationNodePath path to the coordination node + * @param lockName name of the lock + * @throws IllegalArgumentException if any parameter is null + */ + public InterProcessMutex( + CoordinationClient client, + String coordinationNodePath, + String lockName + ) { + this( + client, + coordinationNodePath, + lockName, + InterProcessMutexSettings.newBuilder().build() + ); + } + + /** + * Creates a new distributed mutex instance. 
+ * + * @param client coordination client + * @param coordinationNodePath path to the coordination node + * @param lockName name of the lock + * @param settings configuration settings + * @throws IllegalArgumentException if any parameter is null + */ + public InterProcessMutex( + CoordinationClient client, + String coordinationNodePath, + String lockName, + InterProcessMutexSettings settings + ) { + if (client == null || coordinationNodePath == null || lockName == null || settings == null) { + throw new IllegalArgumentException("All parameters must be non-null"); + } + + state.set(State.STARTING); + this.lockName = lockName; + this.coordinationSession = client.createSession(coordinationNodePath); + this.sessionListenable = new ListenableContainer<>(); + this.lockInternals = new LockInternals(coordinationSession, lockName); + + coordinationSession.addStateListener(sessionState -> { + if (sessionState == CoordinationSession.State.LOST || sessionState == CoordinationSession.State.CLOSED) { + logger.error("Coordination session unexpectedly changed to '{}' state, marking mutex as 'FAILED'", + sessionState); + state.set(State.FAILED); + } + logger.info("New State: " + sessionState); + sessionListenable.notifyListeners(sessionState); + }); + + sessionConnectionTask = coordinationSession.connect().thenAccept(sessionConnectStatus -> { + if (!sessionConnectStatus.isSuccess()) { + state.set(State.FAILED); + logger.error("Failed to establish coordination session for lock '{}'", lockName); + } else { + state.set(State.STARTED); + logger.info("Successfully established session for lock '{}'", lockName); + } + }); + + if (settings.isWaitConnection()) { + try { + logger.debug("Waiting for session connection to complete for lock '{}'", lockName); + sessionConnectionTask.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while waiting for session connection for lock '{}'", lockName, e); + throw new 
LockInitializationException("Interrupted while initializing lock '" + lockName + "'", e, + lockName); + } catch (ExecutionException e) { + logger.error("Failed to initialize lock '{}' due to execution error", lockName, e); + throw new LockInitializationException("Failed to initialize lock '" + lockName + "'", e.getCause(), + lockName); + } + } + } + + @Override + public void acquire() throws Exception { + checkState(); + logger.debug("Attempting to acquire lock '{}'", lockName); + lockInternals.tryAcquire( + null, + true, + null + ); + logger.debug("Lock '{}' acquired successfully", lockName); + } + + @Override + public boolean acquire(Duration waitDuration) throws Exception { + Objects.requireNonNull(waitDuration, "wait duration must not be null"); + + checkState(); + logger.debug("Attempting to acquire lock '{}' with timeout {}", lockName, waitDuration); + Instant deadline = Instant.now().plus(waitDuration); + boolean acquired = lockInternals.tryAcquire( + deadline, + true, + null + ) != null; + logger.debug("Lock '{}' acquisition {}successful", lockName, acquired ? 
"" : "un"); + return acquired; + } + + @Override + public boolean release() throws InterruptedException { + checkState(); + logger.debug("Attempting to release lock '{}'", lockName); + boolean released = lockInternals.release(); + if (released) { + logger.debug("Lock '{}' released successfully", lockName); + } else { + logger.debug("No lock to release"); + } + return released; + } + + @Override + public boolean isAcquiredInThisProcess() { + try { + checkState(); + boolean acquired = lockInternals.isAcquired(); + logger.trace("Lock acquisition check: {}", acquired); + return acquired; + } catch (LockException e) { + logger.debug("Lock state check failed", e); + return false; + } + } + + @Override + public Listenable getSessionListenable() { + return sessionListenable; + } + + @Override + public void close() { + logger.debug("Closing mutex '{}'", lockName); + state.set(State.CLOSED); + try { + lockInternals.close(); + coordinationSession.close(); + } catch (Exception e) { + logger.warn("Error while closing lock internals '{}'", lockName, e); + } + logger.info("Mutex '{}' closed", lockName); + } + + private void checkState() throws LockStateException { + State currentState = state.get(); + if (currentState == State.FAILED) { + throw new LockStateException("Lock '" + lockName + "' is in FAILED state", lockName); + } + if (currentState == State.CLOSED) { + throw new LockStateException("Lock '" + lockName + "' is already closed", lockName); + } + if (currentState != State.STARTED) { + throw new LockStateException("Lock '" + lockName + "' is not ready (current state: " + currentState + ")", + lockName); + } + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/InterProcessMutexSettings.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/InterProcessMutexSettings.java new file mode 100644 index 000000000..e2a419010 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/InterProcessMutexSettings.java 
/**
 * Settings of {@code InterProcessMutex}; currently only whether the constructor waits
 * for the session connection. Instances are created through {@link #newBuilder()}.
 */
public class InterProcessMutexSettings {
    private final boolean waitConnection;

    /**
     * @param builder builder holding the configured values
     */
    public InterProcessMutexSettings(Builder builder) {
        waitConnection = builder.waitConnection;
    }

    /**
     * @return a builder with {@code waitConnection} disabled
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * @return true if waiting for the session connection was requested
     */
    public boolean isWaitConnection() {
        return this.waitConnection;
    }

    /**
     * Fluent builder for {@link InterProcessMutexSettings}.
     */
    public static class Builder {
        // boolean fields start as false, which is the documented default
        private boolean waitConnection;

        public Builder withWaitConnection(boolean waitConnection) {
            this.waitConnection = waitConnection;
            return this;
        }

        public InterProcessMutexSettings build() {
            return new InterProcessMutexSettings(this);
        }
    }
}
= Duration.ofSeconds(30); + + private static final Logger logger = LoggerFactory.getLogger(LockInternals.class); + + private final boolean persistent; + private final long maxPersistentLease; + private final String lockName; + private final CoordinationSession coordinationSession; + + private volatile LeaseData leaseData = null; + + public static class LeaseData { + private final SemaphoreLease processLease; + private final boolean exclusive; + private final long leaseSessionId; + + public LeaseData(SemaphoreLease processLease, boolean exclusive, long leaseSessionId) { + this.processLease = processLease; + this.exclusive = exclusive; + this.leaseSessionId = leaseSessionId; + } + + public boolean isExclusive() { + return exclusive; + } + + public SemaphoreLease getProcessLease() { + return processLease; + } + + public long getLeaseSessionId() { + return leaseSessionId; + } + + @Override + public String toString() { + return "LeaseData{" + + "processLease=" + processLease + + ", isExclusive=" + exclusive + + ", leaseSessionId=" + leaseSessionId + + '}'; + } + } + + public LockInternals( + CoordinationSession coordinationSession, + String lockName + ) { + this(coordinationSession, lockName, null); + } + + public LockInternals( + CoordinationSession coordinationSession, + String lockName, + @Nullable Long maxPersistentLease + ) { + if (maxPersistentLease == null) { + this.persistent = false; + this.maxPersistentLease = -1; + } else { + this.persistent = true; + this.maxPersistentLease = maxPersistentLease; + } + + this.lockName = lockName; + this.coordinationSession = coordinationSession; + this.coordinationSession.addStateListener(new Consumer() { + @Override + public void accept(CoordinationSession.State state) { + switch (state) { + case RECONNECTED: + onReconnect(); + break; + case CLOSED: + case LOST: + leaseData = null; + break; + default: + } + } + }); + } + + private void onReconnect() { + LeaseData currentLeaseData = leaseData; + long oldId = 
currentLeaseData.getLeaseSessionId(); + long newId = coordinationSession.getId(); + if (oldId != newId) { + logger.warn( + "Current session with new id: {} lost lease after reconnection on semaphore: {}", + newId, + lockName + ); + leaseData = null; + } else { + logger.debug("Successfully reestablished session with same id: {}", newId); + } + } + + public synchronized boolean release() throws LockReleaseFailedException, InterruptedException { + logger.debug("Trying to release semaphore '{}'", lockName); + + if (!coordinationSession.getState().isActive()) { + throw new LockReleaseFailedException( + "Coordination session is inactive", + lockName + ); + } + + LeaseData localLeaseData = leaseData; + if (localLeaseData == null) { + logger.debug("Semaphore '{}' already released", lockName); + return false; + } + + try { + localLeaseData.getProcessLease().release().get(); + leaseData = null; + logger.debug("Successfully released semaphore '{}'", lockName); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } catch (ExecutionException e) { + throw new LockReleaseFailedException( + "Failed to release lock: " + e.getCause().getMessage(), + e.getCause(), + lockName + ); + } + } + + public synchronized LeaseData tryAcquire( + @Nullable Instant deadline, + boolean exclusive, + byte @Nullable [] data + ) throws Exception { + logger.debug("Trying to acquire lock: {} with deadline: {}, exclusive: {}", lockName, deadline, exclusive); + + if (leaseData != null) { + throw new LockAlreadyAcquiredException(lockName); + } + + Optional lease = tryBlockingLock(deadline, exclusive, data); + if (lease.isPresent()) { + LeaseData localLeaseData = new LeaseData(lease.get(), exclusive, 1); + leaseData = localLeaseData; + logger.debug("Successfully acquired lock: {}", lockName); + return localLeaseData; + } + + logger.debug("Unable to acquire lock: {}", lockName); + return null; + } + + private Optional tryBlockingLock( + @Nullable Instant 
deadline, + boolean exclusive, + byte @Nullable [] data + ) throws Exception { + int retryCount = 0; + while (coordinationSession.getState().isActive() && (deadline == null || Instant.now().isBefore(deadline))) { + retryCount++; + + Duration timeout; + if (deadline == null) { + timeout = DEFAULT_TIMEOUT; + } else { + timeout = Duration.between(Instant.now(), deadline); + } + + CompletableFuture> acquireTask = acquireCall(exclusive, data, timeout); + + Result leaseResult; + try { + leaseResult = acquireTask.get(); + } catch (InterruptedException e) { + // If acquire is interrupted, then release immediately + Thread.currentThread().interrupt(); + acquireTask.thenAccept(acquireResult -> { + if (!acquireResult.getStatus().isSuccess()) { + return; + } + SemaphoreLease lease = acquireResult.getValue(); + lease.release(); + }); + throw e; + } + + Status status = leaseResult.getStatus(); + logger.debug("Lease result status: {}", status); + + if (status.isSuccess()) { + logger.debug("Successfully acquired the lock '{}'", lockName); + return Optional.of(leaseResult.getValue()); + } + + if (status.getCode() == StatusCode.TIMEOUT) { + logger.debug("Trying to acquire semaphore {} again, retries: {}", lockName, retryCount); + continue; + } + + if (!status.getCode().isRetryable(true)) { + logger.debug("Unable to retry acquiring semaphore '{}'", lockName); + throw new LockAcquireFailedException(lockName); + } + } + + if (deadline != null && Instant.now().compareTo(deadline) >= 0) { + return Optional.empty(); + } + + throw new LockAcquireFailedException(lockName); + } + + private CompletableFuture> acquireCall( + boolean exclusive, + byte[] data, + Duration timeout + ) { + if (!persistent) { + return coordinationSession.acquireEphemeralSemaphore(lockName, exclusive, data, timeout); + } + + if (exclusive) { + return coordinationSession.acquireSemaphore(lockName, maxPersistentLease, data, timeout); + } + + return coordinationSession.acquireSemaphore(lockName, 1, data, timeout); + } + 
+ public String getLockName() { + return lockName; + } + + public @Nullable LeaseData getLeaseData() { + return leaseData; + } + + public boolean isAcquired() { + return leaseData != null; + } + + public boolean isPersistent() { + return persistent; + } + + @Override + public void close() { + try { + release(); + } catch (Exception exception) { + logger.error("Exception during closing release", exception); + } + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLock.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLock.java new file mode 100644 index 000000000..3137f5d1b --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLock.java @@ -0,0 +1,275 @@ +package tech.ydb.coordination.recipes.locks; + +import java.io.Closeable; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.recipes.locks.exception.LockInitializationException; +import tech.ydb.coordination.recipes.locks.exception.LockStateException; +import tech.ydb.coordination.recipes.util.Listenable; +import tech.ydb.coordination.recipes.util.ListenableContainer; + +/** + * Distributed read-write lock implementation that allows multiple readers or a single writer. + * + *

This implementation provides non-reentrant read/write locking semantics across multiple processes. + * Multiple processes can hold the read lock simultaneously, while only one process can hold + * the write lock (with no concurrent readers).

+ * + *

Thread-safety: Instances of this class are thread-safe and can be used from multiple threads.

+ */ +public class ReadWriteInterProcessLock implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(ReadWriteInterProcessLock.class); + + private final InternalLock readLock; + private final InternalLock writeLock; + + /** + * Creates a new distributed read-write lock with default settings. + * + * @param client the coordination client instance + * @param coordinationNodePath the base path for coordination nodes + * @param lockName the name of the lock (must be unique within the coordination namespace) + * @throws IllegalArgumentException if any parameter is null + */ + public ReadWriteInterProcessLock( + CoordinationClient client, + String coordinationNodePath, + String lockName + ) { + this( + client, + coordinationNodePath, + lockName, + ReadWriteInterProcessLockSettings.newBuilder().build() + ); + } + + /** + * Creates a new distributed read-write lock with custom settings. + * + * @param client the coordination client instance + * @param coordinationNodePath the base path for coordination nodes + * @param lockName the name of the lock (must be unique within the coordination namespace) + * @param settings the lock configuration settings + * @throws IllegalArgumentException if any parameter is null + * @throws LockInitializationException if the lock cannot be initialized + */ + public ReadWriteInterProcessLock( + CoordinationClient client, + String coordinationNodePath, + String lockName, + ReadWriteInterProcessLockSettings settings + ) { + if (client == null || coordinationNodePath == null || lockName == null || settings == null) { + throw new IllegalArgumentException("All parameters must be non-null"); + } + + this.readLock = new InternalLock( + client, + coordinationNodePath, + lockName, + false + ); + this.writeLock = new InternalLock( + client, + coordinationNodePath, + lockName, + true + ); + + if (settings.isWaitConnection()) { + try { + logger.debug("Waiting for session connection to complete for rwlock {}", lockName); + 
readLock.sessionConnectionTask.get(); + writeLock.sessionConnectionTask.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while waiting for session connection for rwlock '{}'", lockName, e); + throw new LockInitializationException( + "Interrupted while initializing rwlock '" + lockName + "'", e, lockName + ); + } catch (ExecutionException e) { + logger.error("Failed to initialize rwlock '{}' due to execution error", lockName, e); + throw new LockInitializationException( + "Failed to initialize rwlock '" + lockName + "'", e.getCause(), lockName + ); + } + } + } + + /** + * Returns the write lock instance. + * + * @return the write lock (exclusive) + */ + public InterProcessLock writeLock() { + return writeLock; + } + + /** + * Returns the read lock instance. + * + * @return the read lock (shared) + */ + public InterProcessLock readLock() { + return readLock; + } + + /** + * Internal lock implementation that handles both read and write operations. + */ + private static class InternalLock implements InterProcessLock { + private final String lockName; + private final boolean isExclusive; + private final Future sessionConnectionTask; + private final CoordinationSession coordinationSession; + private final LockInternals lockInternals; + private final ListenableContainer sessionListenable; + + private final AtomicReference state = new AtomicReference<>(State.INITIAL); + + /** + * Internal state of the lock. 
+ */ + private enum State { + INITIAL, + STARTING, + STARTED, + FAILED, + CLOSED + } + + InternalLock( + CoordinationClient client, + String coordinationNodePath, + String lockName, + boolean isExclusive + ) { + state.set(State.STARTING); + logger.debug("Initializing InterProcessMutex for lock '{}'", lockName); + + this.lockName = lockName; + this.coordinationSession = client.createSession(coordinationNodePath); + this.sessionListenable = new ListenableContainer<>(); + this.lockInternals = new LockInternals(coordinationSession, lockName); + this.isExclusive = isExclusive; + + coordinationSession.addStateListener(sessionState -> { + if (sessionState == CoordinationSession.State.LOST || + sessionState == CoordinationSession.State.CLOSED) { + logger.error("Coordination session unexpectedly changed to {} state, marking lock as FAILED", + sessionState); + state.set(State.FAILED); + } + sessionListenable.notifyListeners(sessionState); + }); + + sessionConnectionTask = coordinationSession.connect().thenAccept(sessionConnectStatus -> { + if (!sessionConnectStatus.isSuccess()) { + state.set(State.FAILED); + logger.error("Failed to establish coordination session for lock '{}'", lockName); + } else { + state.set(State.STARTED); + logger.info("Successfully established session for lock '{}'", lockName); + } + }); + } + + @Override + public void acquire() throws Exception { + checkState(); + logger.debug("Attempting to acquire lock {}", lockName); + lockInternals.tryAcquire( + null, + isExclusive, + null + ); + logger.debug("Lock '{}' acquired successfully", lockName); + } + + @Override + public boolean acquire(Duration waitDuration) throws Exception { + Objects.requireNonNull(waitDuration, "wait duration must not be null"); + + checkState(); + logger.debug("Attempting to acquire lock '{}' with timeout {}", lockName, waitDuration); + Instant deadline = Instant.now().plus(waitDuration); + boolean acquired = lockInternals.tryAcquire( + deadline, + isExclusive, + null + ) != null; 
+ logger.debug("Lock '{}' acquisition {}successful", lockName, acquired ? "" : "un"); + return acquired; + } + + @Override + public boolean release() throws Exception { + checkState(); + logger.debug("Attempting to release lock '{}'", lockName); + boolean released = lockInternals.release(); + if (released) { + logger.debug("Lock {} released successfully", lockName); + } else { + logger.debug("No lock to release"); + } + return released; + } + + @Override + public boolean isAcquiredInThisProcess() { + return lockInternals.isAcquired(); + } + + @Override + public Listenable getSessionListenable() { + return sessionListenable; + } + + private void close() { + logger.debug("Closing rwlock {}", lockName); + state.set(State.CLOSED); + try { + lockInternals.close(); + } catch (Exception e) { + logger.warn("Error while closing rwlock internals {}", lockName, e); + } + logger.info("Rwlock {} closed", lockName); + } + + private void checkState() throws LockStateException { + State currentState = state.get(); + if (currentState == State.FAILED) { + throw new LockStateException("Lock '" + lockName + "' is in FAILED state", lockName); + } + if (currentState == State.CLOSED) { + throw new LockStateException("Lock '" + lockName + "' is already closed", lockName); + } + if (currentState != State.STARTED) { + throw new LockStateException( + "Lock '" + lockName + "' is not ready (current state: " + currentState + ")", lockName + ); + } + } + } + + /** + * Closes both read and write locks and releases all associated resources. + * After closing, the lock instance can no longer be used. 
+ */ + @Override + public void close() { + readLock.close(); + writeLock.close(); + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLockSettings.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLockSettings.java new file mode 100644 index 000000000..aef384c40 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLockSettings.java @@ -0,0 +1,31 @@ +package tech.ydb.coordination.recipes.locks; + +public class ReadWriteInterProcessLockSettings { + private final boolean waitConnection; + + public ReadWriteInterProcessLockSettings(Builder builder) { + this.waitConnection = builder.waitConnection; + } + + public boolean isWaitConnection() { + return waitConnection; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private boolean waitConnection = false; + + public Builder withWaitConnection(boolean waitConnection) { + this.waitConnection = waitConnection; + return this; + } + + public ReadWriteInterProcessLockSettings build() { + return new ReadWriteInterProcessLockSettings(this); + } + } +} + diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockAcquireFailedException.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockAcquireFailedException.java new file mode 100644 index 000000000..ac41a8505 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockAcquireFailedException.java @@ -0,0 +1,18 @@ +package tech.ydb.coordination.recipes.locks.exception; + +/** + * Thrown when lock acquisition fails (excluding cases when lock is already acquired) + */ +public class LockAcquireFailedException extends LockException { + public LockAcquireFailedException(String lockName) { + super("Failed to acquire lock '" + lockName + "'", lockName); + } + + public 
LockAcquireFailedException(String message, String lockName) { + super(message, lockName); + } + + public LockAcquireFailedException(String message, Throwable cause, String lockName) { + super(message, cause, lockName); + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockAlreadyAcquiredException.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockAlreadyAcquiredException.java new file mode 100644 index 000000000..f72d88745 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockAlreadyAcquiredException.java @@ -0,0 +1,18 @@ +package tech.ydb.coordination.recipes.locks.exception; + +/** + * Thrown when attempting to acquire a lock that is already held by current process + */ +public class LockAlreadyAcquiredException extends LockException { + public LockAlreadyAcquiredException(String lockName) { + super("Lock '" + lockName + "' is already acquired by this process", lockName); + } + + public LockAlreadyAcquiredException(String message, String lockName) { + super(message, lockName); + } + + public LockAlreadyAcquiredException(String message, Throwable cause, String lockName) { + super(message, cause, lockName); + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockException.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockException.java new file mode 100644 index 000000000..ebdd9bbcf --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockException.java @@ -0,0 +1,23 @@ +package tech.ydb.coordination.recipes.locks.exception; + +public class LockException extends RuntimeException { + protected final String lockName; + + public LockException(String lockName) { + this.lockName = lockName; + } + + public LockException(String message, String lockName) { + super(message); + this.lockName = lockName; + } + + public LockException(String 
message, Throwable cause, String lockName) { + super(message, cause); + this.lockName = lockName; + } + + public String getLockName() { + return lockName; + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockInitializationException.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockInitializationException.java new file mode 100644 index 000000000..583b84ea7 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockInitializationException.java @@ -0,0 +1,18 @@ +package tech.ydb.coordination.recipes.locks.exception; + +/** + * Thrown when lock initialization fails + */ +public class LockInitializationException extends LockException { + public LockInitializationException(String lockName) { + super("Failed to initialize lock '" + lockName + "'", lockName); + } + + public LockInitializationException(String message, String lockName) { + super(message, lockName); + } + + public LockInitializationException(String message, Throwable cause, String lockName) { + super(message, cause, lockName); + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockReleaseFailedException.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockReleaseFailedException.java new file mode 100644 index 000000000..fe3e04287 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockReleaseFailedException.java @@ -0,0 +1,18 @@ +package tech.ydb.coordination.recipes.locks.exception; + +/** + * Thrown when lock release operation fails + */ +public class LockReleaseFailedException extends LockException { + public LockReleaseFailedException(String lockName) { + super("Failed to release lock '" + lockName + "'", lockName); + } + + public LockReleaseFailedException(String message, String lockName) { + super(message, lockName); + } + + public LockReleaseFailedException(String message, 
Throwable cause, String lockName) { + super(message, cause, lockName); + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockStateException.java b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockStateException.java new file mode 100644 index 000000000..4e81d7da8 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/locks/exception/LockStateException.java @@ -0,0 +1,19 @@ +package tech.ydb.coordination.recipes.locks.exception; + +/** + * Thrown when lock operation is attempted in invalid state + */ +public class LockStateException extends LockException { + public LockStateException(String lockName) { + super("Invalid state for lock operation on lock '" + lockName + "'", lockName); + } + + public LockStateException(String message, String lockName) { + super(message, lockName); + } + + public LockStateException(String message, Throwable cause, String lockName) { + super(message, cause, lockName); + } +} + diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/util/Listenable.java b/coordination/src/main/java/tech/ydb/coordination/recipes/util/Listenable.java new file mode 100644 index 000000000..2cc31bd38 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/util/Listenable.java @@ -0,0 +1,35 @@ +package tech.ydb.coordination.recipes.util; + +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +/** + * Generic interface for objects that allow adding and removing listeners for events of type T. + * + * @param the type of event data that listeners will receive + */ +public interface Listenable { + /** + * Adds a listener that will be notified synchronously when the event occurs. 
+ * + * @param listener the listener to add, must not be null + * @throws NullPointerException if listener is null + */ + void addListener(Consumer listener); + + /** + * Adds a listener that will be notified asynchronously using the provided executor. + * + * @param listener the listener to add, must not be null + * @param executor the executor to use for asynchronous notification, must not be null + * @throws NullPointerException if listener or executor is null + */ + void addListener(Consumer listener, ExecutorService executor); + + /** + * Removes the specified listener. + * + * @param listener the listener to remove + */ + void removeListener(Consumer listener); +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/util/ListenableContainer.java b/coordination/src/main/java/tech/ydb/coordination/recipes/util/ListenableContainer.java new file mode 100644 index 000000000..9769ee5b4 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/util/ListenableContainer.java @@ -0,0 +1,87 @@ +package tech.ydb.coordination.recipes.util; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread-safe container for managing and notifying listeners. + * + * @param the type of event data that listeners will receive + */ +public class ListenableContainer implements Listenable { + private static final Logger logger = LoggerFactory.getLogger(ListenableContainer.class); + + // Maps original listeners to potentially wrapped listeners + private final Map, Consumer> listenersMapping = new ConcurrentHashMap<>(); + + /** + * Notifies all registered listeners with the provided data. + * Exceptions thrown by listeners are caught and logged. 
+ * + * @param data the data to send to listeners + * @throws NullPointerException if data is null + */ + public void notifyListeners(T data) { + Objects.requireNonNull(data, "Data cannot be null"); + + listenersMapping.values().forEach(listener -> { + try { + listener.accept(data); + } catch (Exception ex) { + logger.error("Listener threw exception during notification", ex); + } + }); + } + + @Override + public void addListener(Consumer listener) { + Objects.requireNonNull(listener, "Listener cannot be null"); + + if (listenersMapping.containsKey(listener)) { + logger.debug("Listener already registered, skipping"); + return; + } + + listenersMapping.put(listener, listener); + } + + @Override + public void addListener(Consumer listener, ExecutorService executor) { + Objects.requireNonNull(listener, "Listener cannot be null"); + Objects.requireNonNull(executor, "Executor cannot be null"); + + if (listenersMapping.containsKey(listener)) { + logger.debug("Listener already registered, skipping"); + return; + } + + Consumer wrapper = data -> { + try { + executor.submit(() -> { + try { + listener.accept(data); + } catch (Exception ex) { + logger.error("Asynchronous listener threw exception", ex); + } + }); + } catch (Exception ex) { + logger.error("Failed to submit listener task to executor", ex); + } + }; + + listenersMapping.put(listener, wrapper); + } + + @Override + public void removeListener(Consumer listener) { + Objects.requireNonNull(listener, "Listener cannot be null"); + + listenersMapping.remove(listener); + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/util/RetryableTask.java b/coordination/src/main/java/tech/ydb/coordination/recipes/util/RetryableTask.java new file mode 100644 index 000000000..0e3649e59 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/util/RetryableTask.java @@ -0,0 +1,104 @@ +package tech.ydb.coordination.recipes.util; + +import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.common.retry.RetryPolicy; +import tech.ydb.core.Status; + +public class RetryableTask { + private static final Logger logger = LoggerFactory.getLogger(RetryableTask.class); + + private final String taskName; + private final Supplier> taskSupplier; + private final ScheduledExecutorService executor; + private final RetryPolicy retryPolicy; + private final long startTime; + private int retryCount; + + public RetryableTask( + String taskName, + Supplier> taskSupplier, + ScheduledExecutorService executor, + RetryPolicy retryPolicy + ) { + this.taskName = taskName; + this.taskSupplier = taskSupplier; + this.executor = executor; + this.retryPolicy = retryPolicy; + this.startTime = System.currentTimeMillis(); + this.retryCount = 0; + } + + public CompletableFuture execute() { + CompletableFuture result = new CompletableFuture<>(); + attemptTask(result); + return result; + } + + void attemptTask(CompletableFuture result) { + try { + taskSupplier.get().whenComplete((status, throwable) -> { + if (throwable != null) { + handleFailure(result, throwable); + } else if (status.isSuccess()) { + logSuccess(); + result.complete(status); + } else { + handleFailure( + result, + new RuntimeException("Operation '" + taskName + "' failed with status: " + status) + ); + } + }); + } catch (Exception e) { + handleFailure(result, e); + } + } + + private void handleFailure(CompletableFuture result, Throwable failure) { + long elapsedTime = System.currentTimeMillis() - startTime; + long delayMs = retryPolicy.nextRetryMs(retryCount, elapsedTime); + + if (delayMs >= 0) { + retryCount++; + logRetry(delayMs, failure); + + if (delayMs == 0) { + executor.execute(() -> attemptTask(result)); + } else { + executor.schedule(() -> attemptTask(result), delayMs, TimeUnit.MILLISECONDS); + } + } else { + 
logFailure(failure); + result.completeExceptionally(failure); + } + } + + private void logSuccess() { + if (retryCount > 0) { + logger.info("Operation '{}' succeeded after {} retries", taskName, retryCount); + } else { + logger.info("Operation '{}' succeeded on first attempt", taskName); + } + } + + private void logRetry(long delayMs, Throwable failure) { + logger.warn( + "Attempt {} of operation '{}' failed ({}). Retrying in {}ms", + retryCount, taskName, failure.getMessage(), delayMs + ); + } + + private void logFailure(Throwable failure) { + logger.error( + "Operation '{}' failed after {} retries. Last error: {}", + taskName, retryCount, failure.getMessage(), failure + ); + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/util/SemaphoreObserver.java b/coordination/src/main/java/tech/ydb/coordination/recipes/util/SemaphoreObserver.java new file mode 100644 index 000000000..c57e00555 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/util/SemaphoreObserver.java @@ -0,0 +1,254 @@ +package tech.ydb.coordination.recipes.util; + +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.common.retry.RetryPolicy; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.description.SemaphoreDescription; +import tech.ydb.coordination.description.SemaphoreWatcher; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; +import tech.ydb.coordination.settings.WatchSemaphoreMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; + +/** + * Observes changes in a distributed semaphore state and notifies listeners. 
+ * Handles automatic reconnection and retries on failures. + */ +public class SemaphoreObserver implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(SemaphoreObserver.class); + + private final CoordinationSession session; + private final String semaphoreName; + private final WatchSemaphoreMode watchSemaphoreMode; + private final DescribeSemaphoreMode describeSemaphoreMode; + private final RetryPolicy retryPolicy; + private final ScheduledExecutorService scheduledExecutor; + private final ListenableContainer watchDataListenable = new ListenableContainer<>(); + private final ListenableContainer sessionStateListenable = new ListenableContainer<>(); + + private AtomicReference state = new AtomicReference<>(State.CREATED); + private AtomicReference cachedData = new AtomicReference<>(null); + private Future watchTask; + private final AtomicReference> forceDescribeTask = new AtomicReference<>(); + + /** + * Observer state + */ + public enum State { + CREATED, + STARTED, + CLOSED + } + + /** + * Creates a new semaphore observer instance. 
+ * + * @param session coordination session to use + * @param semaphoreName name of the semaphore to observe + * @param watchSemaphoreMode watch mode configuration + * @param describeSemaphoreMode describe mode configuration + * @param retryPolicy retry policy for failed operations + * @param scheduledExecutor executor for scheduling retries + */ + public SemaphoreObserver( + CoordinationSession session, + String semaphoreName, + WatchSemaphoreMode watchSemaphoreMode, + DescribeSemaphoreMode describeSemaphoreMode, + RetryPolicy retryPolicy, + ScheduledExecutorService scheduledExecutor + ) { + this.session = Objects.requireNonNull(session, "session cannot be null"); + this.semaphoreName = Objects.requireNonNull(semaphoreName, "semaphoreName cannot be null"); + this.watchSemaphoreMode = Objects.requireNonNull(watchSemaphoreMode, "watchSemaphoreMode cannot be null"); + this.describeSemaphoreMode = Objects.requireNonNull( + describeSemaphoreMode, "describeSemaphoreMode cannot be null" + ); + this.retryPolicy = Objects.requireNonNull(retryPolicy, "retryPolicy cannot be null"); + this.scheduledExecutor = Objects.requireNonNull(scheduledExecutor, "scheduledExecutor cannot be null"); + + this.session.addStateListener(state -> { + logger.debug("Session state changed to {} for semaphore {}", state, semaphoreName); + if (state == CoordinationSession.State.LOST || state == CoordinationSession.State.CLOSED) { + logger.warn("Session lost or closed, closing observer for semaphore {}", semaphoreName); + close(); + } + + if (state == CoordinationSession.State.RECONNECTED) { + logger.info("Session reconnected, forcing describe for semaphore {}", semaphoreName); + enqueueForceDescribe(); + } + + sessionStateListenable.notifyListeners(state); + }); + } + + /** + * Starts observing the semaphore state. + * Can only be called once. 
+ */ + public void start() { + if (!state.compareAndSet(State.CREATED, State.STARTED)) { + logger.warn("Attempt to start already started observer for semaphore {}", semaphoreName); + return; + } + + logger.info("Starting semaphore observer for: {}", semaphoreName); + enqueueForceDescribe(); + enqueueWatch(); + } + + private void enqueueForceDescribe() { + CompletableFuture existingTask = forceDescribeTask.get(); + if (existingTask != null && !existingTask.isDone()) { + logger.debug("Force describe task already exists for semaphore {}", semaphoreName); + return; + } + + Supplier> taskSupplier = () -> + session.describeSemaphore(semaphoreName, describeSemaphoreMode) + .thenApply(result -> { + if (result.isSuccess()) { + logger.debug("Successfully described semaphore {}", semaphoreName); + saveDescription(result.getValue()); + } else { + logger.warn("Failed to describe semaphore {}: {}", semaphoreName, result.getStatus()); + } + return result.getStatus(); + }); + + CompletableFuture newTask = new RetryableTask( + "semaphoreObserverForceDescribe-" + semaphoreName, + taskSupplier, + scheduledExecutor, + retryPolicy + ).execute(); + + if (!forceDescribeTask.compareAndSet(existingTask, newTask)) { + newTask.cancel(true); + logger.debug("Another thread updated force describe task first for semaphore {}", semaphoreName); + } + } + + private synchronized boolean enqueueWatch() { + if (watchTask != null && state.get() == State.STARTED) { + logger.warn("Watch task already exists for semaphore {}", semaphoreName); + return false; + } + + logger.debug("Enqueuing new watch task for semaphore {}", semaphoreName); + CompletableFuture watchRetriedTask = new RetryableTask( + "semaphoreObserverWatchTask-" + semaphoreName, + this::watchSemaphore, + scheduledExecutor, + retryPolicy + ).execute(); + + this.watchTask = watchRetriedTask.thenCompose(status -> { + if (!status.isSuccess()) { + logger.error("Failed to watch semaphore: {} with status: {}", semaphoreName, status); + } + 
finishWatch(); + return null; + }); + + return true; + } + + private synchronized void finishWatch() { + logger.debug("Finishing watch task for semaphore {}", semaphoreName); + watchTask = null; + if (state.get() == State.STARTED) { + logger.debug("Restarting watch for semaphore {}", semaphoreName); + enqueueWatch(); + } + } + + private CompletableFuture watchSemaphore() { + logger.debug("Starting watch operation for semaphore {}", semaphoreName); + return session.watchSemaphore( + semaphoreName, + describeSemaphoreMode, + watchSemaphoreMode + ).thenCompose(result -> { + Status status = result.getStatus(); + if (!status.isSuccess()) { + logger.warn("Watch operation failed for semaphore {}: {}", semaphoreName, status); + return CompletableFuture.completedFuture(status); + } + + SemaphoreWatcher watcher = result.getValue(); + saveDescription(watcher.getDescription()); + logger.debug("Successfully started watching semaphore {}", semaphoreName); + return watcher.getChangedFuture().thenApply(Result::getStatus); + }); + } + + private void saveDescription(SemaphoreDescription description) { + SemaphoreDescription prev = cachedData.getAndSet(description); + logger.info("Semaphore state changed: {} -> {}", + formatSemaphoreDescription(prev), + formatSemaphoreDescription(description)); + watchDataListenable.notifyListeners(description); + } + + private static String formatSemaphoreDescription(SemaphoreDescription desc) { + if (desc == null) { + return "null"; + } + return String.format("SemaphoreDescription{name='%s', count=%d, limit=%d, owners=%d, waiters=%d}", + desc.getName(), desc.getCount(), desc.getLimit(), + desc.getOwnersList().size(), desc.getWaitersList().size()); + } + + /** + * Returns listenable for semaphore state changes. + */ + public Listenable getWatchDataListenable() { + return watchDataListenable; + } + + /** + * Gets the last observed semaphore state. 
+ * @return last cached semaphore description or null if not available + */ + public @Nullable SemaphoreDescription getCachedData() { + return cachedData.get(); + } + + /** + * Closes the observer and releases all resources. + */ + @Override + public void close() { + state.set(State.CLOSED); + stopTasks(); + } + + private synchronized void stopTasks() { + Future localWatchTask = watchTask; + CompletableFuture describeTask = forceDescribeTask.getAndSet(null); + + if (describeTask != null) { + logger.debug("Cancelling force describe task for semaphore {}", semaphoreName); + describeTask.cancel(true); + } + + if (localWatchTask != null) { + logger.debug("Cancelling watch task for semaphore {}", semaphoreName); + localWatchTask.cancel(true); + watchTask = null; + } + } +} diff --git a/coordination/src/main/java/tech/ydb/coordination/recipes/util/SessionListenableProvider.java b/coordination/src/main/java/tech/ydb/coordination/recipes/util/SessionListenableProvider.java new file mode 100644 index 000000000..293996d19 --- /dev/null +++ b/coordination/src/main/java/tech/ydb/coordination/recipes/util/SessionListenableProvider.java @@ -0,0 +1,15 @@ +package tech.ydb.coordination.recipes.util; + +import tech.ydb.coordination.CoordinationSession; + +/** + * Provides access to a Listenable for session state changes. + */ +public interface SessionListenableProvider { + /** + * Gets the Listenable for session state changes. 
+ * + * @return the Listenable instance for session state changes, never null + */ + Listenable getSessionListenable(); +} diff --git a/coordination/src/test/java/tech/ydb/coordination/AwaitAssert.java b/coordination/src/test/java/tech/ydb/coordination/AwaitAssert.java new file mode 100644 index 000000000..288d84617 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/AwaitAssert.java @@ -0,0 +1,46 @@ +package tech.ydb.coordination; + +import java.time.Duration; +import java.util.function.BooleanSupplier; + +public class AwaitAssert { + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); + private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofMillis(50); + + private Duration timeout = DEFAULT_TIMEOUT; + private Duration pollInterval = DEFAULT_POLL_INTERVAL; + + public static AwaitAssert await() { + return new AwaitAssert(); + } + + public AwaitAssert timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public AwaitAssert pollInterval(Duration pollInterval) { + this.pollInterval = pollInterval; + return this; + } + + public void until(BooleanSupplier condition) { + long endTime = System.currentTimeMillis() + timeout.toMillis(); + + while (System.currentTimeMillis() < endTime) { + if (condition.getAsBoolean()) { + return; + } + + try { + Thread.sleep(pollInterval.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Await interrupted", e); + } + } + + throw new RuntimeException("Condition not met within " + timeout); + } +} + diff --git a/coordination/src/test/java/tech/ydb/coordination/CoordinationSessionBaseMockedTest.java b/coordination/src/test/java/tech/ydb/coordination/CoordinationSessionBaseMockedTest.java new file mode 100644 index 000000000..788433cb9 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/CoordinationSessionBaseMockedTest.java @@ -0,0 +1,246 @@ +package tech.ydb.coordination; + +import 
java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import org.junit.Assert; +import org.junit.Before; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.description.SemaphoreDescription; +import tech.ydb.coordination.description.SemaphoreWatcher; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +public class CoordinationSessionBaseMockedTest { + private static final Logger logger = LoggerFactory.getLogger(CoordinationSessionBaseMockedTest.class); + + private final CoordinationClient client = Mockito.mock(CoordinationClient.class); + private final CoordinationSession coordinationSession = Mockito.mock(CoordinationSession.class); + private final SessionMock sessionMock = new SessionMock(); + + @Before + public void beforeEach() { + when(client.createSession(any())).thenReturn(coordinationSession); + + when(coordinationSession.connect()) + .thenReturn( + CompletableFuture.completedFuture( + Status.of(StatusCode.TRANSPORT_UNAVAILABLE) + ) + ); + + doAnswer((InvocationOnMock iom) -> { + Consumer consumer = iom.getArgument(0, Consumer.class); + logger.debug("Add session mock listener={}", consumer); + sessionMock.addListener(consumer); + return null; + }).when(coordinationSession).addStateListener(any()); + + doAnswer((InvocationOnMock iom) -> { + Consumer consumer = iom.getArgument(0, Consumer.class); + logger.debug("Remove session mock listener={}", consumer); + 
sessionMock.removeListener(consumer); + return null; + }).when(coordinationSession).removeStateListener(any()); + + doAnswer((InvocationOnMock iom) -> { + logger.debug("Get mock state={}", sessionMock.state); + return sessionMock.state; + }).when(coordinationSession).getState(); + } + + public CoordinationSession getCoordinationSession() { + return coordinationSession; + } + + protected Answer> successConnect() { + return (InvocationOnMock iom) -> { + logger.debug("Successful session connect"); + return CompletableFuture.completedFuture( + Status.SUCCESS + ); + }; + } + + protected Answer> failedConnect(StatusCode code) { + return (InvocationOnMock iom) -> { + logger.debug("Failed session connect, code={}", code); + return CompletableFuture.completedFuture( + Status.of(code) + ); + }; + } + + protected Answer>> successAcquire(SemaphoreLease lease) { + return (InvocationOnMock iom) -> { + logger.debug("Success semaphore acquire {}", lease.getSemaphoreName()); + return CompletableFuture.completedFuture( + Result.success(lease) + ); + }; + } + + protected Answer>> statusAcquire(StatusCode statusCode) { + return (InvocationOnMock iom) -> { + logger.debug("Response semaphore acquire with code: {}", statusCode); + return CompletableFuture.completedFuture( + Result.fail(Status.of(statusCode)) + ); + }; + } + + protected Answer>> timeoutAcquire() { + return (InvocationOnMock iom) -> { + logger.debug("Timeout semaphore acquire"); + return CompletableFuture.completedFuture( + Result.fail(Status.of(StatusCode.TIMEOUT)) + ); + }; + } + + protected Answer>> timeoutAcquire(Duration blockDuration) { + return (InvocationOnMock iom) -> { + logger.debug("Block acquire duration={}", blockDuration); + Thread.sleep(blockDuration.toMillis()); + return CompletableFuture.completedFuture( + Result.fail(Status.of(StatusCode.TIMEOUT)) + ); + }; + } + + protected Answer>> lostAcquire() { + return (InvocationOnMock iom) -> { + logger.debug("Lost session during"); + sessionMock.lost(); + return 
CompletableFuture.completedFuture( + Result.fail(Status.of(StatusCode.TIMEOUT)) + ); + }; + } + + protected LeaseMock lease(String semaphoreName) { + return new LeaseMock( + sessionMock, + semaphoreName, + coordinationSession + ); + } + + protected CoordinationClient getClient() { + return client; + } + + protected SessionMock getSessionMock() { + return sessionMock; + } + + protected class LeaseMock implements SemaphoreLease { + private final SessionMock sessionMock; + private final String name; + private final CoordinationSession session; + private boolean released = false; + private CompletableFuture result = CompletableFuture.completedFuture(null); + + public LeaseMock(SessionMock sessionMock, String name, CoordinationSession session) { + this.sessionMock = sessionMock; + this.name = name; + this.session = session; + } + + @Override + public CoordinationSession getSession() { + return session; + } + + @Override + public String getSemaphoreName() { + return name; + } + + @Override + public CompletableFuture release() { + released = true; + return result; + } + + public LeaseMock failed(Exception exception) { + result = new CompletableFuture<>(); + result.completeExceptionally(exception); + return this; + } + + public void assertReleased() { + Assert.assertTrue(released); + } + } + + protected class SessionMock { + private final Set> listeners = new HashSet<>(); + + private CoordinationSession.State state = CoordinationSession.State.INITIAL; + + public SessionMock() { + } + + private void addListener(Consumer consumer) { + listeners.add(consumer); + } + + private void removeListener(Consumer consumer) { + listeners.remove(consumer); + } + + public OngoingStubbing> connect() { + return when(coordinationSession.connect()); + } + + public OngoingStubbing>> acquireEphemeralSemaphore() { + return when(coordinationSession.acquireEphemeralSemaphore(anyString(), anyBoolean(), any(), any())); + } + + public OngoingStubbing>> describeSemaphore() { + return 
when(coordinationSession.describeSemaphore(anyString(), any())); + } + + public OngoingStubbing>> watchSemaphore() { + return when(coordinationSession.watchSemaphore(anyString(), any(), any())); + } + + public void connecting() { + changeState(CoordinationSession.State.CONNECTING); + } + + public void connected() { + changeState(CoordinationSession.State.CONNECTED); + } + + public void lost() { + changeState(CoordinationSession.State.LOST); + } + + public void closed() { + changeState(CoordinationSession.State.CLOSED); + } + + private void changeState(CoordinationSession.State newState) { + state = newState; + listeners.forEach(it -> it.accept(newState)); + } + } + +} diff --git a/coordination/src/test/java/tech/ydb/coordination/recipes/election/LeaderElectionIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/recipes/election/LeaderElectionIntegrationTest.java new file mode 100644 index 000000000..6d4857fd3 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/recipes/election/LeaderElectionIntegrationTest.java @@ -0,0 +1,211 @@ +package tech.ydb.coordination.recipes.election; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.AwaitAssert; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.test.junit4.GrpcTransportRule; + +public class LeaderElectionIntegrationTest { + private static final Logger logger = LoggerFactory.getLogger(LeaderElectionIntegrationTest.class); + + @ClassRule + public static final GrpcTransportRule ydbRule = new GrpcTransportRule(); + + private static CoordinationClient client; + + @BeforeClass + public static void 
init() { + client = CoordinationClient.newClient(ydbRule); + } + + @AfterClass + public static void clean() { + ydbRule.close(); + } + + private LeaderElection getLeaderElector( + String testName, + LeaderElectionListener leaderElectionListener + ) { + return getLeaderElector( + testName, + testName, + testName.getBytes(StandardCharsets.UTF_8), + leaderElectionListener + ); + } + + private LeaderElection getLeaderElector( + String nodePath, + String lockName, + byte[] data, + LeaderElectionListener leaderElectionListener + ) { + client.createNode(nodePath).join().expectSuccess("cannot create coordination path"); + LeaderElection leaderElectorImpl = new LeaderElection( + client, + nodePath, + lockName, + data, + leaderElectionListener + ); + return leaderElectorImpl; + } + + @Test + public void shouldCallTakeLeadershipWhenElected() throws Exception { + AtomicBoolean leadershipTaken = new AtomicBoolean(false); + + String testName = "shouldCallTakeLeadershipWhenElected"; + LeaderElection elector = getLeaderElector(testName, new LeaderElectionListener() { + @Override + public void takeLeadership() throws Exception { + leadershipTaken.set(true); + logger.debug("Leadership is taken"); + } + }); + elector.start(); + elector.requeue(); + + AwaitAssert.await().until(leadershipTaken::get); + Assert.assertTrue(leadershipTaken.get()); + elector.close(); + } + + @Test + public void interruptLeadership_ThenStops() throws Exception { + AtomicBoolean leadershipTaken = new AtomicBoolean(false); + AtomicBoolean interrupted = new AtomicBoolean(false); + + String testName = "interruptLeadership_ThenStops"; + LeaderElection elector = getLeaderElector(testName, new LeaderElectionListener() { + @Override + public void takeLeadership() throws Exception { + try { + logger.debug("Leadership is taken"); + leadershipTaken.set(true); + Thread.sleep(10000); + } catch (InterruptedException e) { + interrupted.set(true); + logger.debug("Leadership is interrupted"); + } + } + }); + elector.start(); 
+ elector.requeue(); + + AwaitAssert.await().until(leadershipTaken::get); + Assert.assertTrue(leadershipTaken.get()); + + elector.interruptLeadership(); + AwaitAssert.await().until(interrupted::get); + Assert.assertFalse(elector.isLeader()); + + elector.close(); + } + + + @Test + public void shouldCallTakeLeadershipAgainAfterRequeue() throws Exception { + AtomicInteger leadershipCount = new AtomicInteger(0); + + String testName = "shouldCallTakeLeadershipAgainAfterRequeue"; + LeaderElection elector = getLeaderElector(testName, new LeaderElectionListener() { + @Override + public void takeLeadership() throws Exception { + leadershipCount.incrementAndGet(); + } + }); + elector.start(); + + elector.requeue(); + AwaitAssert.await().until(() -> leadershipCount.get() == 1); + + elector.requeue(); + AwaitAssert.await().until(() -> leadershipCount.get() == 2); + elector.close(); + } + + @Test + public void shouldTrackParticipantsAndLeader() throws Exception { + String testName = "shouldTrackParticipantsAndLeader"; + + // Create first leader + AtomicBoolean leader1Taken = new AtomicBoolean(false); + LeaderElection elector1 = getLeaderElector(testName, new LeaderElectionListener() { + @Override + public void takeLeadership() throws Exception { + logger.info("Leadership 1 taken"); + leader1Taken.set(true); + Thread.sleep(5000); + logger.info("Leadership 1 ended"); + } + }); + elector1.start(); + elector1.requeue(); + + AwaitAssert.await().until(leader1Taken::get); + + // Check participants and leader + List participants1 = elector1.getParticipants(); + Optional leader1 = elector1.getCurrentLeader(); + logger.info("current leader 1 {}", leader1); + logger.info("current participants 1 {}", participants1); + + Assert.assertEquals(1, participants1.size()); + Assert.assertTrue(leader1.isPresent()); + Assert.assertTrue(leader1.get().isLeader()); + Assert.assertArrayEquals(leader1.get().getData(), testName.getBytes(StandardCharsets.UTF_8)); + 
Assert.assertEquals(participants1.get(0).getSessionId(), leader1.get().getSessionId()); + + // Add second leader + AtomicBoolean leader2Taken = new AtomicBoolean(false); + LeaderElection elector2 = getLeaderElector(testName, new LeaderElectionListener() { + @Override + public void takeLeadership() throws Exception { + logger.info("Leadership 2 taken"); + leader2Taken.set(true); + Thread.sleep(20000); + logger.info("Leadership 2 ended"); + } + }); + elector2.autoRequeue(); + elector2.start(); + + AwaitAssert.await().until(leader2Taken::get); + // Check participants and leader + participants1 = elector1.getParticipants(); + leader1 = elector1.getCurrentLeader(); + logger.info("current leader 1 {}", leader1); + logger.info("current participants 1 {}", participants1); + + List participants2 = elector2.getParticipants(); + Optional leader2 = elector2.getCurrentLeader(); + logger.info("current leader 2 {}", leader2); + logger.info("current participants 2 {}", participants2); + + Assert.assertEquals(participants1, participants2); + Assert.assertEquals(leader1.hashCode(), leader2.hashCode()); + + Assert.assertTrue(leader2Taken.get()); + Assert.assertTrue(elector2.isLeader()); + Assert.assertEquals(elector2.getCurrentLeader().get().getSessionId(), + elector1.getCurrentLeader().get().getSessionId()); + Assert.assertFalse(elector1.isLeader()); + + elector1.close(); + elector2.close(); + } +} diff --git a/coordination/src/test/java/tech/ydb/coordination/recipes/group/GroupMembershipIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/recipes/group/GroupMembershipIntegrationTest.java new file mode 100644 index 000000000..c4b414981 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/recipes/group/GroupMembershipIntegrationTest.java @@ -0,0 +1,135 @@ +package tech.ydb.coordination.recipes.group; + +import java.util.List; +import java.util.concurrent.Executors; + +import org.junit.AfterClass; +import org.junit.Assert; +import 
org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.common.retry.RetryForever; +import tech.ydb.coordination.AwaitAssert; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.test.junit4.GrpcTransportRule; + +public class GroupMembershipIntegrationTest { + private static final Logger logger = LoggerFactory.getLogger(GroupMembershipIntegrationTest.class); + + @ClassRule + public static final GrpcTransportRule ydbRule = new GrpcTransportRule(); + + private static CoordinationClient client; + + @BeforeClass + public static void init() { + client = CoordinationClient.newClient(ydbRule); + } + + @AfterClass + public static void clean() { + ydbRule.close(); + } + + private GroupMembership getGroupMembership(String testName) { + return getGroupMembership(testName, testName, testName.getBytes()); + } + + private GroupMembership getGroupMembership( + String coordinationNodePath, + String groupName, + byte[] data + ) { + client.createNode(coordinationNodePath).join().expectSuccess( + "cannot create coordination node on path: " + coordinationNodePath + ); + return new GroupMembership( + client, + coordinationNodePath, + groupName, + data + ); + } + + private GroupMembership getGroupMembershipCustom( + String coordinationNodePath, + String groupName, + byte[] data, + GroupMembershipSettings settings + ) { + client.createNode(coordinationNodePath).join().expectSuccess( + "cannot create coordination node on path: " + coordinationNodePath + ); + return new GroupMembership( + client, + coordinationNodePath, + groupName, + data, + settings + ); + } + + @Test + public void successTest() throws Exception { + String testName = "successTest"; + + GroupMembership groupMembership = getGroupMembership(testName); + + groupMembership.getSessionListenable().addListener( + state -> logger.info("State change: " + state) + ); + groupMembership.getMembersListenable().addListener( + 
groupMembers -> logger.info("Members change: " + groupMembers) + ); + + groupMembership.start(); + + AwaitAssert.await().until(() -> { + if (groupMembership.getCurrentMembers() == null) { + return false; + } + return groupMembership.getCurrentMembers().size() == 1; + }); + + groupMembership.close(); + } + + @Test + public void everyTest() throws Exception { + String testName = "everyTest"; + + GroupMembership groupMembership = getGroupMembershipCustom( + testName, + testName, + testName.getBytes(), + GroupMembershipSettings.newBuilder() + .withRetryPolicy(new RetryForever(100)) + .withScheduledExecutor(Executors.newSingleThreadScheduledExecutor()) + .build() + ); + groupMembership.start(); + + + AwaitAssert.await().until(() -> { + if (groupMembership.getCurrentMembers() == null) { + return false; + } + return groupMembership.getCurrentMembers().size() == 1; + }); + + List currentMembers = groupMembership.getCurrentMembers(); + GroupMember groupMember1 = currentMembers.get(0); + logger.info(groupMember1.toString()); + + Assert.assertEquals(1L, groupMember1.getSessionId()); + Assert.assertArrayEquals(groupMember1.getData(), testName.getBytes()); + GroupMember groupMember2 = new GroupMember(1L, testName.getBytes()); + Assert.assertEquals(groupMember1, groupMember2); + Assert.assertEquals(groupMember1.hashCode(), groupMember2.hashCode()); + + groupMembership.close(); + } + +} diff --git a/coordination/src/test/java/tech/ydb/coordination/recipes/locks/InterProcessMutexIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/recipes/locks/InterProcessMutexIntegrationTest.java new file mode 100644 index 000000000..b8cc068b9 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/recipes/locks/InterProcessMutexIntegrationTest.java @@ -0,0 +1,250 @@ +package tech.ydb.coordination.recipes.locks; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.coordination.recipes.locks.exception.LockAlreadyAcquiredException; +import tech.ydb.test.junit4.GrpcTransportRule; + +public class InterProcessMutexIntegrationTest { + private static final Logger logger = LoggerFactory.getLogger(InternalLockMockedTest.class); + + @ClassRule + public static final GrpcTransportRule ydbRule = new GrpcTransportRule(); + + private static CoordinationClient client; + + @BeforeClass + public static void init() { + client = CoordinationClient.newClient(ydbRule); + } + + @AfterClass + public static void clean() { + ydbRule.close(); + } + + private InterProcessMutex getInterProcessMutex() { + return getInterProcessMutex(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + + private InterProcessMutex getInterProcessMutex(String testName) { + return getInterProcessMutex(testName, testName); + } + + private InterProcessMutex getInterProcessMutex(String nodePath, String lockName) { + client.createNode(nodePath).join().expectSuccess("cannot create coordination path"); + InterProcessMutex lock = new InterProcessMutex( + client, + nodePath, + lockName, + InterProcessMutexSettings.newBuilder() + .withWaitConnection(true) + .build() + ); + return lock; + } + + /** + * Asserts that code does not throw any exceptions + */ + @Test(timeout = 10000) + public void simpleLockTest() throws Exception { + InterProcessMutex lock = getInterProcessMutex(); + + lock.acquire(); + 
Assert.assertTrue(lock.isAcquiredInThisProcess()); + Thread.sleep(100); + lock.release(); + } + + /** + * Asserts that code does not throw any exceptions + */ + @Test(timeout = 10000) + public void sessionListenerTest() throws Exception { + InterProcessMutex lock = getInterProcessMutex(); + + Consumer syncListener = state -> logger.info("Recieved sync state change: " + state); + Consumer asyncListener = state -> logger.info("Recieved async state change: " + state); + + lock.getSessionListenable().addListener(syncListener); + // try add listener twice + lock.getSessionListenable().addListener(syncListener); + + lock.getSessionListenable().addListener(asyncListener, Executors.newSingleThreadExecutor()); + // try add listener twice + lock.getSessionListenable().addListener(asyncListener, Executors.newSingleThreadExecutor()); + + lock.acquire(); + Assert.assertTrue(lock.isAcquiredInThisProcess()); + Thread.sleep(100); + lock.release(); + lock.close(); + + lock.getSessionListenable().removeListener(syncListener); + lock.getSessionListenable().removeListener(asyncListener); + } + + /** + * Asserts that code does not throw any exceptions + */ + @Test(timeout = 10000) + public void tryLockTest() throws Exception { + String testName = "tryLockTest"; + InterProcessMutex lock1 = getInterProcessMutex(testName); + InterProcessMutex lock2 = getInterProcessMutex(testName); + + lock1.acquire(); + Assert.assertTrue(lock1.isAcquiredInThisProcess()); + + Assert.assertFalse(lock2.acquire(Duration.ofMillis(100))); + Assert.assertFalse(lock2.isAcquiredInThisProcess()); + } + + /** + * Asserts that there is no data race around counter that is protected by distributed lock + * When locksN sessions tries to acquire lock at the same time + */ + @Test(timeout = 10000) + public void concurrentLockTest() { + String testName = "concurrentLockTest"; + // given + ExecutorService executor = Executors.newFixedThreadPool(2); + int cycles = 5; + int locksN = 5; + + List locks = new 
ArrayList<>(locksN); + for (int i = 0; i < locksN; i++) { + locks.add(getInterProcessMutex(testName)); + } + + AtomicInteger counter = new AtomicInteger(0); + + // when + List> tasks = locks.stream().map(lock -> + (Callable) () -> { + for (int i = 0; i < cycles; i++) { + lock.acquire(); + int start = counter.get(); + logger.debug("Lock acquired, cycle = {}, count = {}", i, start); + Thread.sleep(100); + counter.set(start + 1); + logger.debug("Lock released, cycle = {}", i); + lock.release(); + } + return null; + } + ).collect(Collectors.toList()); + + try { + List> futures = executor.invokeAll(tasks); + futures.forEach(future -> { + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (Exception ignored) { + } + + // then + Assert.assertEquals(cycles * locksN, counter.get()); + + executor.shutdown(); + } + + @Test(timeout = 10000) + public void reentrantLockTest() throws Exception { + InterProcessMutex lock = getInterProcessMutex(); + + // first acquire + lock.acquire(); + Assert.assertTrue(lock.isAcquiredInThisProcess()); + + // try to acquire the same lock + Assert.assertThrows(LockAlreadyAcquiredException.class, lock::acquire); + Assert.assertTrue(lock.isAcquiredInThisProcess()); + Assert.assertTrue(lock.release()); + } + + @Test(timeout = 10000) + public void longWaitTimeoutTest() throws Exception { + String testName = "reentrantLockTest"; + InterProcessMutex lock1 = getInterProcessMutex(testName); + InterProcessMutex lock2 = getInterProcessMutex(testName); + + lock1.acquire(); + Assert.assertFalse(lock2.acquire(Duration.ofMillis(10))); + } + + @Test(timeout = 10000) + public void releaseNotAcquiredLockTest() throws Exception { + InterProcessMutex lock = getInterProcessMutex(); + + Assert.assertFalse(lock.release()); + } + + @Test(timeout = 10000) + public void concurrentReleaseTest() throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(2); + InterProcessMutex lock = getInterProcessMutex(); 
+ + lock.acquire(); + + Future future1 = executor.submit(lock::release); + Future future2 = executor.submit(lock::release); + + // Only one concurrent release must be successful + Assert.assertTrue(future1.get() ^ future2.get()); + + executor.shutdown(); + } + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/coordination/src/test/java/tech/ydb/coordination/recipes/locks/InternalLockMockedTest.java b/coordination/src/test/java/tech/ydb/coordination/recipes/locks/InternalLockMockedTest.java new file mode 100644 index 000000000..a35cadcf0 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/recipes/locks/InternalLockMockedTest.java @@ -0,0 +1,518 @@ +package tech.ydb.coordination.recipes.locks; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.Assert; +import org.junit.Test; +import tech.ydb.coordination.CoordinationSessionBaseMockedTest; +import tech.ydb.coordination.recipes.locks.exception.LockAcquireFailedException; +import tech.ydb.coordination.recipes.locks.exception.LockAlreadyAcquiredException; +import tech.ydb.coordination.recipes.locks.exception.LockReleaseFailedException; +import tech.ydb.core.StatusCode; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class InternalLockMockedTest extends CoordinationSessionBaseMockedTest { + + /* + Части функционала: + tryAcquire + isAcquired (deadline, data, exclusive) + release + isAcquired (уже освобожден, еще не захвачен?) 
+ + listeners + listeners + reconnection when session is interrupted + + session start + close - (releases resources) + + External effects: + 1) session state (initial, in progress) + 1.1) Waits for the connection to be established + 1.2) Connection loss (LOST, CLOSED) - fails immediately with an error + 2) responses to requests + 2.1) On error, retries whatever is retryable + 2.2) Throws an error if it is not retryable + 3) thread interruption + 4) contract: called twice, called zero times + + acquire: + Correctness criteria: + 1) lock.isAcquired - internal state is not corrupted (on both success and failure) + 2) session.isAcquired was called successfully + + Test cases: + 1) All OK - acquire_RespondedStatusSuccess_AcquiresLock + 2) Session is lost from the start - acquire_LostSession_ThrowsLockAcquireFailedException + 3) Session is lost during acquisition - acquire_SessionLostDuringBlock_ThrowsLockAcquireFailedException + 4) Session is lost after acquisition - acquire_RespondedSuccessStatusThenLostSession_ReleasedLock + 5) Retries on a retryable response - acquire_RespondedRetryableStatusThenSuccessStatus_AcquiredLock + 6) Throws on a non-retryable error response - acquire_RespondedNonRetryableStatus_ThrowsLockAcquireFailedException + 7) Can be interrupted while blocked - acquire_BlockingInterrupted_ThrowsInterruptedException + 8) Already acquired - acquire_CallAcquireTwice_ThrowsLockAlreadyAcquiredException + 9) All OK (with timeout) - acquireWithTimeout_RespondedSuccessStatus_AcquiredLock + 10) Timeout expired - acquireWithTimeout_ResponseTimeout_ReturnsFalse + + release: + Correctness criteria: + 1) lock.isAcquired - internal state is not corrupted (on both success and failure) + 2) releaseSemaphore was called successfully + + Test cases: + 1) All OK (session, acquire, release) - release_RespondedStatusSuccess_ReleasedLock + 2) Semaphore was never acquired - release_NoCurrentLease_ReturnedFalse + 3) Called on an already LOST session - release_AlreadySessionLost_ThrowsLockReleaseFailedException + 4) Session is lost during release - release_DuringSessionLost_ThrowsLockReleaseFailedException + 5) InterruptedException is handled correctly - release_Interrupted_StateConsistent + + */ + + @Test + public void acquire_RespondedStatusSuccess_AcquiresLock() throws Exception { + SessionMock sessionMock = 
getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + LeaseMock lease = lease("lock_name"); + sessionMock.acquireEphemeralSemaphore() + .then(successAcquire(lease)); + + LockInternals.LeaseData leaseData = lock.tryAcquire(null, false, null); + Assert.assertNotNull(leaseData); + Assert.assertFalse(leaseData.isExclusive()); + Assert.assertTrue(lock.isAcquired()); + verify(getCoordinationSession()) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + + lock.close(); + } + + @Test + public void acquire_WithCustomData_PropagatesToSession() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect().then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + LeaseMock lease = lease("lock_name"); + sessionMock.acquireEphemeralSemaphore() + .then(successAcquire(lease)); + + byte[] testData = "test_payload".getBytes(StandardCharsets.UTF_8); + LockInternals.LeaseData leaseData = lock.tryAcquire(null, true, testData); + + Assert.assertNotNull(leaseData); + Assert.assertTrue(leaseData.isExclusive()); + verify(getCoordinationSession()).acquireEphemeralSemaphore( + eq("lock_name"), + eq(true), // exclusive + eq(testData), + any() // deadline + ); + } + + + @Test + public void acquire_LostSession_ThrowsLockAcquireFailedException() { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + + sessionMock.lost(); + + Assert.assertThrows( + LockAcquireFailedException.class, + () -> lock.tryAcquire(null, false, null) + ); + Assert.assertFalse(lock.isAcquired()); + } + + @Test + public void acquire_SessionLostDuringBlock_ThrowsLockAcquireFailedException() { + SessionMock 
sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + sessionMock.acquireEphemeralSemaphore() + .then(timeoutAcquire()) + .then(lostAcquire()) + .then(successAcquire(lease("lock_name"))); // never reached + + Assert.assertThrows( + LockAcquireFailedException.class, + () -> lock.tryAcquire(null, false, null) + ); + Assert.assertFalse(lock.isAcquired()); + verify(getCoordinationSession(), times(2)) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + } + + @Test + public void acquire_RespondedSuccessStatusThenLostSession_ReleasedLock() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + + sessionMock.connecting(); + sessionMock.connected(); + + sessionMock.acquireEphemeralSemaphore() + .then(successAcquire(lease("lock_name"))); + + LockInternals.LeaseData leaseData = lock.tryAcquire(null, false, null); + Assert.assertNotNull(leaseData); + Assert.assertFalse(leaseData.isExclusive()); + Assert.assertTrue(lock.isAcquired()); + verify(getCoordinationSession()) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + + sessionMock.lost(); + Assert.assertFalse(lock.isAcquired()); + } + + @Test + public void acquire_RespondedRetryableStatusThenSuccessStatus_AcquiredLock() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + sessionMock.acquireEphemeralSemaphore() + .then(timeoutAcquire()) + .then(statusAcquire(StatusCode.SESSION_BUSY)) + .then(statusAcquire(StatusCode.UNAVAILABLE)) + 
.then(statusAcquire(StatusCode.TRANSPORT_UNAVAILABLE)) + .then(successAcquire(lease("lock_name"))); + + LockInternals.LeaseData leaseData = lock.tryAcquire(null, false, null); + Assert.assertNotNull(leaseData); + Assert.assertFalse(leaseData.isExclusive()); + Assert.assertTrue(lock.isAcquired()); + verify(getCoordinationSession(), times(5)) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + ; + } + + @Test + public void acquire_RespondedNonRetryableStatus_ThrowsLockAcquireFailedException() { + StatusCode badStatus = StatusCode.BAD_REQUEST; + + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + sessionMock.acquireEphemeralSemaphore() + .then(statusAcquire(badStatus)) + .then(successAcquire(lease("lock_name"))); // never reached + + Assert.assertThrows( + LockAcquireFailedException.class, + () -> lock.tryAcquire(null, false, null) + ); + + Assert.assertFalse(lock.isAcquired()); + verify(getCoordinationSession(), times(1)) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + } + + @Test + public void acquire_BlockingInterrupted_ThrowsInterruptedException() throws InterruptedException { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + sessionMock.acquireEphemeralSemaphore() + .then(timeoutAcquire(Duration.ofSeconds(120))) + .then(successAcquire(lease("lock_name"))); // never reached + + CountDownLatch latch = new CountDownLatch(1); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(() -> { + latch.countDown(); + Assert.assertThrows( + InterruptedException.class, + () -> lock.tryAcquire(null, false, null) + ); + }); + + 
Thread.sleep(20); + latch.await(); + future.cancel(true); + + Assert.assertFalse(lock.isAcquired()); + verify(getCoordinationSession(), times(1)) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + } + + @Test + public void acquire_CallAcquireTwice_ThrowsLockAlreadyAcquiredException() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + sessionMock.acquireEphemeralSemaphore() + .then(successAcquire(lease("lock_name"))) + .then(statusAcquire(StatusCode.BAD_REQUEST)); // never reached + + LockInternals.LeaseData leaseData = lock.tryAcquire(null, false, null); + Assert.assertNotNull(leaseData); + Assert.assertFalse(leaseData.isExclusive()); + Assert.assertTrue(lock.isAcquired()); + + verify(getCoordinationSession(), times(1)) + .acquireEphemeralSemaphore(any(), anyBoolean(), any(), any()); + + Assert.assertThrows( + LockAlreadyAcquiredException.class, + () -> lock.tryAcquire(null, false, null) + ); + Assert.assertTrue(lock.isAcquired()); + verify(getCoordinationSession(), times(1)) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + } + + @Test + public void acquireWithTimeout_RespondedSuccessStatus_AcquiredLock() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + LeaseMock lease = lease("lock_name"); + sessionMock.acquireEphemeralSemaphore() + .then(timeoutAcquire(Duration.ofMillis(100))) + .then(successAcquire(lease)); + + Instant deadline = Instant.now().plus(Duration.ofMillis(1000)); + LockInternals.LeaseData leaseData = lock.tryAcquire(deadline, false, null); + + Assert.assertNotNull(leaseData); + 
Assert.assertFalse(leaseData.isExclusive()); + Assert.assertTrue(lock.isAcquired()); + verify(getCoordinationSession(), times(2)) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + } + + @Test + public void acquireWithTimeout_ResponseTimeout_ReturnsFalse() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + LeaseMock lease = lease("lock_name"); + sessionMock.acquireEphemeralSemaphore() + .then(timeoutAcquire(Duration.ofMillis(100))) + .then(successAcquire(lease)); // never reaches + + Instant deadline = Instant.now().plus(Duration.ofMillis(10)); + LockInternals.LeaseData leaseData = lock.tryAcquire(deadline, false, null); + + Assert.assertNull(leaseData); + Assert.assertFalse(lock.isAcquired()); + verify(getCoordinationSession(), times(1)) + .acquireEphemeralSemaphore( + eq("lock_name"), + eq(false), + isNull(), + any() + ); + } + + @Test + public void release_RespondedStatusSuccess_ReleasedLock() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + LeaseMock lease = lease("lock_name"); + sessionMock.acquireEphemeralSemaphore() + .then(timeoutAcquire(Duration.ofMillis(100))) + .then(successAcquire(lease)); + + LockInternals.LeaseData leaseData = lock.tryAcquire(null, false, null); + Assert.assertNotNull(leaseData); + Assert.assertFalse(leaseData.isExclusive()); + Assert.assertTrue(lock.isAcquired()); + + Assert.assertTrue(lock.release()); + Assert.assertFalse(lock.isAcquired()); + lease.assertReleased(); + } + + @Test + public void release_NoCurrentLease_ReturnedFalse() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + 
.then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + Assert.assertFalse(lock.release()); + Assert.assertFalse(lock.isAcquired()); + } + + @Test + public void release_AlreadySessionLost_ThrowsLockReleaseFailedException() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + + sessionMock.lost(); + + Assert.assertThrows(LockReleaseFailedException.class, lock::release); + Assert.assertFalse(lock.isAcquired()); + } + + @Test + public void release_DuringSessionLost_ThrowsLockReleaseFailedException() throws Exception { + SessionMock sessionMock = getSessionMock(); + sessionMock.connect() + .then(successConnect()); + + LockInternals lock = new LockInternals( + getCoordinationSession(), + "lock_name" + ); + sessionMock.connected(); + + LeaseMock lease = lease("lock_name") + .failed(new IllegalStateException()); + sessionMock.acquireEphemeralSemaphore() + .then(timeoutAcquire(Duration.ofMillis(100))) + .then(successAcquire(lease)); + + LockInternals.LeaseData leaseData = lock.tryAcquire(null, false, null); + Assert.assertNotNull(leaseData); + Assert.assertFalse(leaseData.isExclusive()); + Assert.assertTrue(lock.isAcquired()); + + Assert.assertThrows(LockReleaseFailedException.class, lock::release); + lease.assertReleased(); + } + +} diff --git a/coordination/src/test/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLockIntegrationTest.java b/coordination/src/test/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLockIntegrationTest.java new file mode 100644 index 000000000..be3717451 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/recipes/locks/ReadWriteInterProcessLockIntegrationTest.java @@ -0,0 +1,333 @@ +package tech.ydb.coordination.recipes.locks; + +import java.time.Duration; 
+import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.coordination.CoordinationClient; +import tech.ydb.coordination.CoordinationSession; +import tech.ydb.test.junit4.GrpcTransportRule;

/**
 * Integration tests for {@link ReadWriteInterProcessLock}, run against the transport supplied by
 * {@link GrpcTransportRule}.
 *
 * <p>Every test closes the locks it created in a {@code finally} block so that a failed assertion
 * does not leave sessions or held semaphores behind for the remaining tests.
 */
public class ReadWriteInterProcessLockIntegrationTest {
    private static final Logger logger = LoggerFactory.getLogger(ReadWriteInterProcessLockIntegrationTest.class);

    /** Timeout after which a bounded acquire attempt is considered blocked. */
    private static final Duration ACQUIRE_TIMEOUT = Duration.ofMillis(100);

    @ClassRule
    public static final GrpcTransportRule ydbRule = new GrpcTransportRule();

    private static CoordinationClient client;

    @BeforeClass
    public static void init() {
        client = CoordinationClient.newClient(ydbRule);
    }

    @AfterClass
    public static void clean() {
        ydbRule.close();
    }

    /**
     * Asserts that code does not throw any exceptions for basic write lock operations
     */
    @Test
    public void simpleWriteLockTest() throws Exception {
        ReadWriteInterProcessLock rwLock = getReadWriteLock();

        // Type argument restored from the otherwise unused CoordinationSession import.
        Consumer<CoordinationSession.State> listener = state -> logger.info("New state: {}", state);
        rwLock.writeLock().getSessionListenable().addListener(listener);

        try {
            rwLock.writeLock().acquire();
            Assert.assertTrue(rwLock.writeLock().isAcquiredInThisProcess());
            Thread.sleep(100);
            rwLock.writeLock().release();
        } finally {
            rwLock.close();
        }
        // Removing a listener after close must not throw either (order kept from the original test).
        rwLock.writeLock().getSessionListenable().removeListener(listener);
    }

    /**
     * Asserts that code does not throw any exceptions for basic read lock operations
     */
    @Test
    public void simpleReadLockTest() throws Exception {
        ReadWriteInterProcessLock rwLock = getReadWriteLock();
        try {
            rwLock.readLock().acquire();
            Assert.assertTrue(rwLock.readLock().isAcquiredInThisProcess());
            Thread.sleep(100);
            rwLock.readLock().release();
        } finally {
            rwLock.close();
        }
    }

    /**
     * Tests that write lock is exclusive (only one can acquire it)
     */
    @Test
    public void writeLockExclusivityTest() throws Exception {
        String coordinationNodePath = "writeLockExclusivityTest";
        String lockName = "writeLockExclusivityTest";
        ReadWriteInterProcessLock lock1 = getReadWriteLock(coordinationNodePath, lockName);
        ReadWriteInterProcessLock lock2 = getReadWriteLock(coordinationNodePath, lockName);
        try {
            lock1.writeLock().acquire();
            Assert.assertTrue(lock1.writeLock().isAcquiredInThisProcess());

            Assert.assertFalse(lock2.writeLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertFalse(lock2.writeLock().isAcquiredInThisProcess());
        } finally {
            closeAll(lock1, lock2);
        }
    }

    /**
     * Tests that multiple read locks can be acquired simultaneously
     */
    @Test
    public void readLockSharedAccessTest() throws Exception {
        String coordinationNodePath = "readLockSharedAccessTest";
        String lockName = "readLockSharedAccessTest";
        ReadWriteInterProcessLock lock1 = getReadWriteLock(coordinationNodePath, lockName);
        ReadWriteInterProcessLock lock2 = getReadWriteLock(coordinationNodePath, lockName);
        try {
            lock1.readLock().acquire();
            Assert.assertTrue(lock1.readLock().isAcquiredInThisProcess());

            Assert.assertTrue(lock2.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertTrue(lock2.readLock().isAcquiredInThisProcess());
        } finally {
            closeAll(lock1, lock2);
        }
    }

    /**
     * Tests that write lock cannot be acquired while read lock is held
     */
    @Test
    public void writeLockBlockedByReadLockTest() throws Exception {
        String coordinationNodePath = "writeLockBlockedByReadLockTest";
        String lockName = "writeLockBlockedByReadLockTest";
        ReadWriteInterProcessLock lock1 = getReadWriteLock(coordinationNodePath, lockName);
        ReadWriteInterProcessLock lock2 = getReadWriteLock(coordinationNodePath, lockName);
        try {
            lock1.readLock().acquire();
            Assert.assertTrue(lock1.readLock().isAcquiredInThisProcess());

            Assert.assertFalse(lock2.writeLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertFalse(lock2.writeLock().isAcquiredInThisProcess());
        } finally {
            closeAll(lock1, lock2);
        }
    }

    /**
     * Tests that read lock cannot be acquired while write lock is held
     */
    @Test
    public void readLockBlockedByWriteLockTest() throws Exception {
        String coordinationNodePath = "readLockBlockedByWriteLockTest";
        String lockName = "readLockBlockedByWriteLockTest";
        ReadWriteInterProcessLock lock1 = getReadWriteLock(coordinationNodePath, lockName);
        ReadWriteInterProcessLock lock2 = getReadWriteLock(coordinationNodePath, lockName);
        try {
            lock1.writeLock().acquire();
            Assert.assertTrue(lock1.writeLock().isAcquiredInThisProcess());

            Assert.assertFalse(lock2.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertFalse(lock2.readLock().isAcquiredInThisProcess());
        } finally {
            closeAll(lock1, lock2);
        }
    }

    /**
     * Concurrent test for write locks (should be exclusive).
     *
     * <p>Each task performs a deliberately non-atomic read-sleep-write on a shared counter while
     * holding the write lock; the final count only matches if the lock really is exclusive.
     * Task failures are propagated instead of being swallowed, so a broken lock fails the test
     * with its real cause rather than with a bare counter mismatch.
     */
    @Test
    public void concurrentWriteLockTest() throws Exception {
        // given
        int cycles = 10;
        int locksN = 10;

        String nodePath = UUID.randomUUID().toString();
        String lockName = UUID.randomUUID().toString();
        List<ReadWriteInterProcessLock> locks = new ArrayList<>(locksN);
        AtomicInteger counter = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(2);

        try {
            for (int i = 0; i < locksN; i++) {
                locks.add(getReadWriteLock(nodePath, lockName));
            }

            // when
            List<Callable<Void>> tasks = locks.stream().map(lock ->
                    (Callable<Void>) () -> {
                        for (int i = 0; i < cycles; i++) {
                            lock.writeLock().acquire();
                            try {
                                int start = counter.get();
                                logger.debug("Write lock acquired, cycle = {}, count = {}", i, start);
                                Thread.sleep(100);
                                counter.set(start + 1); // not an atomic increment
                                logger.debug("Write lock released, cycle = {}", i);
                            } finally {
                                // Always hand the lock back so the other tasks cannot hang.
                                lock.writeLock().release();
                            }
                        }
                        return null;
                    }
            ).collect(Collectors.toList());

            for (Future<Void> future : executor.invokeAll(tasks)) {
                future.get(); // rethrows any task failure as ExecutionException
            }

            // then
            Assert.assertEquals(cycles * locksN, counter.get());
        } finally {
            executor.shutdownNow();
            closeAll(locks.toArray(new ReadWriteInterProcessLock[0]));
        }
    }

    /**
     * Mixed read-write lock test with more complex scenarios
     */
    @Test
    public void mixedReadWriteLockTest() throws Exception {
        String coordinationNodePath = "mixedReadWriteLockTest";
        String lockName = "mixedReadWriteLockTest";
        ReadWriteInterProcessLock lock1 = getReadWriteLock(coordinationNodePath, lockName);
        ReadWriteInterProcessLock lock2 = getReadWriteLock(coordinationNodePath, lockName);
        ReadWriteInterProcessLock lock3 = getReadWriteLock(coordinationNodePath, lockName);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {
            // 1. Test read lock sharing
            lock1.readLock().acquire();
            Assert.assertTrue(lock1.readLock().isAcquiredInThisProcess());

            // Second read lock should be allowed
            Assert.assertTrue(lock2.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertTrue(lock2.readLock().isAcquiredInThisProcess());

            // Write lock should be blocked while read locks are held
            Assert.assertFalse(lock3.writeLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertFalse(lock3.writeLock().isAcquiredInThisProcess());

            // 2. Release read locks and test write lock exclusivity
            lock1.readLock().release();
            lock2.readLock().release();

            // Now write lock should be acquirable
            Assert.assertTrue(lock3.writeLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertTrue(lock3.writeLock().isAcquiredInThisProcess());

            // Read locks should be blocked while write lock is held
            Assert.assertFalse(lock1.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertFalse(lock1.readLock().isAcquiredInThisProcess());
            Assert.assertFalse(lock2.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertFalse(lock2.readLock().isAcquiredInThisProcess());

            // 3. Release write lock and test read lock acquisition again
            lock3.writeLock().release();

            // Read locks should be acquirable again
            Assert.assertTrue(lock1.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertTrue(lock1.readLock().isAcquiredInThisProcess());
            Assert.assertTrue(lock2.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertTrue(lock2.readLock().isAcquiredInThisProcess());

            // 4. Test write lock waiting for read locks to be released
            Future<Boolean> writeLockFuture = executor.submit(() -> {
                // This should block until read locks are released
                return lock3.writeLock().acquire(Duration.ofSeconds(2));
            });

            // Wait a bit to ensure write lock is waiting
            Thread.sleep(100);
            Assert.assertFalse(writeLockFuture.isDone());

            // Release read locks
            lock1.readLock().release();
            lock2.readLock().release();

            // Now write lock should be acquired
            Assert.assertTrue(writeLockFuture.get(1, TimeUnit.SECONDS));
            Assert.assertTrue(lock3.writeLock().isAcquiredInThisProcess());
        } finally {
            // Cleanup runs even when an assertion above fails.
            executor.shutdownNow();
            closeAll(lock1, lock2, lock3);
        }
    }

    /**
     * Test re-acquisition after release
     */
    @Test
    public void reacquisitionTest() throws Exception {
        ReadWriteInterProcessLock rwLock = getReadWriteLock();
        try {
            // 1. Test write lock re-acquisition
            rwLock.writeLock().acquire();
            Assert.assertTrue(rwLock.writeLock().isAcquiredInThisProcess());
            rwLock.writeLock().release();
            Assert.assertFalse(rwLock.writeLock().isAcquiredInThisProcess());

            // Should be able to acquire again
            Assert.assertTrue(rwLock.writeLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertTrue(rwLock.writeLock().isAcquiredInThisProcess());
            rwLock.writeLock().release();

            // 2. Test read lock re-acquisition
            rwLock.readLock().acquire();
            Assert.assertTrue(rwLock.readLock().isAcquiredInThisProcess());
            rwLock.readLock().release();
            Assert.assertFalse(rwLock.readLock().isAcquiredInThisProcess());

            // Should be able to acquire again
            Assert.assertTrue(rwLock.readLock().acquire(ACQUIRE_TIMEOUT));
            Assert.assertTrue(rwLock.readLock().isAcquiredInThisProcess());
            rwLock.readLock().release();
        } finally {
            rwLock.close();
        }
    }

    /** Creates a lock on a fresh, randomly named coordination node and semaphore. */
    private ReadWriteInterProcessLock getReadWriteLock() {
        return getReadWriteLock(UUID.randomUUID().toString(), UUID.randomUUID().toString());
    }

    /**
     * Creates the coordination node and returns a lock bound to it.
     *
     * @param nodePath path of the coordination node to create
     * @param lockName name of the lock within that node
     */
    private ReadWriteInterProcessLock getReadWriteLock(String nodePath, String lockName) {
        client.createNode(nodePath).join().expectSuccess("cannot create coordination path");
        return new ReadWriteInterProcessLock(
                client,
                nodePath,
                lockName,
                ReadWriteInterProcessLockSettings.newBuilder()
                        .withWaitConnection(true)
                        .build()
        );
    }

    /** Closes every given lock in order; used from {@code finally} blocks. */
    private static void closeAll(ReadWriteInterProcessLock... locks) throws Exception {
        for (ReadWriteInterProcessLock lock : locks) {
            lock.close();
        }
    }

}
diff --git a/coordination/src/test/java/tech/ydb/coordination/recipes/util/RetryableTaskTest.java b/coordination/src/test/java/tech/ydb/coordination/recipes/util/RetryableTaskTest.java new file mode 100644 index 000000000..a225951e9 --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/recipes/util/RetryableTaskTest.java @@ -0,0 +1,137 @@ +package tech.ydb.coordination.recipes.util; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import
tech.ydb.common.retry.RetryPolicy; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*;

/**
 * Unit tests for {@link RetryableTask} with a mocked task supplier, scheduler and retry policy.
 *
 * <p>Because the scheduler is a mock, scheduled retries never run on their own; tests that need
 * a second attempt trigger it by calling {@code attemptTask} directly.
 */
@RunWith(MockitoJUnitRunner.class)
public class RetryableTaskTest {
    @Mock
    private Supplier<CompletableFuture<Status>> taskSupplier;

    @Mock
    private ScheduledExecutorService executor;

    @Mock
    private RetryPolicy retryPolicy;

    private RetryableTask retryableTask;
    private final String taskName = "testTask";

    @Before
    public void setUp() {
        retryableTask = new RetryableTask(taskName, taskSupplier, executor, retryPolicy);
    }

    /** A supplier that succeeds immediately completes the result without any retry. */
    @Test
    public void testExecute_SuccessOnFirstAttempt() {
        Status successStatus = Status.SUCCESS;
        CompletableFuture<Status> future = CompletableFuture.completedFuture(successStatus);

        when(taskSupplier.get()).thenReturn(future);

        CompletableFuture<Status> result = retryableTask.execute();

        assertTrue(result.isDone());
        assertEquals(successStatus, result.join());
    }

    /**
     * First attempt fails exceptionally and a retry is scheduled after 100 ms; the manually
     * triggered second attempt yields a non-success status, the policy refuses further retries,
     * and the result is expected to complete exceptionally.
     */
    @Test
    public void testExecute_FailureWithRetries() {
        Status failureStatus = Status.of(StatusCode.CLIENT_INTERNAL_ERROR);

        // First attempt fails
        when(taskSupplier.get())
                .thenReturn(failedWith(new RuntimeException("Operation failed")))
                .thenReturn(CompletableFuture.completedFuture(failureStatus));

        when(retryPolicy.nextRetryMs(anyInt(), anyLong()))
                .thenReturn(100L)  // First retry after 100ms
                .thenReturn(-1L);  // No more retries

        CompletableFuture<Status> result = retryableTask.execute();

        // Verify retry was scheduled
        verify(executor).schedule(any(Runnable.class), eq(100L), eq(TimeUnit.MILLISECONDS));

        // Simulate retry execution
        retryableTask.attemptTask(result);

        assertTrue(result.isDone());
        assertTrue(result.isCompletedExceptionally());
    }

    /**
     * First attempt fails, the policy asks for an immediate retry (0 ms), and the manually
     * triggered second attempt succeeds.
     */
    @Test
    public void testExecute_SuccessAfterRetry() {
        Status successStatus = Status.SUCCESS;

        // First attempt fails
        when(taskSupplier.get())
                .thenReturn(failedWith(new RuntimeException("Temporary failure")))
                .thenReturn(CompletableFuture.completedFuture(successStatus));

        when(retryPolicy.nextRetryMs(anyInt(), anyLong()))
                .thenReturn(0L); // Immediate retry

        CompletableFuture<Status> result = retryableTask.execute();

        // Verify immediate retry was scheduled
        verify(executor).execute(any(Runnable.class));

        // Simulate retry execution
        retryableTask.attemptTask(result);

        assertTrue(result.isDone());
        assertEquals(successStatus, result.join());
    }

    /** A failed attempt with a policy that refuses to retry completes the result exceptionally. */
    @Test
    public void testExecute_NoMoreRetries() {
        when(taskSupplier.get()).thenReturn(failedWith(new RuntimeException("Permanent failure")));
        when(retryPolicy.nextRetryMs(anyInt(), anyLong())).thenReturn(-1L); // No more retries

        CompletableFuture<Status> result = retryableTask.execute();

        assertTrue(result.isDone());
        assertTrue(result.isCompletedExceptionally());
    }

    /** An exception thrown by the supplier itself is treated like a failed attempt. */
    @Test
    public void testExecute_TaskSupplierThrowsException() {
        RuntimeException exception = new RuntimeException("Supplier failure");

        when(taskSupplier.get()).thenThrow(exception);
        when(retryPolicy.nextRetryMs(anyInt(), anyLong())).thenReturn(-1L); // No more retries

        CompletableFuture<Status> result = retryableTask.execute();

        assertTrue(result.isDone());
        assertTrue(result.isCompletedExceptionally());
    }

    /** Builds a future that is already completed exceptionally with {@code cause}. */
    private static CompletableFuture<Status> failedWith(Throwable cause) {
        CompletableFuture<Status> failed = new CompletableFuture<>();
        failed.completeExceptionally(cause);
        return failed;
    }
}
diff --git a/coordination/src/test/java/tech/ydb/coordination/recipes/util/SemaphoreObserverMockedTest.java
b/coordination/src/test/java/tech/ydb/coordination/recipes/util/SemaphoreObserverMockedTest.java new file mode 100644 index 000000000..bd3a6645d --- /dev/null +++ b/coordination/src/test/java/tech/ydb/coordination/recipes/util/SemaphoreObserverMockedTest.java @@ -0,0 +1,92 @@ +package tech.ydb.coordination.recipes.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +import org.junit.Test; +import org.mockito.Mockito; +import tech.ydb.common.retry.RetryForever; +import tech.ydb.coordination.CoordinationSessionBaseMockedTest; +import tech.ydb.coordination.description.SemaphoreChangedEvent; +import tech.ydb.coordination.description.SemaphoreDescription; +import tech.ydb.coordination.description.SemaphoreWatcher; +import tech.ydb.coordination.settings.DescribeSemaphoreMode; +import tech.ydb.coordination.settings.WatchSemaphoreMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status;

/**
 * Smoke tests for {@link SemaphoreObserver} against a mocked coordination session: the observer
 * is started, the session is reported as connected, cached data is read and the observer closed.
 * The tests only check that this sequence does not throw.
 */
public class SemaphoreObserverMockedTest extends CoordinationSessionBaseMockedTest {

    @Test
    public void successTest() {
        SessionMock sessionMock = getSessionMock();
        stubSuccessfulSemaphoreCalls(sessionMock);

        java.util.concurrent.ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            SemaphoreObserver observer = newObserver(scheduler);
            observer.start();
            sessionMock.connected();
            observer.getCachedData();
            observer.close();
        } finally {
            // The test owns the scheduler; stop its thread even if the observer failed.
            scheduler.shutdownNow();
        }
    }

    // NOTE(review): despite its name this test runs exactly the same sequence as successTest -
    // start() is called once and nothing is asserted. The expected outcome of a second start()
    // is not visible from here, so the scenario is left unchanged; confirm and extend.
    @Test
    public void start_alreadyStarted_Error() {
        SessionMock sessionMock = getSessionMock();
        stubSuccessfulSemaphoreCalls(sessionMock);

        java.util.concurrent.ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            SemaphoreObserver observer = newObserver(scheduler);
            observer.start();
            sessionMock.connected();
            observer.getCachedData();
            observer.close();
        } finally {
            scheduler.shutdownNow();
        }
    }

    /**
     * Scripts the session mock so that connect succeeds, describeSemaphore returns a bare mocked
     * result, and watchSemaphore returns a successful result whose watcher carries a mocked
     * description and an already completed change future.
     */
    @SuppressWarnings("unchecked") // Mockito.mock(Result.class) yields a raw Result
    private void stubSuccessfulSemaphoreCalls(SessionMock sessionMock) {
        sessionMock.connect()
                .then(successConnect());
        sessionMock.describeSemaphore()
                .thenReturn(CompletableFuture.completedFuture(Mockito.mock(Result.class)));

        Result<SemaphoreWatcher> watchResultMock = Mockito.mock(Result.class);
        Mockito.when(watchResultMock.isSuccess()).thenReturn(true);
        Mockito.when(watchResultMock.getStatus()).thenReturn(Status.SUCCESS);

        SemaphoreWatcher semaphoreWatcher = Mockito.mock(SemaphoreWatcher.class);
        Mockito.when(semaphoreWatcher.getDescription()).thenReturn(Mockito.mock(SemaphoreDescription.class));

        Result<SemaphoreChangedEvent> watchResultEventMock = Mockito.mock(Result.class);
        CompletableFuture<Result<SemaphoreChangedEvent>> watchedEvent =
                CompletableFuture.completedFuture(watchResultEventMock);
        Mockito.when(semaphoreWatcher.getChangedFuture()).thenReturn(watchedEvent);
        Mockito.when(watchResultMock.getValue()).thenReturn(semaphoreWatcher);

        sessionMock.watchSemaphore()
                .thenReturn(CompletableFuture.completedFuture(watchResultMock));
    }

    /** Builds an observer for "observable_semaphore" on the mocked session using {@code scheduler}. */
    private SemaphoreObserver newObserver(java.util.concurrent.ScheduledExecutorService scheduler) {
        return new SemaphoreObserver(
                getCoordinationSession(),
                "observable_semaphore",
                WatchSemaphoreMode.WATCH_DATA_AND_OWNERS,
                DescribeSemaphoreMode.WITH_OWNERS_AND_WAITERS,
                new RetryForever(100),
                scheduler
        );
    }

}