Skip to content

Topic metadata #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/topic/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
config = ydb.DriverConfig(endpoint=endpoint, database=database)
config.credentials = ydb.credentials_from_env_variables()
driver = ydb.aio.Driver(config)
await driver.wait(15)
await driver.wait(5, fail_fast=True)
return driver


Expand All @@ -25,7 +25,8 @@ async def create_topic(driver: ydb.aio.Driver, topic: str, consumer: str):
async def write_messages(driver: ydb.aio.Driver, topic: str):
    """Produce ten messages, tagging each one with its index via metadata_items."""
    async with driver.topic_client.writer(topic) as writer:
        for index in range(10):
            message = ydb.TopicWriterMessage(
                data=f"mess-{index}",
                metadata_items={"index": str(index)},
            )
            await writer.write(message)
            await asyncio.sleep(1)


Expand All @@ -38,6 +39,7 @@ async def read_messages(driver: ydb.aio.Driver, topic: str, consumer: str):
print(mess.seqno)
print(mess.created_at)
print(mess.data.decode())
print(mess.metadata_items)
reader.commit(mess)
except asyncio.TimeoutError:
return
Expand Down
25 changes: 25 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,31 @@ async def topic_with_messages(driver, topic_consumer, database):
return topic_path


@pytest.fixture()
@pytest.mark.asyncio()
async def topic_with_messages_with_metadata(driver, topic_consumer, database):
    """Create a fresh topic seeded with two messages that carry metadata.

    Returns the topic path. The metadata value is written once as ``str`` and
    once as ``bytes`` to exercise both accepted input types (readers always
    observe ``bytes`` values).
    """
    topic_path = database + "/test-topic-with-messages-with-metadata"

    # Drop any leftover topic from a previous run; absence is not an error.
    try:
        await driver.topic_client.drop_topic(topic_path)
    except issues.SchemeError:
        pass

    await driver.topic_client.create_topic(
        path=topic_path,
        consumers=[topic_consumer],
    )

    # Use the writer as a context manager so it is closed even when
    # write_with_ack raises — the bare writer/close pair leaked the
    # writer stream on failure.
    async with driver.topic_client.writer(
        topic_path, producer_id="fixture-producer-id", codec=ydb.TopicCodec.RAW
    ) as writer:
        await writer.write_with_ack(
            [
                ydb.TopicWriterMessage(data="123".encode(), metadata_items={"key": "value"}),
                ydb.TopicWriterMessage(data="456".encode(), metadata_items={"key": b"value"}),
            ]
        )
    return topic_path


@pytest.fixture()
@pytest.mark.asyncio()
async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO:
Expand Down
29 changes: 29 additions & 0 deletions tests/topics/test_topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ async def test_read_message(self, driver, topic_with_messages, topic_consumer):

await reader.close()

async def test_read_metadata(self, driver, topic_with_messages_with_metadata, topic_consumer):
    """Each of the two seeded messages must expose its metadata as {str: bytes}."""
    reader = driver.topic_client.reader(topic_with_messages_with_metadata, topic_consumer)

    expected = {"key": b"value"}

    for _ in range(2):
        await reader.wait_message()
        message = await reader.receive_message()

        assert message is not None
        assert message.metadata_items
        assert message.metadata_items == expected

    await reader.close()

async def test_read_and_commit_with_close_reader(self, driver, topic_with_messages, topic_consumer):
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
message = await reader.receive_message()
Expand Down Expand Up @@ -135,6 +150,20 @@ def test_read_message(self, driver_sync, topic_with_messages, topic_consumer):

reader.close()

def test_read_metadata(self, driver_sync, topic_with_messages_with_metadata, topic_consumer):
    """Sync counterpart: both seeded messages expose their metadata as {str: bytes}."""
    reader = driver_sync.topic_client.reader(topic_with_messages_with_metadata, topic_consumer)

    expected = {"key": b"value"}

    for _ in range(2):
        message = reader.receive_message()

        assert message is not None
        assert message.metadata_items
        assert message.metadata_items == expected

    reader.close()

def test_read_and_commit_with_close_reader(self, driver_sync, topic_with_messages, topic_consumer):
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
message = reader.receive_message()
Expand Down
10 changes: 10 additions & 0 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
await writer.close()

async def test_send_message_with_metadata(self, driver: ydb.aio.Driver, topic_path):
    """A message carrying metadata_items is accepted by the async writer."""
    topic_writer = driver.topic_client.writer(topic_path, producer_id="test")
    message = ydb.TopicWriterMessage(data=b"123", metadata_items={"key": "value"})
    await topic_writer.write(message)
    await topic_writer.close()

async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
async with driver.topic_client.writer(
topic_path,
Expand Down Expand Up @@ -136,6 +141,11 @@ def test_send_message(self, driver_sync: ydb.Driver, topic_path):
writer.write(ydb.TopicWriterMessage(data="123".encode()))
writer.close()

def test_send_message_with_metadata(self, driver_sync: ydb.Driver, topic_path):
    """A message carrying metadata_items is accepted by the sync writer."""
    topic_writer = driver_sync.topic_client.writer(topic_path, producer_id="test")
    message = ydb.TopicWriterMessage(data=b"123", metadata_items={"key": "value"})
    topic_writer.write(message)
    topic_writer.close()

def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
with driver_sync.topic_client.writer(
topic_path,
Expand Down
8 changes: 8 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class MessageData(IToProto):
data: bytes
uncompressed_size: int
partitioning: "StreamWriteMessage.PartitioningType"
metadata_items: Dict[str, bytes]

def to_proto(
self,
Expand All @@ -218,6 +219,10 @@ def to_proto(
proto.data = self.data
proto.uncompressed_size = self.uncompressed_size

for key, value in self.metadata_items.items():
item = ydb_topic_pb2.MetadataItem(key=key, value=value)
proto.metadata_items.append(item)

if self.partitioning is None:
pass
elif isinstance(self.partitioning, StreamWriteMessage.PartitioningPartitionID):
Expand Down Expand Up @@ -489,16 +494,19 @@ class MessageData(IFromProto):
data: bytes
uncompresed_size: int
message_group_id: str
metadata_items: Dict[str, bytes]

@staticmethod
def from_proto(
    msg: ydb_topic_pb2.StreamReadMessage.ReadResponse.MessageData,
) -> "StreamReadMessage.ReadResponse.MessageData":
    """Build the wrapper dataclass from its protobuf counterpart.

    Metadata arrives as a repeated list of key/value items and is exposed
    to callers as a plain ``dict`` of ``str`` keys to ``bytes`` values.
    """
    return StreamReadMessage.ReadResponse.MessageData(
        offset=msg.offset,
        seq_no=msg.seq_no,
        created_at=msg.created_at.ToDatetime(),
        data=msg.data,
        metadata_items={item.key: item.value for item in msg.metadata_items},
        # NB: the dataclass field spelling ("uncompresed") predates this
        # change and is kept for compatibility with existing callers.
        uncompresed_size=msg.uncompressed_size,
        message_group_id=msg.message_group_id,
    )
Expand Down
1 change: 1 addition & 0 deletions ydb/_topic_reader/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class PublicMessage(ICommittable, ISessionAlive):
written_at: datetime.datetime
producer_id: str
data: Union[bytes, Any] # set as original decompressed bytes or deserialized object if deserializer set in reader
metadata_items: Dict[str, bytes]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key of the dict can be raw bytes, not only a string.

_partition_session: PartitionSession
_commit_start_offset: int
_commit_end_offset: int
Expand Down
1 change: 1 addition & 0 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
written_at=server_batch.written_at,
producer_id=server_batch.producer_id,
data=message_data.data,
metadata_items=message_data.metadata_items,
_partition_session=partition_session,
_commit_start_offset=partition_session._next_message_start_commit_offset,
_commit_end_offset=message_data.offset + 1,
Expand Down
20 changes: 20 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def stub_message(id: int):
written_at=datetime.datetime(2023, 3, 18, 14, 15),
producer_id="",
data=bytes(),
metadata_items={},
_partition_session=stub_partition_session(),
_commit_start_offset=0,
_commit_end_offset=1,
Expand Down Expand Up @@ -207,6 +208,7 @@ def create_message(
written_at=datetime.datetime(2023, 2, 3, 14, 16),
producer_id="test-producer-id",
data=bytes(),
metadata_items={},
_partition_session=partition_session,
_commit_start_offset=partition_session._next_message_start_commit_offset + offset_delta - 1,
_commit_end_offset=partition_session._next_message_start_commit_offset + offset_delta,
Expand Down Expand Up @@ -250,6 +252,7 @@ def batch_size():
seq_no=message.seqno,
created_at=message.created_at,
data=message.data,
metadata_items={},
uncompresed_size=len(message.data),
message_group_id=message.message_group_id,
)
Expand Down Expand Up @@ -445,6 +448,7 @@ async def test_commit_ranges_for_received_messages(
written_at=datetime.datetime(2023, 3, 14, 15, 42),
producer_id="asd",
data=rb"123",
metadata_items={},
_partition_session=None,
_commit_start_offset=5,
_commit_end_offset=15,
Expand All @@ -468,6 +472,7 @@ async def test_commit_ranges_for_received_messages(
written_at=datetime.datetime(2023, 3, 14, 15, 42),
producer_id="asd",
data=gzip.compress(rb"123"),
metadata_items={},
_partition_session=None,
_commit_start_offset=5,
_commit_end_offset=15,
Expand All @@ -490,6 +495,7 @@ async def test_commit_ranges_for_received_messages(
offset=1,
written_at=datetime.datetime(2023, 3, 14, 15, 42),
producer_id="asd",
metadata_items={},
data=rb"123",
_partition_session=None,
_commit_start_offset=5,
Expand All @@ -504,6 +510,7 @@ async def test_commit_ranges_for_received_messages(
written_at=datetime.datetime(2023, 3, 14, 15, 42),
producer_id="asd",
data=rb"456",
metadata_items={},
_partition_session=None,
_commit_start_offset=5,
_commit_end_offset=15,
Expand All @@ -527,6 +534,7 @@ async def test_commit_ranges_for_received_messages(
written_at=datetime.datetime(2023, 3, 14, 15, 42),
producer_id="asd",
data=gzip.compress(rb"123"),
metadata_items={},
_partition_session=None,
_commit_start_offset=5,
_commit_end_offset=15,
Expand All @@ -540,6 +548,7 @@ async def test_commit_ranges_for_received_messages(
written_at=datetime.datetime(2023, 3, 14, 15, 42),
producer_id="asd",
data=gzip.compress(rb"456"),
metadata_items={},
_partition_session=None,
_commit_start_offset=5,
_commit_end_offset=15,
Expand Down Expand Up @@ -766,6 +775,7 @@ async def test_free_buffer_after_partition_stop(self, stream, stream_reader, par
seq_no=123,
created_at=t,
data=bytes(),
metadata_items={},
uncompresed_size=message_size,
message_group_id="test-message-group",
)
Expand Down Expand Up @@ -846,6 +856,7 @@ def reader_batch_count():
created_at=created_at,
data=data,
uncompresed_size=len(data),
metadata_items={},
message_group_id=message_group_id,
)
],
Expand Down Expand Up @@ -877,6 +888,7 @@ def reader_batch_count():
written_at=written_at,
producer_id=producer_id,
data=data,
metadata_items={},
_partition_session=partition_session,
_commit_start_offset=expected_message_offset,
_commit_end_offset=expected_message_offset + 1,
Expand Down Expand Up @@ -923,6 +935,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
seq_no=3,
created_at=created_at,
data=data,
metadata_items={},
uncompresed_size=len(data),
message_group_id=message_group_id,
)
Expand All @@ -944,6 +957,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
seq_no=2,
created_at=created_at2,
data=data,
metadata_items={},
uncompresed_size=len(data),
message_group_id=message_group_id,
)
Expand All @@ -960,6 +974,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
seq_no=3,
created_at=created_at3,
data=data2,
metadata_items={},
uncompresed_size=len(data2),
message_group_id=message_group_id,
),
Expand All @@ -968,6 +983,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
seq_no=5,
created_at=created_at4,
data=data,
metadata_items={},
uncompresed_size=len(data),
message_group_id=message_group_id2,
),
Expand Down Expand Up @@ -998,6 +1014,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
written_at=written_at,
producer_id=producer_id,
data=data,
metadata_items={},
_partition_session=partition_session,
_commit_start_offset=partition1_mess1_expected_offset,
_commit_end_offset=partition1_mess1_expected_offset + 1,
Expand All @@ -1018,6 +1035,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
written_at=written_at2,
producer_id=producer_id,
data=data,
metadata_items={},
_partition_session=second_partition_session,
_commit_start_offset=partition2_mess1_expected_offset,
_commit_end_offset=partition2_mess1_expected_offset + 1,
Expand All @@ -1038,6 +1056,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
written_at=written_at2,
producer_id=producer_id2,
data=data2,
metadata_items={},
_partition_session=second_partition_session,
_commit_start_offset=partition2_mess2_expected_offset,
_commit_end_offset=partition2_mess2_expected_offset + 1,
Expand All @@ -1051,6 +1070,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
written_at=written_at2,
producer_id=producer_id,
data=data,
metadata_items={},
_partition_session=second_partition_session,
_commit_start_offset=partition2_mess3_expected_offset,
_commit_end_offset=partition2_mess3_expected_offset + 1,
Expand Down
Loading
Loading