Skip to content

Commit d9cb943

Browse files
Merge pull request #18115 from a-serebryanskiy/ydb-topics-kafka-api-big-messages-hotfix-25-1
Fix reading big messages in Kafka Proxy (from #18079)
2 parents dd1e0fa + bb41562 commit d9cb943

File tree

4 files changed

+86
-69
lines changed

4 files changed

+86
-69
lines changed

ydb/core/kafka_proxy/kafka_connection.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
569569
responseHeader.Write(writable, headerVersion);
570570
reply->Write(writable, version);
571571

572-
Buffer.flush();
572+
ssize_t res = Buffer.flush();
573+
if (res < 0) {
574+
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")";
575+
}
573576

574577
KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
575578
} catch(const yexception& e) {

ydb/core/kafka_proxy/kafka_messages_int.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) {
2020
}
2121

2222
void TKafkaWritable::write(const char* val, size_t length) {
23-
Buffer.write(val, length);
23+
ssize_t res = Buffer.write(val, length);
24+
if (res < 0) {
25+
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")";
26+
}
2427
}
2528

2629
TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) {

ydb/core/kafka_proxy/ut/ut_serialization.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,8 @@ Y_UNIT_TEST(RequestHeader_reference) {
648648
0x65, 0x72, 0x2D, 0x31};
649649

650650
TWritableBuf sb(nullptr, BUFFER_SIZE);
651-
sb.write((char*)reference, sizeof(reference));
651+
ssize_t res = sb.write((char*)reference, sizeof(reference));
652+
UNIT_ASSERT_GE(res, 0);
652653

653654
TKafkaReadable readable(sb.GetBuffer());
654655
TRequestHeaderData result;

ydb/core/raw_socket/sock_impl.h

Lines changed: 76 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -121,62 +121,72 @@ class TBufferedWriter {
121121
public:
122122
TBufferedWriter(TSocketDescriptor* socket, size_t size)
123123
: Socket(socket)
124-
, Buffer(size)
125-
, BufferSize(size) {
126-
}
127-
128-
void write(const char* src, size_t length) {
129-
size_t possible = std::min(length, Buffer.Avail());
130-
if (possible > 0) {
131-
Buffer.Append(src, possible);
132-
}
133-
if (0 == Buffer.Avail() && Socket) {
134-
flush();
135-
}
136-
size_t left = length - possible;
137-
if (left >= BufferSize) {
138-
if (Chunks.empty()) {
139-
// optimization for reduce memory copy
140-
ssize_t res = Socket->Send(src + possible, left);
141-
if (res > 0) {
142-
left -= res;
143-
possible += res;
124+
, Buffer(size) {
125+
}
126+
127+
/**
128+
* Writes data to the socket buffer.
129+
*
130+
* This method writes the specified number of bytes from the source buffer to the internal buffer.
131+
* If the internal buffer becomes full, it flushes the buffer to the socket. The process repeats until all data is written.
132+
*
133+
* @param src A pointer to the source buffer containing the data to be written.
134+
* @param length The number of bytes to write from the source buffer.
135+
* @return The total number of bytes written to the socket. If an error occurs during writing, a negative value is returned.
136+
*/
137+
[[nodiscard]] ssize_t write(const char* src, size_t length) {
138+
size_t left = length;
139+
size_t offset = 0;
140+
ssize_t totalWritten = 0;
141+
do {
142+
if (Buffer.Avail() < left) { // time to flush
143+
// flush the remains from buffer, than write straight to socket if we have a lot data
144+
if (!Empty()) {
145+
ssize_t flushRes = flush();
146+
if (flushRes < 0) {
147+
// less than zero means error
148+
return flushRes;
149+
} else {
150+
totalWritten += flushRes;
151+
}
144152
}
145-
}
146-
if (left > 0) {
147-
Buffer.Reserve(left);
148-
Buffer.Append(src + possible, left);
149-
flush();
150-
}
151-
} else if (left > 0) {
152-
Buffer.Append(src + possible, left);
153-
}
154-
}
155-
156-
ssize_t flush() {
157-
if (!Buffer.Empty()) {
158-
Chunks.emplace_back(std::move(Buffer));
159-
Buffer.Reserve(BufferSize);
160-
}
161-
while(!Chunks.empty()) {
162-
auto& chunk = Chunks.front();
163-
ssize_t res = Socket->Send(chunk.Data(), chunk.Size());
164-
if (res > 0) {
165-
if (static_cast<size_t>(res) == chunk.Size()) {
166-
Chunks.pop_front();
153+
// if we have a lot data, skip copying it to buffer, just send ot straight to socket
154+
if (left > Buffer.Capacity()) {
155+
// we send only small batch to socket, cause we know for sure that it will be written to socket without error
156+
// there was a bug when we wrote to socket one big batch and OS closed the connection in case message was bigger than 6mb and SSL was enabled
157+
size_t bytesToSend = std::min(left, MAX_SOCKET_BATCH_SIZE);
158+
ssize_t sendRes = Send(src + offset, bytesToSend);
159+
if (sendRes <= 0) {
160+
// less than zero means error
161+
// exactly zero is also interpreted as error
162+
return sendRes;
163+
} else {
164+
left -= sendRes;
165+
offset += sendRes;
166+
totalWritten += sendRes;
167+
}
167168
} else {
168-
chunk.Shift(res);
169+
Buffer.Append(src + offset, left);
170+
left = 0;
169171
}
170-
} else if (-res == EINTR) {
171-
continue;
172-
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
173-
return 0;
174172
} else {
175-
return res;
173+
Buffer.Append(src + offset, left);
174+
left = 0;
176175
}
177-
}
176+
} while (left > 0);
177+
178+
return totalWritten;
179+
}
178180

179-
return 0;
181+
[[nodiscard]] ssize_t flush() {
182+
if (Empty()) {
183+
return 0;
184+
}
185+
ssize_t res = Send(Data(), Size());
186+
if (res > 0) {
187+
Buffer.Clear();
188+
}
189+
return res;
180190
}
181191

182192
const char* Data() {
@@ -192,29 +202,29 @@ class TBufferedWriter {
192202
}
193203

194204
bool Empty() {
195-
return Buffer.Empty() && Chunks.empty();
205+
return Buffer.Empty();
196206
}
197207

198208
private:
209+
static constexpr ui32 MAX_RETRY_ATTEMPTS = 3;
210+
static constexpr size_t MAX_SOCKET_BATCH_SIZE = 1_MB;
199211
TSocketDescriptor* Socket;
200212
TBuffer Buffer;
201-
size_t BufferSize;
202213

203-
struct Chunk {
204-
Chunk(TBuffer&& buffer)
205-
: Buffer(std::move(buffer))
206-
, Position(0) {
214+
ssize_t Send(const char* data, size_t length) {
215+
ui32 retryAttemtpts = MAX_RETRY_ATTEMPTS;
216+
while (true) {
217+
ssize_t res = Socket->Send(data, length);
218+
// retry
219+
if ((-res == EAGAIN || -res == EWOULDBLOCK || -res == EINTR) && retryAttemtpts--) {
220+
continue;
221+
}
222+
223+
return res;
207224
}
208-
209-
TBuffer Buffer;
210-
size_t Position;
211-
212-
const char* Data() { return Buffer.Data() + Position; }
213-
size_t Size() { return Buffer.Size() - Position; }
214-
void Shift(size_t size) { Position += size; }
215-
};
216-
std::deque<Chunk> Chunks;
217-
225+
226+
Y_UNREACHABLE();
227+
}
218228
};
219229

220230
} // namespace NKikimr::NRawSocket

0 commit comments

Comments
 (0)