Skip to content

Commit 0f415ac

Browse files
authored
chore: StoredCmd to support both owned and external arguments (#5010)
Before: StoredCmd always copied the backing buffer of the commands. this of course sub-optimal if the bucking buffer exists during the life-time of StoredCmd. This is exactly the case in `Service::DispatchManyCommands`. This PR: 1. Adds support for both owned and non-owned arguments. 2. Improves the interfaces around StoredCmd and removes some code duplication. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
1 parent d7a7591 commit 0f415ac

File tree

5 files changed

+90
-84
lines changed

5 files changed

+90
-84
lines changed

src/server/conn_context.cc

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,51 +32,62 @@ static void SendSubscriptionChangedResponse(string_view action, std::optional<st
3232
rb->SendLong(count);
3333
}
3434

35-
StoredCmd::StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode)
36-
: cid_{cid}, buffer_{}, sizes_(args.size()), reply_mode_{mode} {
35+
StoredCmd::StoredCmd(const CommandId* cid, bool own_args, ArgSlice args)
36+
: cid_{cid}, args_{args}, reply_mode_{facade::ReplyMode::FULL} {
37+
if (!own_args)
38+
return;
39+
40+
auto& own_storage = args_.emplace<OwnStorage>(args.size());
3741
size_t total_size = 0;
3842
for (auto args : args) {
3943
total_size += args.size();
4044
}
41-
42-
buffer_.resize(total_size);
43-
char* next = buffer_.data();
45+
own_storage.buffer.resize(total_size);
46+
char* next = own_storage.buffer.data();
4447
for (unsigned i = 0; i < args.size(); i++) {
4548
if (args[i].size() > 0)
4649
memcpy(next, args[i].data(), args[i].size());
4750
next += args[i].size();
48-
sizes_[i] = args[i].size();
51+
own_storage.sizes[i] = args[i].size();
4952
}
5053
}
5154

5255
StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, ArgSlice args, facade::ReplyMode mode)
53-
: cid_{cid}, buffer_{std::move(buffer)}, sizes_(args.size()), reply_mode_{mode} {
56+
: cid_{cid}, args_{OwnStorage{args.size()}}, reply_mode_{mode} {
57+
OwnStorage& own_storage = std::get<OwnStorage>(args_);
58+
own_storage.buffer = std::move(buffer);
59+
5460
for (unsigned i = 0; i < args.size(); i++) {
5561
// Assume tightly packed list.
5662
DCHECK(i + 1 == args.size() || args[i].data() + args[i].size() == args[i + 1].data());
57-
sizes_[i] = args[i].size();
58-
}
59-
}
60-
61-
void StoredCmd::Fill(absl::Span<std::string_view> args) {
62-
DCHECK_GE(args.size(), sizes_.size());
63-
64-
unsigned offset = 0;
65-
for (unsigned i = 0; i < sizes_.size(); i++) {
66-
args[i] = MutableSlice{buffer_.data() + offset, sizes_[i]};
67-
offset += sizes_[i];
63+
own_storage.sizes[i] = args[i].size();
6864
}
6965
}
7066

71-
size_t StoredCmd::NumArgs() const {
72-
return sizes_.size();
67+
CmdArgList StoredCmd::ArgList(CmdArgVec* scratch) const {
68+
return std::visit(
69+
Overloaded{[&](const OwnStorage& s) {
70+
unsigned offset = 0;
71+
scratch->resize(s.sizes.size());
72+
for (unsigned i = 0; i < s.sizes.size(); i++) {
73+
(*scratch)[i] = string_view{s.buffer.data() + offset, s.sizes[i]};
74+
offset += s.sizes[i];
75+
}
76+
return CmdArgList{*scratch};
77+
},
78+
[&](const CmdArgList& s) { return s; }},
79+
args_);
7380
}
7481

7582
std::string StoredCmd::FirstArg() const {
76-
if (sizes_.size() == 0) {
83+
if (NumArgs() == 0) {
7784
return {};
7885
}
79-
return buffer_.substr(0, sizes_[0]);
86+
return std::visit(Overloaded{[&](const OwnStorage& s) { return s.buffer.substr(0, s.sizes[0]); },
87+
[&](const ArgSlice& s) {
88+
return std::string{s[0].data(), s[0].size()};
89+
}},
90+
args_);
8091
}
8192

8293
facade::ReplyMode StoredCmd::ReplyMode() const {
@@ -91,9 +102,14 @@ template <typename C> size_t IsStoredInlined(const C& c) {
91102
}
92103

93104
size_t StoredCmd::UsedMemory() const {
94-
size_t buffer_size = IsStoredInlined(buffer_) ? 0 : buffer_.size();
95-
size_t sz_size = IsStoredInlined(sizes_) ? 0 : sizes_.size() * sizeof(uint32_t);
96-
return buffer_size + sz_size;
105+
return std::visit(Overloaded{[&](const OwnStorage& s) {
106+
size_t buffer_size =
107+
IsStoredInlined(s.buffer) ? 0 : s.buffer.capacity();
108+
size_t sz_size = IsStoredInlined(s.sizes) ? 0 : s.sizes.memsize();
109+
return buffer_size + sz_size;
110+
},
111+
[&](const ArgSlice&) -> size_t { return 0U; }},
112+
args_);
97113
}
98114

99115
const CommandId* StoredCmd::Cid() const {

src/server/conn_context.h

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <absl/container/flat_hash_set.h>
99

1010
#include "acl/acl_commands_def.h"
11+
#include "core/overloaded.h"
1112
#include "facade/acl_commands_def.h"
1213
#include "facade/conn_context.h"
1314
#include "facade/reply_capture.h"
@@ -27,36 +28,37 @@ struct FlowInfo;
2728
// Used for storing MULTI/EXEC commands.
2829
class StoredCmd {
2930
public:
30-
StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode = facade::ReplyMode::FULL);
31+
StoredCmd(const CommandId* cid, bool own_args, CmdArgList args);
3132

3233
// Create on top of already filled tightly-packed buffer.
33-
StoredCmd(std::string&& buffer, const CommandId* cid, ArgSlice args,
34-
facade::ReplyMode mode = facade::ReplyMode::FULL);
34+
StoredCmd(std::string&& buffer, const CommandId* cid, CmdArgList args, facade::ReplyMode mode);
3535

36-
size_t NumArgs() const;
37-
38-
size_t UsedMemory() const;
39-
40-
// Fill the arg list with stored arguments, it should be at least of size NumArgs().
41-
// Between filling and invocation, cmd should NOT be moved.
42-
void Fill(absl::Span<std::string_view> args);
43-
44-
void Fill(CmdArgVec* dest) {
45-
dest->resize(sizes_.size());
46-
Fill(absl::MakeSpan(*dest));
36+
size_t NumArgs() const {
37+
return std::visit(Overloaded{//
38+
[](const OwnStorage& s) { return s.sizes.size(); },
39+
[](const CmdArgList& s) { return s.size(); }},
40+
args_);
4741
}
4842

43+
size_t UsedMemory() const;
44+
facade::CmdArgList ArgList(CmdArgVec* scratch) const;
4945
std::string FirstArg() const;
5046

5147
const CommandId* Cid() const;
5248

5349
facade::ReplyMode ReplyMode() const;
5450

5551
private:
56-
const CommandId* cid_; // underlying command
57-
std::string buffer_; // underlying buffer
58-
absl::FixedArray<uint32_t, 4> sizes_; // sizes of arg part
59-
facade::ReplyMode reply_mode_; // reply mode
52+
const CommandId* cid_; // underlying command
53+
struct OwnStorage {
54+
std::string buffer; // underlying buffer
55+
absl::FixedArray<uint32_t, 4> sizes; // sizes of arg part
56+
explicit OwnStorage(size_t sz) : sizes(sz) {
57+
}
58+
};
59+
60+
std::variant<OwnStorage, CmdArgList> args_; // args storage
61+
facade::ReplyMode reply_mode_; // reply mode
6062
};
6163

6264
struct ConnectionState {

src/server/main_service.cc

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -634,9 +634,8 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
634634
// We can only tell if eval is transactional based on they keycount
635635
if (absl::StartsWith(scmd.Cid()->name(), "EVAL")) {
636636
CmdArgVec arg_vec{};
637-
StoredCmd cmd = scmd;
638-
cmd.Fill(&arg_vec);
639-
auto keys = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec));
637+
auto args = scmd.ArgList(&arg_vec);
638+
auto keys = DetermineKeys(scmd.Cid(), args);
640639
transactional |= (keys && keys.value().NumArgs() > 0);
641640
} else {
642641
transactional |= scmd.Cid()->IsTransactional();
@@ -1200,9 +1199,8 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
12001199
bool is_trans_cmd = CO::IsTransKind(cid->name());
12011200
if (dfly_cntx->conn_state.exec_info.IsCollecting() && !is_trans_cmd) {
12021201
// TODO: protect against aggregating huge transactions.
1203-
StoredCmd stored_cmd{cid, args_no_cmd};
1204-
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));
1205-
if (stored_cmd.Cid()->IsWriteOnly()) {
1202+
dfly_cntx->conn_state.exec_info.body.emplace_back(cid, true, args_no_cmd);
1203+
if (cid->IsWriteOnly()) {
12061204
dfly_cntx->conn_state.exec_info.is_write = true;
12071205
}
12081206
return builder->SendSimpleString("QUEUED");
@@ -1412,11 +1410,14 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
14121410
DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning());
14131411
DCHECK_EQ(builder->GetProtocol(), Protocol::REDIS);
14141412

1413+
auto* ss = dfly::ServerState::tlocal();
1414+
// Don't even start when paused. We can only continue if DispatchTracker is aware of us running.
1415+
if (ss->IsPaused())
1416+
return 0;
1417+
14151418
vector<StoredCmd> stored_cmds;
14161419
intrusive_ptr<Transaction> dist_trans;
1417-
14181420
size_t dispatched = 0;
1419-
auto* ss = dfly::ServerState::tlocal();
14201421

14211422
auto perform_squash = [&] {
14221423
if (stored_cmds.empty())
@@ -1445,10 +1446,6 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
14451446
stored_cmds.clear();
14461447
};
14471448

1448-
// Don't even start when paused. We can only continue if DispatchTracker is aware of us running.
1449-
if (ss->IsPaused())
1450-
return 0;
1451-
14521449
for (auto args : args_list) {
14531450
string cmd = absl::AsciiStrToUpper(ArgS(args, 0));
14541451
const auto [cid, tail_args] = registry_.FindExtended(cmd, args.subspan(1));
@@ -1468,7 +1465,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
14681465

14691466
if (!is_multi && !is_eval && !is_blocking && cid != nullptr) {
14701467
stored_cmds.reserve(args_list.size());
1471-
stored_cmds.emplace_back(cid, tail_args);
1468+
stored_cmds.emplace_back(cid, false /* do not deep-copy commands*/, tail_args);
14721469
continue;
14731470
}
14741471

@@ -2103,19 +2100,18 @@ bool IsWatchingOtherDbs(DbIndex db_indx, const ConnectionState::ExecInfo& exec_i
21032100
[db_indx](const auto& pair) { return pair.first != db_indx; });
21042101
}
21052102

2106-
template <typename F> void IterateAllKeys(ConnectionState::ExecInfo* exec_info, F&& f) {
2103+
template <typename F> void IterateAllKeys(const ConnectionState::ExecInfo* exec_info, F&& f) {
21072104
for (auto& [dbid, key] : exec_info->watched_keys)
21082105
f(MutableSlice{key.data(), key.size()});
21092106

21102107
CmdArgVec arg_vec{};
21112108

2112-
for (auto& scmd : exec_info->body) {
2109+
for (const auto& scmd : exec_info->body) {
21132110
if (!scmd.Cid()->IsTransactional())
21142111
continue;
21152112

2116-
scmd.Fill(&arg_vec);
2117-
2118-
auto key_res = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec));
2113+
auto args = scmd.ArgList(&arg_vec);
2114+
auto key_res = DetermineKeys(scmd.Cid(), args);
21192115
if (!key_res.ok())
21202116
continue;
21212117

@@ -2217,15 +2213,12 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
22172213
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this, opts);
22182214
} else {
22192215
CmdArgVec arg_vec;
2220-
for (auto& scmd : exec_info.body) {
2216+
for (const auto& scmd : exec_info.body) {
22212217
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();
22222218

22232219
cntx->SwitchTxCmd(scmd.Cid());
22242220

2225-
arg_vec.resize(scmd.NumArgs());
2226-
scmd.Fill(&arg_vec);
2227-
2228-
CmdArgList args = absl::MakeSpan(arg_vec);
2221+
CmdArgList args = scmd.ArgList(&arg_vec);
22292222

22302223
if (scmd.Cid()->IsTransactional()) {
22312224
OpStatus st = cmd_cntx.tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args);

src/server/multi_command_squasher.cc

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar
9292
return sinfo;
9393
}
9494

95-
MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cmd) {
95+
MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredCmd* cmd) {
9696
DCHECK(cmd->Cid());
9797

9898
if (!cmd->Cid()->IsTransactional() || (cmd->Cid()->opt_mask() & CO::BLOCKING) ||
@@ -103,8 +103,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
103103
return SquashResult::NOT_SQUASHED;
104104
}
105105

106-
cmd->Fill(&tmp_keylist_);
107-
auto args = absl::MakeSpan(tmp_keylist_);
106+
auto args = cmd->ArgList(&tmp_keylist_);
108107
if (args.empty())
109108
return SquashResult::NOT_SQUASHED;
110109

@@ -136,11 +135,10 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
136135
return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED;
137136
}
138137

139-
bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd) {
138+
bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd) {
140139
DCHECK(order_.empty()); // check no squashed chain is interrupted
141140

142-
cmd->Fill(&tmp_keylist_);
143-
auto args = absl::MakeSpan(tmp_keylist_);
141+
auto args = cmd->ArgList(&tmp_keylist_);
144142

145143
if (opts_.verify_commands) {
146144
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
@@ -170,13 +168,11 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
170168
if (cntx_->conn()) {
171169
local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged();
172170
}
173-
absl::InlinedVector<MutableSlice, 4> arg_vec;
174171

175-
for (auto* cmd : sinfo.cmds) {
176-
arg_vec.resize(cmd->NumArgs());
177-
auto args = absl::MakeSpan(arg_vec);
178-
cmd->Fill(args);
172+
CmdArgVec arg_vec;
179173

174+
for (const auto* cmd : sinfo.cmds) {
175+
auto args = cmd->ArgList(&arg_vec);
180176
if (opts_.verify_commands) {
181177
// The shared context is used for state verification, the local one is only for replies
182178
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {

src/server/multi_command_squasher.h

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class MultiCommandSquasher {
2828
unsigned max_squash_size = 32; // How many commands to squash at once
2929
};
3030

31+
// Returns number of processed commands.
3132
static size_t Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
3233
ConnectionContext* cntx, Service* service, const Opts& opts) {
3334
return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb);
@@ -40,42 +41,40 @@ class MultiCommandSquasher {
4041
private:
4142
// Per-shard execution info.
4243
struct ShardExecInfo {
43-
ShardExecInfo() : cmds{}, replies{}, local_tx{nullptr} {
44+
ShardExecInfo() : local_tx{nullptr} {
4445
}
4546

46-
std::vector<StoredCmd*> cmds; // accumulated commands
47+
std::vector<const StoredCmd*> cmds; // accumulated commands
4748
std::vector<facade::CapturingReplyBuilder::Payload> replies;
4849
unsigned reply_id = 0;
4950
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard
5051
};
5152

5253
enum class SquashResult { SQUASHED, SQUASHED_FULL, NOT_SQUASHED, ERROR };
5354

54-
private:
5555
MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx, Service* Service,
5656
const Opts& opts);
5757

5858
// Lazy initialize shard info.
5959
ShardExecInfo& PrepareShardInfo(ShardId sid);
6060

6161
// Retrun squash flags
62-
SquashResult TrySquash(StoredCmd* cmd);
62+
SquashResult TrySquash(const StoredCmd* cmd);
6363

6464
// Execute separate non-squashed cmd. Return false if aborting on error.
65-
bool ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd);
65+
bool ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd);
6666

6767
// Callback that runs on shards during squashed hop.
6868
facade::OpStatus SquashedHopCb(EngineShard* es, facade::RespVersion resp_v);
6969

7070
// Execute all currently squashed commands. Return false if aborting on error.
7171
bool ExecuteSquashed(facade::RedisReplyBuilder* rb);
7272

73-
// Run all commands until completion. Returns number of squashed commands.
73+
// Run all commands until completion. Returns number of processed commands.
7474
size_t Run(facade::RedisReplyBuilder* rb);
7575

7676
bool IsAtomic() const;
7777

78-
private:
7978
absl::Span<StoredCmd> cmds_; // Input range of stored commands
8079
ConnectionContext* cntx_; // Underlying context
8180
Service* service_;

0 commit comments

Comments
 (0)