29
29
#include < sys/syscall.h>
30
30
#include < tbox/base/defines.h>
31
31
#include < tbox/base/log.h>
32
+ #include < tbox/base/assert.h>
32
33
#include < tbox/util/fs.h>
33
34
#include < tbox/util/string.h>
34
35
#include < tbox/util/scalable_integer.h>
@@ -168,6 +169,7 @@ void Sink::commitRecord(const char *name, const char *module, uint32_t line, uin
168
169
void Sink::onBackendRecvData (const void *data, size_t size)
169
170
{
170
171
buffer_.append (data, size);
172
+ std::vector<uint8_t > write_cache;
171
173
172
174
while (buffer_.readableSize () > sizeof (RecordHeader)) {
173
175
RecordHeader header;
@@ -176,21 +178,36 @@ void Sink::onBackendRecvData(const void *data, size_t size)
176
178
// 而是要先copy到header再使用?
177
179
// ! A: 因为直接指针引用会有对齐的问题。
178
180
181
+ auto packet_size = header.name_size + header.module_size + sizeof (header);
179
182
// ! 如果长度不够,就跳过,等下一次
180
- if ((header.name_size + header.module_size + sizeof (header)) > buffer_.readableSize ())
183
+ if (packet_size > buffer_.readableSize ()) {
184
+ TBOX_ASSERT (packet_size < 1024 ); // ! 正常不应该超过1K的,如果真超过了就是Bug
181
185
break ;
186
+ }
182
187
183
188
buffer_.hasRead (sizeof (header));
184
189
185
190
const char *name = reinterpret_cast <const char *>(buffer_.readableBegin ());
186
191
const char *module = name + header.name_size ;
187
- onBackendRecvRecord (header, name, module );
192
+ onBackendRecvRecord (header, name, module , write_cache );
188
193
189
194
buffer_.hasRead (header.name_size + header.module_size );
190
195
}
196
+
197
+ if (!write_cache.empty ()) {
198
+ auto wsize = ::write (curr_record_fd_, write_cache.data (), write_cache.size ());
199
+ if (wsize != static_cast <ssize_t >(write_cache.size ())) {
200
+ LogErrno (errno, " write record file '%s' fail" , curr_record_filename_.c_str ());
201
+ return ;
202
+ }
203
+
204
+ total_write_size_ += wsize;
205
+ if (total_write_size_ >= record_file_max_size_)
206
+ CHECK_CLOSE_RESET_FD (curr_record_fd_);
207
+ }
191
208
}
192
209
193
- void Sink::onBackendRecvRecord (const RecordHeader &record, const char *name, const char *module )
210
+ void Sink::onBackendRecvRecord (const RecordHeader &record, const char *name, const char *module , std::vector< uint8_t > &write_cache )
194
211
{
195
212
if (!isFilterPassed (module ))
196
213
return ;
@@ -213,17 +230,10 @@ void Sink::onBackendRecvRecord(const RecordHeader &record, const char *name, con
213
230
data_size += util::DumpScalableInteger (name_index, (buffer + data_size), (kBufferSize - data_size));
214
231
data_size += util::DumpScalableInteger (module_index, (buffer + data_size), (kBufferSize - data_size));
215
232
216
- auto wsize = ::write (curr_record_fd_, buffer, data_size);
217
- if (wsize != static_cast <ssize_t >(data_size)) {
218
- LogErrno (errno, " write record file '%s' fail" , curr_record_filename_.c_str ());
219
- return ;
220
- }
221
-
222
233
last_timepoint_us_ = record.end_ts_us ;
223
- total_write_size_ += wsize;
224
234
225
- if (total_write_size_ >= record_file_max_size_)
226
- CHECK_CLOSE_RESET_FD (curr_record_fd_ );
235
+ std::back_insert_iterator<std::vector< uint8_t >> back_insert_iter (write_cache);
236
+ std::copy (buffer, buffer + data_size, back_insert_iter );
227
237
}
228
238
229
239
bool Sink::checkAndCreateRecordFile ()
0 commit comments