@@ -20,6 +20,34 @@ const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1);
20
20
// Error code from file ydb/public/api/protos/persqueue_error_codes_v1.proto
21
21
const ui64 WRITE_ERROR_PARTITION_INACTIVE = 500029 ;
22
22
23
+ namespace {
24
+
25
+ using TTxId = std::pair<std::string_view, std::string_view>;
26
+ using TTxIdOpt = std::optional<TTxId>;
27
+
28
+ TTxIdOpt GetTransactionId (const Ydb::Topic::StreamWriteMessage_WriteRequest& request)
29
+ {
30
+ Y_ABORT_UNLESS (request.messages_size ());
31
+
32
+ if (!request.has_tx ()) {
33
+ return std::nullopt;
34
+ }
35
+
36
+ const Ydb::Topic::TransactionIdentity& tx = request.tx ();
37
+ return TTxId (tx.session (), tx.id ());
38
+ }
39
+
40
+ TTxIdOpt GetTransactionId (const NTable::TTransaction* tx)
41
+ {
42
+ if (!tx) {
43
+ return std::nullopt;
44
+ }
45
+
46
+ return TTxId (tx->GetSession ().GetId (), tx->GetId ());
47
+ }
48
+
49
+ }
50
+
23
51
namespace NCompressionDetails {
24
52
THolder<IOutputStream> CreateCoder (ECodec codec, TBuffer& result, int quality);
25
53
}
@@ -1308,6 +1336,19 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
1308
1336
Processor->Write (std::move (clientMessage));
1309
1337
}
1310
1338
1339
+ bool TWriteSessionImpl::TxIsChanged (const Ydb::Topic::StreamWriteMessage_WriteRequest* writeRequest) const
1340
+ {
1341
+ Y_ABORT_UNLESS (writeRequest);
1342
+
1343
+ if (!writeRequest->messages_size ()) {
1344
+ return false ;
1345
+ }
1346
+
1347
+ Y_ABORT_UNLESS (!OriginalMessagesToSend.empty ());
1348
+
1349
+ return GetTransactionId (*writeRequest) != GetTransactionId (OriginalMessagesToSend.front ().Tx );
1350
+ }
1351
+
1311
1352
void TWriteSessionImpl::SendImpl () {
1312
1353
Y_ABORT_UNLESS (Lock.IsLocked ());
1313
1354
@@ -1324,6 +1365,9 @@ void TWriteSessionImpl::SendImpl() {
1324
1365
if (writeRequest->messages_size () > 0 && prevCodec != block.CodecID ) {
1325
1366
break ;
1326
1367
}
1368
+ if (TxIsChanged (writeRequest)) {
1369
+ break ;
1370
+ }
1327
1371
prevCodec = block.CodecID ;
1328
1372
writeRequest->set_codec (static_cast <i32 >(block.CodecID ));
1329
1373
Y_ABORT_UNLESS (block.MessageCount == 1 );
0 commit comments