Skip to content

Partition autosplit feature #549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions tests/topics/test_control_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

import ydb
from ydb import issues


Expand Down Expand Up @@ -56,6 +57,39 @@ async def test_alter_existed_topic(self, driver, topic_path):
topic_after = await client.describe_topic(topic_path)
assert topic_after.min_active_partitions == target_min_active_partitions

async def test_alter_auto_partitioning_settings(self, driver, topic_path):
client = driver.topic_client

topic_before = await client.describe_topic(topic_path)

expected = topic_before.auto_partitioning_settings

expected.strategy = ydb.TopicAutoPartitioningStrategy.SCALE_UP

await client.alter_topic(
topic_path,
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
set_strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
),
)

topic_after = await client.describe_topic(topic_path)

assert topic_after.auto_partitioning_settings == expected

expected.up_utilization_percent = 88

await client.alter_topic(
topic_path,
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
set_up_utilization_percent=88,
),
)

topic_after = await client.describe_topic(topic_path)

assert topic_after.auto_partitioning_settings == expected


class TestTopicClientControlPlane:
def test_create_topic(self, driver_sync, database):
Expand Down
214 changes: 212 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,14 @@ def from_proto(
class InitRequest(IToProto):
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
consumer: str
auto_partitioning_support: bool = False

def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
res.consumer = self.consumer
for settings in self.topics_read_settings:
res.topics_read_settings.append(settings.to_proto())
res.auto_partitioning_support = self.auto_partitioning_support
return res

@dataclass
Expand Down Expand Up @@ -696,6 +698,20 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionRespon
partition_session_id=self.partition_session_id,
)

@dataclass
class EndPartitionSession(IFromProto):
partition_session_id: int
adjacent_partition_ids: List[int]
child_partition_ids: List[int]

@staticmethod
def from_proto(msg: ydb_topic_pb2.StreamReadMessage.EndPartitionSession):
return StreamReadMessage.EndPartitionSession(
partition_session_id=msg.partition_session_id,
adjacent_partition_ids=list(msg.adjacent_partition_ids),
child_partition_ids=list(msg.child_partition_ids),
)

@dataclass
class FromClient(IToProto):
client_message: "ReaderMessagesFromClientToServer"
Expand Down Expand Up @@ -775,6 +791,13 @@ def from_proto(
msg.partition_session_status_response
),
)
elif mess_type == "end_partition_session":
return StreamReadMessage.FromServer(
server_status=server_status,
server_message=StreamReadMessage.EndPartitionSession.from_proto(
msg.end_partition_session,
),
)
else:
raise issues.UnexpectedGrpcMessage(
"Unexpected message while parse ReaderMessagesFromServerToClient: '%s'" % mess_type
Expand All @@ -799,6 +822,7 @@ def from_proto(
UpdateTokenResponse,
StreamReadMessage.StartPartitionSessionRequest,
StreamReadMessage.StopPartitionSessionRequest,
StreamReadMessage.EndPartitionSession,
]


Expand Down Expand Up @@ -943,30 +967,196 @@ def from_public(alter_consumer: ydb_topic_public_types.PublicAlterConsumer) -> A
class PartitioningSettings(IToProto, IFromProto):
min_active_partitions: int
partition_count_limit: int
max_active_partitions: int
auto_partitioning_settings: AutoPartitioningSettings

@staticmethod
def from_proto(msg: ydb_topic_pb2.PartitioningSettings) -> "PartitioningSettings":
return PartitioningSettings(
min_active_partitions=msg.min_active_partitions,
partition_count_limit=msg.partition_count_limit,
max_active_partitions=msg.max_active_partitions,
auto_partitioning_settings=AutoPartitioningSettings.from_proto(msg.auto_partitioning_settings),
)

def to_proto(self) -> ydb_topic_pb2.PartitioningSettings:
auto_partitioning_settings = None
if self.auto_partitioning_settings is not None:
auto_partitioning_settings = self.auto_partitioning_settings.to_proto()

return ydb_topic_pb2.PartitioningSettings(
min_active_partitions=self.min_active_partitions,
partition_count_limit=self.partition_count_limit,
max_active_partitions=self.max_active_partitions,
auto_partitioning_settings=auto_partitioning_settings,
)


class AutoPartitioningStrategy(int, IFromProto, IFromPublic, IToPublic):
UNSPECIFIED = 0
DISABLED = 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about use grpc constants instead of direct numbers?
for show dependency

SCALE_UP = 2
SCALE_UP_AND_DOWN = 3
PAUSED = 4

@staticmethod
def from_public(
strategy: Optional[ydb_topic_public_types.PublicAutoPartitioningStrategy],
) -> Optional["AutoPartitioningStrategy"]:
if strategy is None:
return None

return AutoPartitioningStrategy(strategy)

@staticmethod
def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]:
if code is None:
return None

return AutoPartitioningStrategy(code)

def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy:
try:
return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self))
except KeyError:
return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED


@dataclass
class AutoPartitioningSettings(IToProto, IFromProto, IFromPublic, IToPublic):
strategy: AutoPartitioningStrategy
partition_write_speed: AutoPartitioningWriteSpeedStrategy

@staticmethod
def from_public(
settings: Optional[ydb_topic_public_types.PublicAutoPartitioningSettings],
) -> Optional[AutoPartitioningSettings]:
if not settings:
return None

return AutoPartitioningSettings(
strategy=settings.strategy,
partition_write_speed=AutoPartitioningWriteSpeedStrategy(
stabilization_window=settings.stabilization_window,
up_utilization_percent=settings.up_utilization_percent,
down_utilization_percent=settings.down_utilization_percent,
),
)

@staticmethod
def from_proto(msg: ydb_topic_pb2.AutoPartitioningSettings) -> AutoPartitioningSettings:
if msg is None:
return None

return AutoPartitioningSettings(
strategy=AutoPartitioningStrategy.from_proto(msg.strategy),
partition_write_speed=AutoPartitioningWriteSpeedStrategy.from_proto(msg.partition_write_speed),
)

def to_proto(self) -> ydb_topic_pb2.AutoPartitioningSettings:
return ydb_topic_pb2.AutoPartitioningSettings(
strategy=self.strategy, partition_write_speed=self.partition_write_speed.to_proto()
)

def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningSettings:
return ydb_topic_public_types.PublicAutoPartitioningSettings(
strategy=self.strategy.to_public(),
stabilization_window=self.partition_write_speed.stabilization_window,
up_utilization_percent=self.partition_write_speed.up_utilization_percent,
down_utilization_percent=self.partition_write_speed.down_utilization_percent,
)


@dataclass
class AutoPartitioningWriteSpeedStrategy(IToProto, IFromProto):
stabilization_window: Optional[datetime.timedelta]
up_utilization_percent: Optional[int]
down_utilization_percent: Optional[int]

def to_proto(self):
return ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy(
stabilization_window=proto_duration_from_timedelta(self.stabilization_window),
up_utilization_percent=self.up_utilization_percent,
down_utilization_percent=self.down_utilization_percent,
)

@staticmethod
def from_proto(
msg: Optional[ydb_topic_pb2.AutoPartitioningWriteSpeedStrategy],
) -> Optional[AutoPartitioningWriteSpeedStrategy]:
if msg is None:
return None

return AutoPartitioningWriteSpeedStrategy(
stabilization_window=timedelta_from_proto_duration(msg.stabilization_window),
up_utilization_percent=msg.up_utilization_percent,
down_utilization_percent=msg.down_utilization_percent,
)


@dataclass
class AlterPartitioningSettings(IToProto):
set_min_active_partitions: Optional[int]
set_partition_count_limit: Optional[int]
set_max_active_partitions: Optional[int]
alter_auto_partitioning_settings: Optional[AlterAutoPartitioningSettings]

def to_proto(self) -> ydb_topic_pb2.AlterPartitioningSettings:
alter_auto_partitioning_settings = None
if self.alter_auto_partitioning_settings is not None:
alter_auto_partitioning_settings = self.alter_auto_partitioning_settings.to_proto()

return ydb_topic_pb2.AlterPartitioningSettings(
set_min_active_partitions=self.set_min_active_partitions,
set_partition_count_limit=self.set_partition_count_limit,
set_max_active_partitions=self.set_max_active_partitions,
alter_auto_partitioning_settings=alter_auto_partitioning_settings,
)


@dataclass
class AlterAutoPartitioningSettings(IToProto, IFromPublic):
set_strategy: Optional[AutoPartitioningStrategy]
set_partition_write_speed: Optional[AlterAutoPartitioningWriteSpeedStrategy]

@staticmethod
def from_public(
settings: Optional[ydb_topic_public_types.PublicAlterAutoPartitioningSettings],
) -> Optional[AlterAutoPartitioningSettings]:
if not settings:
return None

return AlterAutoPartitioningSettings(
set_strategy=settings.set_strategy,
set_partition_write_speed=AlterAutoPartitioningWriteSpeedStrategy(
set_stabilization_window=settings.set_stabilization_window,
set_up_utilization_percent=settings.set_up_utilization_percent,
set_down_utilization_percent=settings.set_down_utilization_percent,
),
)

def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningSettings:
set_partition_write_speed = None
if self.set_partition_write_speed:
set_partition_write_speed = self.set_partition_write_speed.to_proto()

return ydb_topic_pb2.AlterAutoPartitioningSettings(
set_strategy=self.set_strategy,
set_partition_write_speed=set_partition_write_speed,
)


@dataclass
class AlterAutoPartitioningWriteSpeedStrategy(IToProto):
set_stabilization_window: Optional[datetime.timedelta]
set_up_utilization_percent: Optional[int]
set_down_utilization_percent: Optional[int]

def to_proto(self) -> ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy:
return ydb_topic_pb2.AlterAutoPartitioningWriteSpeedStrategy(
set_stabilization_window=proto_duration_from_timedelta(self.set_stabilization_window),
set_up_utilization_percent=self.set_up_utilization_percent,
set_down_utilization_percent=self.set_down_utilization_percent,
)


Expand All @@ -993,7 +1183,7 @@ def from_proto(code: Optional[int]) -> Optional["MeteringMode"]:

def to_public(self) -> ydb_topic_public_types.PublicMeteringMode:
try:
ydb_topic_public_types.PublicMeteringMode(int(self))
return ydb_topic_public_types.PublicMeteringMode(int(self))
except KeyError:
return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED

Expand All @@ -1012,9 +1202,13 @@ class CreateTopicRequest(IToProto, IFromPublic):
metering_mode: "MeteringMode"

def to_proto(self) -> ydb_topic_pb2.CreateTopicRequest:
partitioning_settings = None
if self.partitioning_settings is not None:
partitioning_settings = self.partitioning_settings.to_proto()

return ydb_topic_pb2.CreateTopicRequest(
path=self.path,
partitioning_settings=self.partitioning_settings.to_proto(),
partitioning_settings=partitioning_settings,
retention_period=proto_duration_from_timedelta(self.retention_period),
retention_storage_mb=self.retention_storage_mb,
supported_codecs=self.supported_codecs.to_proto(),
Expand All @@ -1039,11 +1233,17 @@ def from_public(req: ydb_topic_public_types.CreateTopicRequestParams):
consumer = ydb_topic_public_types.PublicConsumer(name=consumer)
consumers.append(Consumer.from_public(consumer))

auto_partitioning_settings = None
if req.auto_partitioning_settings is not None:
auto_partitioning_settings = AutoPartitioningSettings.from_public(req.auto_partitioning_settings)

return CreateTopicRequest(
path=req.path,
partitioning_settings=PartitioningSettings(
min_active_partitions=req.min_active_partitions,
partition_count_limit=req.partition_count_limit,
max_active_partitions=req.max_active_partitions,
auto_partitioning_settings=auto_partitioning_settings,
),
retention_period=req.retention_period,
retention_storage_mb=req.retention_storage_mb,
Expand Down Expand Up @@ -1114,13 +1314,21 @@ def from_public(req: ydb_topic_public_types.AlterTopicRequestParams) -> AlterTop
consumer = ydb_topic_public_types.PublicAlterConsumer(name=consumer)
alter_consumers.append(AlterConsumer.from_public(consumer))

alter_auto_partitioning_settings = None
if req.alter_auto_partitioning_settings is not None:
alter_auto_partitioning_settings = AlterAutoPartitioningSettings.from_public(
req.alter_auto_partitioning_settings
)

drop_consumers = req.drop_consumers if req.drop_consumers else []

return AlterTopicRequest(
path=req.path,
alter_partitioning_settings=AlterPartitioningSettings(
set_min_active_partitions=req.set_min_active_partitions,
set_partition_count_limit=req.set_partition_count_limit,
set_max_active_partitions=req.set_max_active_partitions,
alter_auto_partitioning_settings=alter_auto_partitioning_settings,
),
add_consumers=add_consumers,
set_retention_period=req.set_retention_period,
Expand Down Expand Up @@ -1181,6 +1389,8 @@ def to_public(self) -> ydb_topic_public_types.PublicDescribeTopicResult:
return ydb_topic_public_types.PublicDescribeTopicResult(
self=scheme._wrap_scheme_entry(self.self_proto),
min_active_partitions=self.partitioning_settings.min_active_partitions,
max_active_partitions=self.partitioning_settings.max_active_partitions,
auto_partitioning_settings=self.partitioning_settings.auto_partitioning_settings.to_public(),
partition_count_limit=self.partitioning_settings.partition_count_limit,
partitions=list(map(DescribeTopicResult.PartitionInfo.to_public, self.partitions)),
retention_period=self.retention_period,
Expand Down
Loading
Loading