Skip to content

Commit 5e2d0d3

Browse files
authored
test: update logs and test for debug purpose (#4309)
test: update logs and test for devug purpose
1 parent 84fa6bc commit 5e2d0d3

File tree

3 files changed

+16
-11
lines changed

3 files changed

+16
-11
lines changed

src/server/cluster/incoming_slot_migration.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,14 @@ class ClusterShardMigration {
8181
VLOG(1) << "Finalized flow " << source_shard_id_;
8282
return;
8383
}
84-
VLOG(2) << "Attempt failed to finalize flow " << source_shard_id_;
84+
if (!tx_data->command.cmd_args.empty()) {
85+
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
86+
<< tx_data->command.cmd_args[0];
87+
} else {
88+
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by opcode "
89+
<< (int)tx_data->opcode;
90+
}
91+
8592
bc_->Add(); // the flow isn't finished so we lock it again
8693
}
8794
if (tx_data->opcode == journal::Op::PING) {

src/server/journal/streamer.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ void JournalStreamer::AsyncWrite() {
103103
const auto& cur_buf = pending_buf_.PrepareSendingBuf();
104104

105105
in_flight_bytes_ = cur_buf.mem_size;
106-
total_sent_ += cur_buf.mem_size;
106+
total_sent_ += in_flight_bytes_;
107107

108108
const auto v_size = cur_buf.buf.size();
109109
absl::InlinedVector<iovec, 8> v(v_size);
@@ -113,7 +113,7 @@ void JournalStreamer::AsyncWrite() {
113113
v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size()));
114114
}
115115

116-
dest_->AsyncWrite(v.data(), v.size(), [this, len = cur_buf.mem_size](std::error_code ec) {
116+
dest_->AsyncWrite(v.data(), v.size(), [this, len = in_flight_bytes_](std::error_code ec) {
117117
OnCompletion(std::move(ec), len);
118118
});
119119
}
@@ -128,13 +128,11 @@ void JournalStreamer::Write(std::string str) {
128128
}
129129

130130
void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
131-
DCHECK_GE(in_flight_bytes_, len);
131+
DCHECK_EQ(in_flight_bytes_, len);
132132

133-
DVLOG(3) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len;
134-
in_flight_bytes_ -= len;
135-
if (in_flight_bytes_ == 0) {
136-
pending_buf_.Pop();
137-
}
133+
DVLOG(3) << "Completing " << in_flight_bytes_;
134+
in_flight_bytes_ = 0;
135+
pending_buf_.Pop();
138136
if (ec && !IsStopped()) {
139137
cntx_->ReportError(ec);
140138
} else if (!pending_buf_.Empty() && !IsStopped()) {

tests/dragonfly/cluster_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,7 @@ async def test_cluster_flushall_during_migration(
12561256
df_factory.create(
12571257
port=next(next_port),
12581258
admin_port=next(next_port),
1259-
vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9",
1259+
vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9,streamer=9",
12601260
logtostdout=True,
12611261
)
12621262
for i in range(2)
@@ -1507,7 +1507,7 @@ async def test_cluster_fuzzymigration(
15071507
df_factory.create(
15081508
port=next(next_port),
15091509
admin_port=next(next_port),
1510-
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
1510+
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
15111511
serialization_max_chunk_size=huge_values,
15121512
replication_stream_output_limit=10,
15131513
)

0 commit comments

Comments
 (0)