Skip to content

Commit 545def5

Browse files
committed
chore: Pipelining fixes
Addresses #4998. 1. Reduces aggressive yielding when reading multiple requests since it hampers pipeline efficiency. Now we yield consistently based on CPU time spent since the last resume point. 2. Increases socket read buffer size effectively allowing processing more requests in bulk. 3. Changes the sharding function for cluster mode to shard by slot id. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
1 parent a3aa588 commit 545def5

File tree

5 files changed

+226
-94
lines changed

5 files changed

+226
-94
lines changed

src/facade/dragonfly_connection.cc

+13-26
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <numeric>
1414
#include <variant>
1515

16+
#include "base/cycle_clock.h"
1617
#include "base/flags.h"
1718
#include "base/histogram.h"
1819
#include "base/io_buf.h"
@@ -24,6 +25,7 @@
2425
#include "facade/redis_parser.h"
2526
#include "facade/service_interface.h"
2627
#include "io/file.h"
28+
#include "util/fibers/fibers.h"
2729
#include "util/fibers/proactor_base.h"
2830

2931
#ifdef DFLY_USE_SSL
@@ -1136,19 +1138,6 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
11361138
// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
11371139
if (optimize_for_async || !can_dispatch_sync) {
11381140
SendAsync(cmd_msg_cb());
1139-
1140-
auto epoch = fb2::FiberSwitchEpoch();
1141-
1142-
if (async_fiber_epoch_ == epoch) {
1143-
// If we pushed too many items without context switching - yield
1144-
if (++async_streak_len_ >= 10 && !cc_->async_dispatch) {
1145-
async_streak_len_ = 0;
1146-
ThisFiber::Yield();
1147-
}
1148-
} else {
1149-
async_streak_len_ = 0;
1150-
async_fiber_epoch_ = epoch;
1151-
}
11521141
} else {
11531142
ShrinkPipelinePool(); // Gradually release pipeline request pool.
11541143
{
@@ -1393,6 +1382,8 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
13931382
auto* peer = socket_.get();
13941383
recv_buf_.res_len = 0;
13951384

1385+
const uint64_t kCyclesPerJiffy = base::CycleClock::Frequency() >> 16; // ~15usec.
1386+
13961387
do {
13971388
HandleMigrateRequest();
13981389
ec = HandleRecvSocket();
@@ -1410,6 +1401,10 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
14101401
parse_status = ParseMemcache();
14111402
}
14121403

1404+
if (reply_builder_->GetError()) {
1405+
return reply_builder_->GetError();
1406+
}
1407+
14131408
if (parse_status == NEED_MORE) {
14141409
parse_status = OK;
14151410

@@ -1429,33 +1424,25 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
14291424
[&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); });
14301425
}
14311426

1432-
// If we got a partial request and we couldn't parse the length, just
1433-
// double the capacity.
14341427
// If we got a partial request because iobuf was full, grow it up to
14351428
// a reasonable limit to save on Recv() calls.
1436-
if (io_buf_.AppendLen() < 64u || (is_iobuf_full && capacity < 4096)) {
1429+
if (is_iobuf_full && capacity < max_iobfuf_len / 2) {
14371430
// Last io used most of the io_buf to the end.
14381431
UpdateIoBufCapacity(io_buf_, stats_, [&]() {
14391432
io_buf_.Reserve(capacity * 2); // Valid growth range.
14401433
});
14411434
}
14421435

14431436
DCHECK_GT(io_buf_.AppendLen(), 0U);
1444-
} else if (io_buf_.AppendLen() == 0) {
1445-
// We have a full buffer and we can not progress with parsing.
1446-
// This means that we have request too large.
1447-
LOG(ERROR) << "Request is too large, closing connection";
1448-
parse_status = ERROR;
1449-
break;
14501437
}
14511438
} else if (parse_status != OK) {
14521439
break;
14531440
}
1454-
ec = reply_builder_->GetError();
1455-
} while (peer->IsOpen() && !ec);
14561441

1457-
if (ec)
1458-
return ec;
1442+
if (ThisFiber::GetRunningTimeCycles() > kCyclesPerJiffy) {
1443+
ThisFiber::Yield();
1444+
}
1445+
} while (peer->IsOpen());
14591446

14601447
return parse_status;
14611448
}

src/facade/dragonfly_connection.h

-4
Original file line numberDiff line numberDiff line change
@@ -455,10 +455,6 @@ class Connection : public util::Connection {
455455

456456
unsigned parser_error_ = 0;
457457

458-
// amount of times we enqued requests asynchronously during the same async_fiber_epoch_.
459-
unsigned async_streak_len_ = 0;
460-
uint64_t async_fiber_epoch_ = 0;
461-
462458
BreakerCb breaker_cb_;
463459

464460
// Used by redis parser to avoid allocations

src/server/server_family.cc

+14-4
Original file line numberDiff line numberDiff line change
@@ -1314,6 +1314,20 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
13141314
AppendMetricWithoutLabels("pipeline_commands_duration_seconds", "",
13151315
conn_stats.pipelined_cmd_latency * 1e-6, MetricType::COUNTER,
13161316
&resp->body());
1317+
1318+
AppendMetricWithoutLabels("cmd_squash_hop_total", "", m.coordinator_stats.multi_squash_executions,
1319+
MetricType::COUNTER, &resp->body());
1320+
1321+
AppendMetricWithoutLabels("cmd_squash_commands_total", "", m.coordinator_stats.squashed_commands,
1322+
MetricType::COUNTER, &resp->body());
1323+
1324+
AppendMetricWithoutLabels("cmd_squash_hop_duration_seconds", "",
1325+
m.coordinator_stats.multi_squash_exec_hop_usec * 1e-6,
1326+
MetricType::COUNTER, &resp->body());
1327+
AppendMetricWithoutLabels("cmd_squash_hop_reply_seconds", "",
1328+
m.coordinator_stats.multi_squash_exec_reply_usec * 1e-6,
1329+
MetricType::COUNTER, &resp->body());
1330+
13171331
AppendMetricWithoutLabels("commands_squashing_replies_bytes", "",
13181332
MultiCommandSquasher::GetRepliesMemSize(), MetricType::GAUGE,
13191333
&resp->body());
@@ -2486,7 +2500,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
24862500
append("total_commands_processed", conn_stats.command_cnt_main + conn_stats.command_cnt_other);
24872501
append("instantaneous_ops_per_sec", m.qps);
24882502
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
2489-
append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands);
24902503
append("pipeline_throttle_total", conn_stats.pipeline_throttle_count);
24912504
append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
24922505
append("total_net_input_bytes", conn_stats.io_read_bytes);
@@ -2628,9 +2641,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
26282641
append("eval_shardlocal_coordination_total",
26292642
m.coordinator_stats.eval_shardlocal_coordination_cnt);
26302643
append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes);
2631-
append("multi_squash_execution_total", m.coordinator_stats.multi_squash_executions);
2632-
append("multi_squash_execution_hop_usec", m.coordinator_stats.multi_squash_exec_hop_usec);
2633-
append("multi_squash_execution_reply_usec", m.coordinator_stats.multi_squash_exec_reply_usec);
26342644
};
26352645

26362646
auto add_repl_info = [&] {

tests/dragonfly/connection_test.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ async def measure(aw):
561561
e = async_client.pipeline(transaction=True)
562562
for _ in range(100):
563563
e.incr("num-1")
564-
assert await measure(e.execute()) == 2 # OK + Response
564+
assert await measure(e.execute()) == 1
565565

566566
# Just pipeline
567567
p = async_client.pipeline(transaction=False)

0 commit comments

Comments
 (0)