Skip to content

Commit 39a2554

Browse files
Snapshot update batching code renames and comments (#127356)
A collection of miscellaneous changes I made while working on understanding the code.
1 parent 7ddc8d9 commit 39a2554

File tree

13 files changed

+164
-99
lines changed

13 files changed

+164
-99
lines changed

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,16 @@ private SnapshotsInProgress(Map<ProjectRepo, ByRepo> entries, Set<String> nodesI
135135

136136
@FixForMultiProject
137137
@Deprecated(forRemoval = true)
138-
public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List<Entry> updatedEntries) {
139-
return withUpdatedEntriesForRepo(Metadata.DEFAULT_PROJECT_ID, repository, updatedEntries);
138+
public SnapshotsInProgress createCopyWithUpdatedEntriesForRepo(String repository, List<Entry> updatedEntries) {
139+
return createCopyWithUpdatedEntriesForRepo(Metadata.DEFAULT_PROJECT_ID, repository, updatedEntries);
140140
}
141141

142-
public SnapshotsInProgress withUpdatedEntriesForRepo(ProjectId projectId, String repository, List<Entry> updatedEntries) {
142+
public SnapshotsInProgress createCopyWithUpdatedEntriesForRepo(ProjectId projectId, String repository, List<Entry> updatedEntries) {
143143
if (updatedEntries.equals(forRepo(projectId, repository))) {
144+
// No changes to apply, return the current object.
144145
return this;
145146
}
147+
146148
final Map<ProjectRepo, ByRepo> copy = new HashMap<>(this.entries);
147149
final var projectRepo = new ProjectRepo(projectId, repository);
148150
if (updatedEntries.isEmpty()) {
@@ -153,13 +155,14 @@ public SnapshotsInProgress withUpdatedEntriesForRepo(ProjectId projectId, String
153155
} else {
154156
copy.put(projectRepo, new ByRepo(updatedEntries));
155157
}
158+
156159
return new SnapshotsInProgress(copy, nodesIdsForRemoval);
157160
}
158161

159162
public SnapshotsInProgress withAddedEntry(Entry entry) {
160163
final List<Entry> forRepo = new ArrayList<>(forRepo(entry.projectId(), entry.repository()));
161164
forRepo.add(entry);
162-
return withUpdatedEntriesForRepo(entry.projectId(), entry.repository(), forRepo);
165+
return createCopyWithUpdatedEntriesForRepo(entry.projectId(), entry.repository(), forRepo);
163166
}
164167

165168
/**

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3434
import org.elasticsearch.cluster.routing.RerouteService;
3535
import org.elasticsearch.cluster.routing.ShardRouting;
36+
import org.elasticsearch.cluster.routing.ShardRoutingState;
3637
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3738
import org.elasticsearch.cluster.routing.allocation.FailedShard;
3839
import org.elasticsearch.cluster.routing.allocation.StaleShard;
@@ -634,10 +635,11 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
634635
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(batchExecutionContext.taskContexts().size());
635636
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
636637
final Map<Index, ClusterStateTimeRanges> updatedTimestampRanges = new HashMap<>();
638+
637639
final ClusterState initialState = batchExecutionContext.initialState();
638640
for (var taskContext : batchExecutionContext.taskContexts()) {
639-
final var task = taskContext.getTask();
640-
final StartedShardEntry startedShardEntry = task.getEntry();
641+
final ShardStateAction.StartedShardUpdateTask task = taskContext.getTask();
642+
final StartedShardEntry startedShardEntry = task.getStartedShardEntry();
641643
final Optional<ProjectMetadata> project = initialState.metadata().lookupProject(startedShardEntry.shardId.getIndex());
642644
final ShardRouting matched = project.map(ProjectMetadata::id)
643645
.map(id -> initialState.routingTable(id).getByAllocationId(startedShardEntry.shardId, startedShardEntry.allocationId))
@@ -917,9 +919,16 @@ public int hashCode() {
917919
}
918920
}
919921

922+
/**
923+
* Task that runs on the master node. Handles responding to the request listener with the result of the update request.
924+
* Task is created when the master node receives a data node request to mark a shard as {@link ShardRoutingState#STARTED}.
925+
*
926+
* @param entry Information about the newly started shard.
927+
* @param listener Channel listener with which to respond to the data node.
928+
*/
920929
public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener<Void> listener) implements ClusterStateTaskListener {
921930

922-
public StartedShardEntry getEntry() {
931+
public StartedShardEntry getStartedShardEntry() {
923932
return entry;
924933
}
925934

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1437,7 +1437,7 @@ private interface Batch {
14371437
*
14381438
* @param name The name of the queue, which is mostly useful for debugging.
14391439
*
1440-
* @param priority The priority at which tasks submitted to the queue are executed. Avoid priorites other than {@link Priority#NORMAL}
1440+
* @param priority The priority at which tasks submitted to the queue are executed. Avoid priorities other than {@link Priority#NORMAL}
14411441
* where possible. A stream of higher-priority tasks can starve lower-priority ones from running. Higher-priority tasks
14421442
* should definitely re-use the same {@link MasterServiceTaskQueue} so that they are executed in batches.
14431443
*

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3076,7 +3076,9 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state,
30763076
snapshotEntries.add(entry);
30773077
}
30783078
}
3079-
updatedSnapshotsInProgress = changedSnapshots ? snapshotsInProgress.withUpdatedEntriesForRepo(repoName, snapshotEntries) : null;
3079+
updatedSnapshotsInProgress = changedSnapshots
3080+
? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(repoName, snapshotEntries)
3081+
: null;
30803082
final SnapshotDeletionsInProgress updatedDeletionsInProgress;
30813083
boolean changedDeletions = false;
30823084
final List<SnapshotDeletionsInProgress.Entry> deletionEntries = new ArrayList<>();

server/src/main/java/org/elasticsearch/snapshots/RegisteredPolicySnapshots.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public static class Builder {
196196
* If the request is from SLM it will contain a key "policy" with an SLM policy name as the value.
197197
* @param snapshotId the snapshotId to potentially add to the registered set
198198
*/
199-
void maybeAdd(Map<String, Object> userMetadata, SnapshotId snapshotId) {
199+
void addIfSnapshotIsSLMInitiated(Map<String, Object> userMetadata, SnapshotId snapshotId) {
200200
final String policy = getPolicyFromMetadata(userMetadata);
201201
if (policy != null) {
202202
snapshots.add(new PolicySnapshot(policy, snapshotId));

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -436,19 +436,23 @@ public void onFailure(Exception e) {
436436
startShardSnapshotTaskRunner.runSyncTasksEagerly(threadPool.executor(ThreadPool.Names.SNAPSHOT));
437437
}
438438

439-
private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInProgress.Entry entry) {
440-
final var localShardSnapshots = shardSnapshots.getOrDefault(entry.snapshot(), Map.of());
439+
/**
440+
* Iterates over all the shard snapshots for the given snapshot entry. Any shard snapshots assigned to this data node will be: immediately
441+
* paused, if not already running; or flagged for pausing, if running.
442+
*/
443+
private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInProgress.Entry masterEntryCopy) {
444+
final var localShardSnapshots = shardSnapshots.getOrDefault(masterEntryCopy.snapshot(), Map.of());
441445

442-
for (final Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
446+
for (final Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : masterEntryCopy.shards().entrySet()) {
443447
final ShardId shardId = shardEntry.getKey();
444-
final ShardSnapshotStatus masterShardSnapshotStatus = shardEntry.getValue();
448+
final ShardSnapshotStatus masterShardSnapshotStatusCopy = shardEntry.getValue();
445449

446-
if (masterShardSnapshotStatus.state() != ShardState.INIT) {
450+
if (masterShardSnapshotStatusCopy.state() != ShardState.INIT) {
447451
// shard snapshot not currently scheduled by master
448452
continue;
449453
}
450454

451-
if (localNodeId.equals(masterShardSnapshotStatus.nodeId()) == false) {
455+
if (localNodeId.equals(masterShardSnapshotStatusCopy.nodeId()) == false) {
452456
// shard snapshot scheduled on a different node
453457
continue;
454458
}
@@ -457,11 +461,11 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr
457461
if (localShardSnapshotStatus == null) {
458462
// shard snapshot scheduled but not currently running, pause immediately without starting
459463
notifyUnsuccessfulSnapshotShard(
460-
entry.snapshot(),
464+
masterEntryCopy.snapshot(),
461465
shardId,
462466
ShardState.PAUSED_FOR_NODE_REMOVAL,
463467
"paused",
464-
masterShardSnapshotStatus.generation(),
468+
masterShardSnapshotStatusCopy.generation(),
465469
// Shard snapshot never began, so there is no status object to update
466470
(outcomeInfoString) -> {}
467471
);

0 commit comments

Comments
 (0)