Skip to content

Commit 913ba29

Browse files
author
Daniil Zulin
committed
Add base implementation for auto partitioning settings
1 parent 943d5aa commit 913ba29

File tree

7 files changed

+329
-3
lines changed

7 files changed

+329
-3
lines changed

topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@
3535
import tech.ydb.topic.read.SyncReader;
3636
import tech.ydb.topic.read.impl.AsyncReaderImpl;
3737
import tech.ydb.topic.read.impl.SyncReaderImpl;
38+
import tech.ydb.topic.settings.AlterAutoPartitioningWriteStrategySettings;
3839
import tech.ydb.topic.settings.AlterConsumerSettings;
3940
import tech.ydb.topic.settings.AlterPartitioningSettings;
4041
import tech.ydb.topic.settings.AlterTopicSettings;
42+
import tech.ydb.topic.settings.AutoPartitioningStrategy;
43+
import tech.ydb.topic.settings.AutoPartitioningWriteStrategySettings;
4144
import tech.ydb.topic.settings.CommitOffsetSettings;
4245
import tech.ydb.topic.settings.CreateTopicSettings;
4346
import tech.ydb.topic.settings.DescribeConsumerSettings;
@@ -104,7 +107,19 @@ public CompletableFuture<Status> createTopic(String path, CreateTopicSettings se
104107
if (partitioningSettings != null) {
105108
requestBuilder.setPartitioningSettings(YdbTopic.PartitioningSettings.newBuilder()
106109
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
107-
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit()));
110+
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
111+
.setAutoPartitioningSettings(YdbTopic.AutoPartitioningSettings.newBuilder()
112+
.setStrategy(partitioningSettings.getAutoPartitioningStrategy().getProtoReference())));
113+
114+
AutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings.getWriteStrategySettings();
115+
if (writeStrategySettings != null) {
116+
requestBuilder.getPartitioningSettingsBuilder().getAutoPartitioningSettingsBuilder()
117+
.setPartitionWriteSpeed(YdbTopic.AutoPartitioningWriteSpeedStrategy.newBuilder()
118+
.setStabilizationWindow(ProtobufUtils.durationToProto(writeStrategySettings.getStabilizationWindow()))
119+
.setDownUtilizationPercent(writeStrategySettings.getDownUtilizationPercent())
120+
.setUpUtilizationPercent(writeStrategySettings.getUpUtilizationPercent())
121+
);
122+
}
108123
}
109124

110125
Duration retentionPeriod = settings.getRetentionPeriod();
@@ -145,6 +160,28 @@ public CompletableFuture<Status> alterTopic(String path, AlterTopicSettings sett
145160
if (partitionCountLimit != null) {
146161
builder.setSetPartitionCountLimit(partitionCountLimit);
147162
}
163+
AutoPartitioningStrategy autoPartitioningStrategy = partitioningSettings.getAutoPartitioningStrategy();
164+
if (autoPartitioningStrategy != null) {
165+
builder.getAlterAutoPartitioningSettingsBuilder().setSetStrategy(autoPartitioningStrategy.getProtoReference());
166+
}
167+
AlterAutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings.getWriteStrategySettings();
168+
if (writeStrategySettings != null) {
169+
Duration stabilizationWindow = writeStrategySettings.getStabilizationWindow();
170+
if (stabilizationWindow != null) {
171+
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
172+
.setSetStabilizationWindow(ProtobufUtils.durationToProto(stabilizationWindow));
173+
}
174+
Integer upUtilizationPercent = writeStrategySettings.getUpUtilizationPercent();
175+
if (upUtilizationPercent != null) {
176+
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
177+
.setSetUpUtilizationPercent(upUtilizationPercent);
178+
}
179+
Integer downUtilizationPercent = writeStrategySettings.getDownUtilizationPercent();
180+
if (downUtilizationPercent != null) {
181+
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
182+
.setSetDownUtilizationPercent(downUtilizationPercent);
183+
}
184+
}
148185
requestBuilder.setAlterPartitioningSettings(builder);
149186
}
150187

@@ -273,11 +310,20 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
273310
.setMeteringMode(fromProto(result.getMeteringMode()));
274311

275312
YdbTopic.PartitioningSettings partitioningSettings = result.getPartitioningSettings();
276-
description.setPartitioningSettings(PartitioningSettings.newBuilder()
313+
PartitioningSettings.Builder partitioningDescription = PartitioningSettings.newBuilder()
277314
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
278315
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
316+
.setAutoPartitioningStrategy(AutoPartitioningStrategy.fromProto(partitioningSettings.getAutoPartitioningSettings().getStrategy()));
317+
318+
YdbTopic.AutoPartitioningWriteSpeedStrategy partitionWriteSpeed = partitioningSettings.getAutoPartitioningSettings().getPartitionWriteSpeed();
319+
partitioningDescription.setWriteStrategySettings(AutoPartitioningWriteStrategySettings.newBuilder()
320+
.setStabilizationWindow(ProtobufUtils.protoToDuration(partitionWriteSpeed.getStabilizationWindow()))
321+
.setUpUtilizationPercent(partitionWriteSpeed.getUpUtilizationPercent())
322+
.setDownUtilizationPercent(partitionWriteSpeed.getDownUtilizationPercent())
279323
.build());
280324

325+
description.setPartitioningSettings(partitioningDescription.build());
326+
281327
List<PartitionInfo> partitions = new ArrayList<>();
282328
for (YdbTopic.DescribeTopicResult.PartitionInfo partition : result.getPartitionsList()) {
283329
PartitionInfo.Builder partitionBuilder = PartitionInfo.newBuilder()
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package tech.ydb.topic.settings;
2+
3+
import javax.annotation.Nullable;
4+
5+
import java.time.Duration;
6+
7+
// TODO add proper javadocs
8+
public class AlterAutoPartitioningWriteStrategySettings {
9+
@Nullable
10+
private final Duration stabilizationWindow;
11+
@Nullable
12+
private final Integer upUtilizationPercent;
13+
@Nullable
14+
private final Integer downUtilizationPercent;
15+
16+
public AlterAutoPartitioningWriteStrategySettings(Builder builder) {
17+
this.stabilizationWindow = builder.stabilizationWindow;
18+
this.upUtilizationPercent = builder.upUtilizationPercent;
19+
this.downUtilizationPercent = builder.downUtilizationPercent;
20+
}
21+
22+
@Nullable
23+
public Duration getStabilizationWindow() {
24+
return stabilizationWindow;
25+
}
26+
27+
@Nullable
28+
public Integer getUpUtilizationPercent() {
29+
return upUtilizationPercent;
30+
}
31+
32+
@Nullable
33+
public Integer getDownUtilizationPercent() {
34+
return downUtilizationPercent;
35+
}
36+
37+
public static Builder newBuilder() {
38+
return new Builder();
39+
}
40+
41+
public static class Builder {
42+
private Duration stabilizationWindow = null;
43+
private Integer upUtilizationPercent = null;
44+
private Integer downUtilizationPercent = null;
45+
46+
/**
47+
* @param stabilizationWindow
48+
* @return strategy builder
49+
*/
50+
public Builder setStabilizationWindow(Duration stabilizationWindow) {
51+
this.stabilizationWindow = stabilizationWindow;
52+
return this;
53+
}
54+
55+
/**
56+
* @param upUtilizationPercent
57+
* @return strategy builder
58+
*/
59+
public Builder setUpUtilizationPercent(int upUtilizationPercent) {
60+
this.upUtilizationPercent = upUtilizationPercent;
61+
return this;
62+
}
63+
64+
/**
65+
* @param downUtilizationPercent
66+
* @return strategy builder
67+
*/
68+
public Builder setDownUtilizationPercent(int downUtilizationPercent) {
69+
this.downUtilizationPercent = downUtilizationPercent;
70+
return this;
71+
}
72+
73+
public AlterAutoPartitioningWriteStrategySettings build() {
74+
return new AlterAutoPartitioningWriteStrategySettings(this);
75+
}
76+
}
77+
}

topic/src/main/java/tech/ydb/topic/settings/AlterPartitioningSettings.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,16 @@ public class AlterPartitioningSettings {
1010
private final Long minActivePartitions;
1111
@Nullable
1212
private final Long partitionCountLimit;
13+
@Nullable
14+
private final AutoPartitioningStrategy autoPartitioningStrategy;
15+
@Nullable
16+
private final AlterAutoPartitioningWriteStrategySettings writeStrategySettings;
1317

1418
private AlterPartitioningSettings(Builder builder) {
1519
this.minActivePartitions = builder.minActivePartitions;
1620
this.partitionCountLimit = builder.partitionCountLimit;
21+
this.autoPartitioningStrategy = builder.autoPartitioningStrategy;
22+
this.writeStrategySettings = builder.writeStrategySettings;
1723
}
1824

1925
public static Builder newBuilder() {
@@ -30,12 +36,24 @@ public Long getPartitionCountLimit() {
3036
return partitionCountLimit;
3137
}
3238

39+
@Nullable
40+
public AutoPartitioningStrategy getAutoPartitioningStrategy() {
41+
return autoPartitioningStrategy;
42+
}
43+
44+
@Nullable
45+
public AlterAutoPartitioningWriteStrategySettings getWriteStrategySettings() {
46+
return writeStrategySettings;
47+
}
48+
3349
/**
3450
* BUILDER
3551
*/
3652
public static class Builder {
3753
private Long minActivePartitions = null;
3854
private Long partitionCountLimit = null;
55+
private AutoPartitioningStrategy autoPartitioningStrategy = null;
56+
private AlterAutoPartitioningWriteStrategySettings writeStrategySettings = null;
3957

4058
/**
4159
* @param minActivePartitions minimum partition count auto merge would stop working at.
@@ -58,6 +76,28 @@ public Builder setPartitionCountLimit(long partitionCountLimit) {
5876
return this;
5977
}
6078

79+
/**
80+
* @param autoPartitioningStrategy Strategy for auto partitioning.
81+
* Auto partitioning is disabled by default.
82+
* @return settings builder
83+
* @see AutoPartitioningStrategy#DISABLED
84+
*/
85+
public Builder setAutoPartitioningStrategy(AutoPartitioningStrategy autoPartitioningStrategy) {
86+
this.autoPartitioningStrategy = autoPartitioningStrategy;
87+
return this;
88+
}
89+
90+
/**
91+
* @param writeStrategySettings Settings for auto partitioning write strategy.
92+
* Does not have any effect if auto partitioning is disabled.
93+
* See {@link AlterAutoPartitioningWriteStrategySettings} for defaults
94+
* @return settings builder
95+
*/
96+
public Builder setWriteStrategySettings(AlterAutoPartitioningWriteStrategySettings writeStrategySettings) {
97+
this.writeStrategySettings = writeStrategySettings;
98+
return this;
99+
}
100+
61101
public AlterPartitioningSettings build() {
62102
return new AlterPartitioningSettings(this);
63103
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package tech.ydb.topic.settings;
2+
3+
import tech.ydb.proto.topic.YdbTopic;
4+
5+
import javax.annotation.Nullable;
6+
7+
public enum AutoPartitioningStrategy {
8+
/**
9+
* The auto partitioning is disabled.
10+
* You cannot disable the auto partitioning after it has been enabled.
11+
* @see AutoPartitioningStrategy#PAUSED
12+
*/
13+
DISABLED(YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_DISABLED),
14+
/**
15+
* The auto partitioning algorithm will increase the partition count depending on the load characteristics.
16+
* The auto partitioning algorithm will never decrease the number of partitions.
17+
* @see AlterAutoPartitioningWriteStrategySettings
18+
*/
19+
SCALE_UP(YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_SCALE_UP),
20+
/**
21+
* The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
22+
* @see AlterAutoPartitioningWriteStrategySettings
23+
*/
24+
SCALE_UP_AND_DOWN(YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN),
25+
/**
26+
* The auto partitioning is paused.
27+
*/
28+
PAUSED(YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_PAUSED);
29+
30+
private final YdbTopic.AutoPartitioningStrategy protoReference;
31+
32+
AutoPartitioningStrategy(YdbTopic.AutoPartitioningStrategy protoReference) {
33+
this.protoReference = protoReference;
34+
}
35+
36+
public YdbTopic.AutoPartitioningStrategy getProtoReference() {
37+
return protoReference;
38+
}
39+
40+
public static @Nullable AutoPartitioningStrategy fromProto(YdbTopic.AutoPartitioningStrategy protoReference) {
41+
for (AutoPartitioningStrategy value : values()) {
42+
if (value.getProtoReference() == protoReference) {
43+
return value;
44+
}
45+
}
46+
return null;
47+
}
48+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package tech.ydb.topic.settings;
2+
3+
import java.time.Duration;
4+
5+
public class AutoPartitioningWriteStrategySettings {
6+
private final Duration stabilizationWindow;
7+
private final int upUtilizationPercent;
8+
private final int downUtilizationPercent;
9+
10+
public AutoPartitioningWriteStrategySettings(Builder builder) {
11+
this.stabilizationWindow = builder.stabilizationWindow;
12+
this.upUtilizationPercent = builder.upUtilizationPercent;
13+
this.downUtilizationPercent = builder.downUtilizationPercent;
14+
}
15+
16+
public Duration getStabilizationWindow() {
17+
return stabilizationWindow;
18+
}
19+
20+
public int getUpUtilizationPercent() {
21+
return upUtilizationPercent;
22+
}
23+
24+
public int getDownUtilizationPercent() {
25+
return downUtilizationPercent;
26+
}
27+
28+
public static Builder newBuilder() {
29+
return new Builder();
30+
}
31+
32+
public static class Builder {
33+
private Duration stabilizationWindow = Duration.ofMinutes(5);
34+
private int upUtilizationPercent = 90;
35+
private int downUtilizationPercent = 30;
36+
37+
/**
38+
* @param stabilizationWindow
39+
* @return strategy builder
40+
*/
41+
public Builder setStabilizationWindow(Duration stabilizationWindow) {
42+
this.stabilizationWindow = stabilizationWindow;
43+
return this;
44+
}
45+
46+
/**
47+
* @param upUtilizationPercent
48+
* @return strategy builder
49+
*/
50+
public Builder setUpUtilizationPercent(int upUtilizationPercent) {
51+
this.upUtilizationPercent = upUtilizationPercent;
52+
return this;
53+
}
54+
55+
/**
56+
* @param downUtilizationPercent
57+
* @return strategy builder
58+
*/
59+
public Builder setDownUtilizationPercent(int downUtilizationPercent) {
60+
this.downUtilizationPercent = downUtilizationPercent;
61+
return this;
62+
}
63+
64+
public AutoPartitioningWriteStrategySettings build() {
65+
return new AutoPartitioningWriteStrategySettings(this);
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)