@@ -115,6 +115,8 @@ ABSL_FLAG(size_t, serialization_max_chunk_size, 64_KB,
115
115
" Maximum size of a value that may be serialized at once during snapshotting or full "
116
116
" sync. Values bigger than this threshold will be serialized using streaming "
117
117
" serialization. 0 - to disable streaming mode" );
118
+ ABSL_FLAG (uint32_t , max_squashed_cmd_num, 32 ,
119
+ " Max number of commands squashed in command squash optimizaiton" );
118
120
119
121
namespace dfly {
120
122
@@ -708,6 +710,11 @@ void SetSerializationMaxChunkSize(size_t val) {
708
710
shard_set->pool ()->AwaitBrief (cb);
709
711
}
710
712
713
+ void SetMaxSquashedCmdNum (int32_t val) {
714
+ auto cb = [val](unsigned , auto *) { ServerState::tlocal ()->max_squash_cmd_num = val; };
715
+ shard_set->pool ()->AwaitBrief (cb);
716
+ }
717
+
711
718
} // namespace
712
719
713
720
Service::Service (ProactorPool* pp)
@@ -787,6 +794,9 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
787
794
[val](unsigned tid, auto *) { facade::Connection::SetPipelineBufferLimit (tid, val); });
788
795
});
789
796
797
+ config_registry.RegisterSetter <uint32_t >(" max_squashed_cmd_num" ,
798
+ [](uint32_t val) { SetMaxSquashedCmdNum (val); });
799
+
790
800
config_registry.RegisterMutable (" replica_partial_sync" );
791
801
config_registry.RegisterMutable (" replication_timeout" );
792
802
config_registry.RegisterMutable (" migration_finalization_timeout_ms" );
@@ -857,6 +867,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
857
867
858
868
SetRssOomDenyRatioOnAllThreads (absl::GetFlag (FLAGS_rss_oom_deny_ratio));
859
869
SetSerializationMaxChunkSize (absl::GetFlag (FLAGS_serialization_max_chunk_size));
870
+ SetMaxSquashedCmdNum (absl::GetFlag (FLAGS_max_squashed_cmd_num));
860
871
861
872
// Requires that shard_set will be initialized before because server_family_.Init might
862
873
// load the snapshot.
@@ -1429,9 +1440,13 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
1429
1440
}
1430
1441
1431
1442
dfly_cntx->transaction = dist_trans.get ();
1443
+ MultiCommandSquasher::Opts opts;
1444
+ opts.verify_commands = true ;
1445
+ opts.max_squash_size = ss->max_squash_cmd_num ;
1446
+
1432
1447
size_t squashed_num = MultiCommandSquasher::Execute (absl::MakeSpan (stored_cmds),
1433
1448
static_cast <RedisReplyBuilder*>(builder),
1434
- dfly_cntx, this , {. verify_commands = true } );
1449
+ dfly_cntx, this , opts );
1435
1450
dfly_cntx->transaction = nullptr ;
1436
1451
1437
1452
dispatched += stored_cmds.size ();
@@ -1735,6 +1750,7 @@ optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionC
1735
1750
MultiCommandSquasher::Opts opts;
1736
1751
opts.verify_commands = true ;
1737
1752
opts.error_abort = true ;
1753
+ opts.max_squash_size = ServerState::tlocal ()->max_squash_cmd_num ;
1738
1754
MultiCommandSquasher::Execute (absl::MakeSpan (info->async_cmds ), &crb, cntx, this , opts);
1739
1755
1740
1756
info->async_cmds_heap_mem = 0 ;
@@ -2209,7 +2225,9 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
2209
2225
2210
2226
if (absl::GetFlag (FLAGS_multi_exec_squash) && state != ExecScriptUse::SCRIPT_RUN &&
2211
2227
!cntx->conn_state .tracking_info_ .IsTrackingOn ()) {
2212
- MultiCommandSquasher::Execute (absl::MakeSpan (exec_info.body ), rb, cntx, this , {});
2228
+ MultiCommandSquasher::Opts opts;
2229
+ opts.max_squash_size = ServerState::tlocal ()->max_squash_cmd_num ;
2230
+ MultiCommandSquasher::Execute (absl::MakeSpan (exec_info.body ), rb, cntx, this , opts);
2213
2231
} else {
2214
2232
CmdArgVec arg_vec;
2215
2233
for (auto & scmd : exec_info.body ) {
0 commit comments