@@ -115,10 +115,10 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
115
115
logger .info ("[{}] Message queue in-flight limit of {} reached. Putting the message into incoming " +
116
116
"waiting queue" , id , settings .getMaxSendBufferMessagesCount ());
117
117
}
118
- } else if (availableSizeBytes < message .getLength ()) {
118
+ } else if (availableSizeBytes < message .getSize ()) {
119
119
if (instant ) {
120
120
String errorMessage = "[" + id + "] Rejecting a message of " +
121
- message .getLength () +
121
+ message .getSize () +
122
122
" bytes: not enough space in message queue. Buffer currently has " + currentInFlightCount +
123
123
" messages with " + availableSizeBytes + " / " + settings .getMaxSendBufferMemorySize () +
124
124
" bytes available" ;
@@ -129,7 +129,7 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
129
129
} else {
130
130
logger .info ("[{}] Can't accept a message of {} bytes into message queue. Buffer currently has " +
131
131
"{} messages with {} / {} bytes available. Putting the message into incoming " +
132
- "waiting queue." , id , message .getLength (), currentInFlightCount ,
132
+ "waiting queue." , id , message .getSize (), currentInFlightCount ,
133
133
availableSizeBytes , settings .getMaxSendBufferMemorySize ());
134
134
}
135
135
} else if (incomingQueue .isEmpty ()) {
@@ -149,10 +149,10 @@ public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage message, boolean ins
149
149
private void acceptMessageIntoSendingQueue (EnqueuedMessage message ) {
150
150
this .lastAcceptedMessageFuture = message .getFuture ();
151
151
this .currentInFlightCount ++;
152
- this .availableSizeBytes -= message .getOriginLength ();
152
+ this .availableSizeBytes -= message .gitOriginSize ();
153
153
if (logger .isDebugEnabled ()) {
154
154
logger .debug ("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, " +
155
- "AvailableSizeBytes: {} ({} / {} acquired)" , id , message .getOriginLength (),
155
+ "AvailableSizeBytes: {} ({} / {} acquired)" , id , message .gitOriginSize (),
156
156
currentInFlightCount , availableSizeBytes , maxSendBufferMemorySize - availableSizeBytes ,
157
157
maxSendBufferMemorySize );
158
158
}
@@ -189,15 +189,14 @@ private void moveEncodedMessagesToSendingQueue() {
189
189
IOException error = msg .getCompressError ();
190
190
if (error != null ) { // just skip
191
191
logger .warn ("[{}] Message wasn't sent because of processing error" , id , error );
192
- free (1 , msg .getOriginLength ());
192
+ free (1 , msg .gitOriginSize ());
193
193
continue ;
194
194
}
195
195
196
- if (msg .getOriginLength () != msg .getLength ()) {
197
- logger .trace ("[{}] Message compressed from {} to {} bytes" , id , msg .getOriginLength (),
198
- msg .getLength ());
196
+ if (msg .gitOriginSize () != msg .getSize ()) {
197
+ logger .trace ("[{}] Message compressed from {} to {} bytes" , id , msg .gitOriginSize (), msg .getSize ());
199
198
// message was actually encoded. Need to free some bytes
200
- long bytesFreed = msg .getOriginLength () - msg .getLength ();
199
+ long bytesFreed = msg .gitOriginSize () - msg .getSize ();
201
200
// bytesFreed can be less than 0
202
201
free (0 , bytesFreed );
203
202
}
@@ -288,7 +287,7 @@ private void free(int messageCount, long sizeBytes) {
288
287
if (incomingMessage == null ) {
289
288
break ;
290
289
}
291
- if (incomingMessage .message .getOriginLength () > availableSizeBytes
290
+ if (incomingMessage .message .gitOriginSize () > availableSizeBytes
292
291
|| currentInFlightCount >= settings .getMaxSendBufferMessagesCount ()) {
293
292
logger .trace ("[{}] There are messages in incomingQueue still, but no space in send buffer" , id );
294
293
return ;
@@ -431,7 +430,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
431
430
}
432
431
if (sentMessage .getSeqNo () == ack .getSeqNo ()) {
433
432
inFlightFreed ++;
434
- bytesFreed += sentMessage .getLength ();
433
+ bytesFreed += sentMessage .getSize ();
435
434
sentMessages .remove ();
436
435
processWriteAck (sentMessage , ack );
437
436
break ;
@@ -443,7 +442,7 @@ private void onWriteResponse(YdbTopic.StreamWriteMessage.WriteResponse response)
443
442
sentMessage .getFuture ().completeExceptionally (
444
443
new RuntimeException ("Didn't get ack from server for this message" ));
445
444
inFlightFreed ++;
446
- bytesFreed += sentMessage .getLength ();
445
+ bytesFreed += sentMessage .getSize ();
447
446
sentMessages .remove ();
448
447
// Checking next message waiting for ack
449
448
} else {
0 commit comments