@@ -80,10 +80,26 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, c
80
80
return status.GetConsumerDescription ();
81
81
}
82
82
83
- TStatus TTopicSdkTestSetup::Commit (const TString& path, const TString& consumerName, size_t partitionId, size_t offset ) {
83
+ void TTopicSdkTestSetup::Write (const std::string& message, ui32 partitionId) {
84
84
TTopicClient client (MakeDriver ());
85
85
86
- return client.CommitOffset (path, partitionId, consumerName, offset).GetValueSync ();
86
+ TWriteSessionSettings settings;
87
+ settings.Path (TEST_TOPIC);
88
+ settings.PartitionId (partitionId);
89
+ settings.DeduplicationEnabled (false );
90
+ auto session = client.CreateSimpleBlockingWriteSession (settings);
91
+
92
+ TWriteMessage msg (TStringBuilder () << message);
93
+ UNIT_ASSERT (session->Write (std::move (msg)));
94
+
95
+ session->Close (TDuration::Seconds (5 ));
96
+ }
97
+
98
+ TStatus TTopicSdkTestSetup::Commit (const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId) {
99
+ TTopicClient client (MakeDriver ());
100
+
101
+ TCommitOffsetSettings commitSettings {.ReadSessionId_ = sessionId};
102
+ return client.CommitOffset (path, partitionId, consumerName, offset, commitSettings).GetValueSync ();
87
103
}
88
104
89
105
0 commit comments