@@ -13,6 +13,7 @@
 #include <numeric>
 #include <variant>
 
+#include "base/cycle_clock.h"
 #include "base/flags.h"
 #include "base/histogram.h"
 #include "base/io_buf.h"
@@ -23,7 +24,9 @@
 #include "facade/memcache_parser.h"
 #include "facade/redis_parser.h"
 #include "facade/service_interface.h"
+#include "glog/logging.h"
 #include "io/file.h"
+#include "util/fibers/fibers.h"
 #include "util/fibers/proactor_base.h"
 
 #ifdef DFLY_USE_SSL
@@ -93,6 +96,10 @@ ABSL_FLAG(bool, migrate_connections, true,
           "they operate. Currently this is only supported for Lua script invocations, and can "
           "happen at most once per connection.");
 
+ABSL_FLAG(uint32_t, max_busy_read_usec, 100,
+          "Maximum time we read and parse from "
+          "a socket without yielding. In microseconds.");
+
 using namespace util;
 using namespace std;
 using absl::GetFlag;
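
Usage note (an assumption, not part of the diff): as a standard Abseil flag, the new knob should be settable at server startup, e.g.

  dragonfly --max_busy_read_usec=500

to widen the busy-read budget. The microsecond value is converted into clock cycles inside IoLoop() further down.
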
@@ -146,7 +153,7 @@ struct TrafficLogger {
 
 void TrafficLogger::ResetLocked() {
   if (log_file) {
-    log_file->Close();
+    std::ignore = log_file->Close();
     log_file.reset();
   }
 }
@@ -196,7 +203,7 @@ void OpenTrafficLogger(string_view base_path) {
 
   // Write version, incremental numbering :)
   uint8_t version[1] = {2};
-  tl_traffic_logger.log_file->Write(version);
+  std::ignore = tl_traffic_logger.log_file->Write(version);
 }
 
 void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp,
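
Both call sites above switch to the std::ignore idiom. Presumably the result type returned by Close()/Write() is (or became) [[nodiscard]], so discarding it must now be explicit. A minimal self-contained illustration of the idiom; the names here are made up for the example:

#include <tuple>  // std::ignore lives here

[[nodiscard]] int MightFail() {
  return -1;  // stand-in for an error code we choose not to inspect
}

void Caller() {
  std::ignore = MightFail();  // explicit discard; silences unused-result warnings
}
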
@@ -876,6 +883,7 @@ pair<string, string> Connection::GetClientInfoBeforeAfterTid() const {
   absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id));
   if (dispatch_q_.size()) {
     absl::StrAppend(&after, " pipeline=", dispatch_q_.size());
+    absl::StrAppend(&after, " pbuf=", pending_pipeline_bytes_);
   }
   absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_);
   string_view phase_name = PHASE_NAMES[phase_];
@@ -1028,7 +1036,7 @@ void Connection::ConnectionFlow() {
   if (io_buf_.InputLen() > 0) {
     phase_ = PROCESS;
     if (redis_parser_) {
-      parse_status = ParseRedis();
+      parse_status = ParseRedis(10000);
     } else {
       DCHECK(memcache_parser_);
       parse_status = ParseMemcache();
@@ -1136,19 +1144,6 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
   // Dispatch async if we're handling a pipeline or if we can't dispatch sync.
   if (optimize_for_async || !can_dispatch_sync) {
     SendAsync(cmd_msg_cb());
-
-    auto epoch = fb2::FiberSwitchEpoch();
-
-    if (async_fiber_epoch_ == epoch) {
-      // If we pushed too many items without context switching - yield
-      if (++async_streak_len_ >= 10 && !cc_->async_dispatch) {
-        async_streak_len_ = 0;
-        ThisFiber::Yield();
-      }
-    } else {
-      async_streak_len_ = 0;
-      async_fiber_epoch_ = epoch;
-    }
   } else {
     ShrinkPipelinePool();  // Gradually release pipeline request pool.
     {
@@ -1164,20 +1159,17 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
   }
 }
 
-Connection::ParserStatus Connection::ParseRedis() {
+Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
   uint32_t consumed = 0;
   RedisParser::Result result = RedisParser::OK;
 
-  // Re-use connection local resources to reduce allocations
-  RespVec& parse_args = tmp_parse_args_;
-  CmdArgVec& cmd_vec = tmp_cmd_vec_;
-
-  auto dispatch_sync = [this, &parse_args, &cmd_vec] {
-    RespExpr::VecToArgList(parse_args, &cmd_vec);
-    service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_.get(), cc_.get());
+  auto dispatch_sync = [this] {
+    RespExpr::VecToArgList(tmp_parse_args_, &tmp_cmd_vec_);
+    service_->DispatchCommand(absl::MakeSpan(tmp_cmd_vec_), reply_builder_.get(), cc_.get());
   };
-  auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle {
-    return {FromArgs(std::move(parse_args), tlh)};
+
+  auto dispatch_async = [this, tlh = mi_heap_get_backing()]() -> MessageHandle {
+    return {FromArgs(std::move(tmp_parse_args_), tlh)};
   };
 
   ReadBuffer read_buffer = GetReadBuffer();
@@ -1186,10 +1178,10 @@ Connection::ParserStatus Connection::ParseRedis() {
     if (read_buffer.ShouldAdvance()) {  // can happen only with io_uring/bundles
       read_buffer.slice = NextBundleBuffer(read_buffer.available_bytes);
     }
-    result = redis_parser_->Parse(read_buffer.slice, &consumed, &parse_args);
+    result = redis_parser_->Parse(read_buffer.slice, &consumed, &tmp_parse_args_);
     request_consumed_bytes_ += consumed;
-    if (result == RedisParser::OK && !parse_args.empty()) {
-      if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING)
+    if (result == RedisParser::OK && !tmp_parse_args_.empty()) {
+      if (RespExpr& first = tmp_parse_args_.front(); first.type == RespExpr::STRING)
         DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf());
 
       if (io_req_size_hist)
@@ -1198,12 +1190,20 @@ Connection::ParserStatus Connection::ParseRedis() {
      bool has_more = consumed < read_buffer.available_bytes;
 
      if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
-      LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
+      LogTraffic(id_, has_more, absl::MakeSpan(tmp_parse_args_),
+                 service_->GetContextInfo(cc_.get()));
      }
 
      DispatchSingle(has_more, dispatch_sync, dispatch_async);
    }
    read_buffer.Consume(consumed);
+
+    // We must yield from time to time to allow other fibers to run.
+    // Specifically, if a client sends a huge chunk of data resulting in a very long pipeline,
+    // we want to yield to allow AsyncFiber to actually execute on the pending pipeline.
+    if (ThisFiber::GetRunningTimeCycles() > max_busy_cycles) {
+      ThisFiber::Yield();
+    }
  } while (RedisParser::OK == result && read_buffer.available_bytes > 0 &&
           !reply_builder_->GetError());
 
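
The yield added above keeps one greedy connection from starving other fibers. Below is a self-contained sketch of the same time-budgeted pattern using only the standard library; steady_clock stands in for the cycle clock and std::this_thread::yield() for ThisFiber::Yield(), so all names are illustrative rather than Dragonfly APIs:

#include <chrono>
#include <thread>

// Process a batch of work items, yielding whenever we run longer than
// the budget without giving anyone else a chance.
void DrainWithBudget(int units, std::chrono::microseconds budget) {
  auto start = std::chrono::steady_clock::now();
  for (int i = 0; i < units; ++i) {
    // ... parse and dispatch one request here ...
    if (std::chrono::steady_clock::now() - start > budget) {
      std::this_thread::yield();                  // let other workers run
      start = std::chrono::steady_clock::now();   // restart the budget window
    }
  }
}
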
@@ -1390,6 +1390,9 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
   ParserStatus parse_status = OK;
 
   size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len);
+  unsigned max_busy_read_cycles =
+      (base::CycleClock::Frequency() * GetFlag(FLAGS_max_busy_read_usec)) / 1000000U;
+
   auto* peer = socket_.get();
   recv_buf_.res_len = 0;
 
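
The expression above converts the flag from microseconds into cycles of the clock that GetRunningTimeCycles() reads. A worked example of the arithmetic, assuming a hypothetical 3 GHz cycle counter:

#include <cstdint>

// cycles = frequency [cycles/sec] * usec / 1'000'000 [usec/sec].
uint64_t UsecToCycles(uint64_t freq_hz, uint64_t usec) {
  return freq_hz * usec / 1'000'000;
}

// With freq_hz = 3'000'000'000 (3 GHz), the default budget of 100 usec
// becomes UsecToCycles(3'000'000'000, 100) == 300'000 cycles.
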
@@ -1404,12 +1407,16 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
     bool is_iobuf_full = io_buf_.AppendLen() == 0;
 
     if (redis_parser_) {
-      parse_status = ParseRedis();
+      parse_status = ParseRedis(max_busy_read_cycles);
     } else {
       DCHECK(memcache_parser_);
       parse_status = ParseMemcache();
     }
 
+    if (reply_builder_->GetError()) {
+      return reply_builder_->GetError();
+    }
+
     if (parse_status == NEED_MORE) {
       parse_status = OK;
 
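
For the early return added above: IoLoop() returns variant<error_code, ParserStatus>, so returning the builder's error_code implicitly selects the error alternative. A minimal sketch of that mechanic, with illustrative names:

#include <system_error>
#include <variant>

enum ParserStatus { OK, NEED_MORE };

std::variant<std::error_code, ParserStatus> Step(bool fail) {
  if (fail)
    return std::make_error_code(std::errc::connection_reset);  // error alternative
  return OK;                                                   // status alternative
}
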
@@ -1429,33 +1436,21 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
                             [&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); });
         }
 
-        // If we got a partial request and we couldn't parse the length, just
-        // double the capacity.
         // If we got a partial request because iobuf was full, grow it up to
         // a reasonable limit to save on Recv() calls.
-        if (io_buf_.AppendLen() < 64u || (is_iobuf_full && capacity < 4096)) {
+        if (is_iobuf_full && capacity < max_iobfuf_len / 2) {
           // Last io used most of the io_buf to the end.
           UpdateIoBufCapacity(io_buf_, stats_, [&]() {
            io_buf_.Reserve(capacity * 2);  // Valid growth range.
          });
        }
 
        DCHECK_GT(io_buf_.AppendLen(), 0U);
-      } else if (io_buf_.AppendLen() == 0) {
-        // We have a full buffer and we can not progress with parsing.
-        // This means that we have request too large.
-        LOG(ERROR) << "Request is too large, closing connection";
-        parse_status = ERROR;
-        break;
      }
    } else if (parse_status != OK) {
      break;
    }
-    ec = reply_builder_->GetError();
-  } while (peer->IsOpen() && !ec);
-
-  if (ec)
-    return ec;
+  } while (peer->IsOpen());
 
   return parse_status;
 }
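
The reworked condition above is plain capped doubling: grow while reads keep filling the buffer, and stop once another doubling could overshoot the configured maximum. A standalone sketch of the policy (illustrative helper, not the actual code):

#include <cstddef>

// Doubling only while capacity < max_len / 2 guarantees that the result
// never exceeds max_len.
size_t NextCapacity(size_t capacity, size_t max_len, bool is_full) {
  return (is_full && capacity < max_len / 2) ? capacity * 2 : capacity;
}
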
@@ -1833,6 +1828,7 @@ void Connection::SendAsync(MessageHandle msg) {
   // Squashing is only applied to redis commands
   if (std::holds_alternative<PipelineMessagePtr>(msg.handle)) {
     pending_pipeline_cmd_cnt_++;
+    pending_pipeline_bytes_ += used_mem;
   }
 
   if (msg.IsControl()) {
@@ -1869,7 +1865,10 @@ void Connection::RecycleMessage(MessageHandle msg) {
 
   // Retain pipeline message in pool.
   if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
+    DCHECK_GE(pending_pipeline_bytes_, used_mem);
+    DCHECK_GE(pending_pipeline_cmd_cnt_, 1u);
     pending_pipeline_cmd_cnt_--;
+    pending_pipeline_bytes_ -= used_mem;
     if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
       stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
       pipeline_req_pool_.push_back(std::move(*pipe));
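
Taken together, the SendAsync() and RecycleMessage() hunks maintain a paired counter: bytes are added when a pipeline message is queued and subtracted when it is recycled, so pending_pipeline_bytes_ always reflects bytes currently in flight (this is the value the new pbuf field reports). A self-contained sketch of the invariant, with illustrative names:

#include <cassert>
#include <cstddef>

struct PipelineAccounting {
  size_t pending_bytes = 0;  // analog of pending_pipeline_bytes_
  size_t pending_cnt = 0;    // analog of pending_pipeline_cmd_cnt_

  void OnEnqueue(size_t used_mem) {  // mirrors the SendAsync() hunk
    ++pending_cnt;
    pending_bytes += used_mem;
  }

  void OnRecycle(size_t used_mem) {  // mirrors the RecycleMessage() hunk
    assert(pending_bytes >= used_mem);  // same invariants as the DCHECK_GEs
    assert(pending_cnt >= 1);
    --pending_cnt;
    pending_bytes -= used_mem;
  }
};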