diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py index 79b5976c..5498e832 100644 --- a/tests/topics/test_control_plane.py +++ b/tests/topics/test_control_plane.py @@ -34,6 +34,9 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer): has_consumer = False for consumer in res.consumers: + assert consumer.consumer_stats is not None + for stat in ["min_partitions_last_read_time", "max_read_time_lag", "max_write_time_lag", "bytes_read"]: + assert getattr(consumer.consumer_stats, stat, None) is not None if consumer.name == topic_consumer: has_consumer = True break diff --git a/ydb/_grpc/grpcwrapper/ydb_topic.py b/ydb/_grpc/grpcwrapper/ydb_topic.py index 600dfb69..c41eab27 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic.py @@ -907,10 +907,11 @@ def to_public(self) -> ydb_topic_public_types.PublicConsumer: read_from=self.read_from, supported_codecs=self.supported_codecs.to_public(), attributes=self.attributes, + consumer_stats=self.consumer_stats.to_public(), ) @dataclass - class ConsumerStats(IFromProto): + class ConsumerStats(IFromProto, IToPublic): min_partitions_last_read_time: datetime.datetime max_read_time_lag: datetime.timedelta max_write_time_lag: datetime.timedelta @@ -927,6 +928,14 @@ def from_proto( bytes_read=MultipleWindowsStat.from_proto(msg.bytes_read), ) + def to_public(self) -> ydb_topic_public_types.PublicConsumer.ConsumerStats: + return ydb_topic_public_types.PublicConsumer.ConsumerStats( + min_partitions_last_read_time=self.min_partitions_last_read_time, + max_read_time_lag=self.max_read_time_lag, + max_write_time_lag=self.max_write_time_lag, + bytes_read=self.bytes_read, + ) + @dataclass class AlterConsumer(IToProto, IFromPublic): diff --git a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py index 37528a2f..b07aa3fd 100644 --- a/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py +++ b/ydb/_grpc/grpcwrapper/ydb_topic_public_types.py @@ -117,6 +117,28 @@ class PublicConsumer: attributes: Dict[str, str] = field(default_factory=lambda: dict()) "Attributes of consumer" + consumer_stats: Optional["PublicConsumer.ConsumerStats"] = None + + @dataclass + class ConsumerStats: + min_partitions_last_read_time: datetime.datetime + "Minimal timestamp of last read from partitions." + + max_read_time_lag: datetime.timedelta + """ + Maximum of differences between timestamp of read and write timestamp for all messages, + read during last minute. + """ + + max_write_time_lag: datetime.timedelta + """ + Maximum of differences between write timestamp and create timestamp for all messages, + written during last minute. + """ + + bytes_read: "PublicMultipleWindowsStat" + "Bytes read statistics." + @dataclass class PublicAlterConsumer: