Skip to content

Commit 3f09012

Browse files
committed
fix: use static lib for mcap
1 parent 9499808 commit 3f09012

File tree

1 file changed

+387
-0
lines changed

1 file changed

+387
-0
lines changed
Lines changed: 387 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
/*
2+
* Copyright(c) 2025 ZettaScale Technology and others
3+
*
4+
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
5+
*/
6+
7+
#include <cassert>
8+
#include <cstdint>
9+
#include <cstring>
10+
#include <chrono>
11+
#include <iostream>
12+
#include <optional>
13+
#include <string>
14+
#include <thread>
15+
#include <unordered_map>
16+
#include <vector>
17+
18+
#include "dds/dds.h"
19+
#include "dds/ddsrt/iovec.h"
20+
#include "dds/ddsi/ddsi_serdata.h"
21+
22+
#include "mcap/reader.hpp"
23+
24+
#include "dds_topic_descriptor_serde.h"
25+
26+
namespace {
27+
28+
// CLI config
29+
enum class Mode { Fast, Realtime, Period };
30+
31+
struct Config {
32+
std::string mcapPath;
33+
Mode mode = Mode::Realtime; // default to realtime
34+
uint32_t periodMs = 0; // required when mode == Period
35+
bool loop = false;
36+
uint32_t waitSec = 5; // default 5
37+
};
38+
39+
void printUsage(const char* argv0) {
40+
std::cerr
41+
<< "Usage:\n"
42+
<< " " << argv0 << " <file.mcap> [--mode fast|realtime|period] [--period-ms N]\n"
43+
<< " [--loop] [--wait-sec N]\n"
44+
<< "\n"
45+
<< "Notes:\n"
46+
<< " - --topic filters by literal topic name; if omitted, all topics are replayed.\n"
47+
<< " - Period mode requires --period-ms N (milliseconds). It is only valid with --mode period.\n"
48+
<< " - Realtime mode uses global relative timing based on the first message logTime.\n"
49+
<< " - Fast mode publishes as quickly as possible (no sleeps).\n"
50+
<< " - --wait-sec defaults to 5 (wait before starting to publish).\n"
51+
<< " - Error out if channel messageEncoding != \"cdr\" or if embedded topic_descriptor metadata is missing.\n";
52+
}
53+
54+
bool parseArgs(int argc, char** argv, Config& cfg) {
55+
if (argc < 2) {
56+
printUsage(argv[0]);
57+
return false;
58+
}
59+
cfg.mcapPath = argv[1];
60+
61+
for (int i = 2; i < argc; ++i) {
62+
std::string arg(argv[i]);
63+
if (arg == "--mode" && i + 1 < argc) {
64+
std::string val(argv[++i]);
65+
if (val == "fast") cfg.mode = Mode::Fast;
66+
else if (val == "realtime") cfg.mode = Mode::Realtime;
67+
else if (val == "period") cfg.mode = Mode::Period;
68+
else {
69+
std::cerr << "Unknown mode: " << val << "\n";
70+
return false;
71+
}
72+
} else if (arg == "--period-ms" && i + 1 < argc) {
73+
cfg.periodMs = static_cast<uint32_t>(std::stoul(argv[++i]));
74+
} else if (arg == "--loop") {
75+
cfg.loop = true;
76+
} else if (arg == "--wait-sec" && i + 1 < argc) {
77+
cfg.waitSec = static_cast<uint32_t>(std::stoul(argv[++i]));
78+
} else {
79+
std::cerr << "Unknown or incomplete argument: " << arg << "\n";
80+
printUsage(argv[0]);
81+
return false;
82+
}
83+
}
84+
85+
if (cfg.mode == Mode::Period) {
86+
if (cfg.periodMs == 0) {
87+
std::cerr << "Error: --period-ms N is required and must be > 0 when --mode period\n";
88+
return false;
89+
}
90+
} else {
91+
if (cfg.periodMs != 0) {
92+
std::cerr << "Error: --period-ms is only valid with --mode period\n";
93+
return false;
94+
}
95+
}
96+
97+
return true;
98+
}
99+
100+
// Per-channel DDS context
101+
struct ChannelCtx {
102+
dds_entity_t topic = DDS_ENTITY_NIL;
103+
dds_entity_t writer = DDS_ENTITY_NIL;
104+
const struct ddsi_sertype* sertype = nullptr;
105+
};
106+
107+
// Build DDS writer(s) from MCAP channel metadata (topic_descriptor)
108+
bool buildWritersFromChannels(
109+
dds_entity_t participant,
110+
const std::unordered_map<mcap::ChannelId, mcap::ChannelPtr>& channels,
111+
const std::optional<std::string>& topicFilter,
112+
std::unordered_map<mcap::ChannelId, ChannelCtx>& out) {
113+
114+
out.clear();
115+
116+
// First, validate channels and select the subset we’ll replay
117+
std::vector<mcap::ChannelPtr> selected;
118+
selected.reserve(channels.size());
119+
for (const auto& kv : channels) {
120+
const auto& ch = kv.second;
121+
if (topicFilter.has_value()) {
122+
if (ch->topic != *topicFilter) continue;
123+
}
124+
selected.push_back(ch);
125+
}
126+
127+
if (topicFilter.has_value() && selected.empty()) {
128+
std::cerr << "Error: no channel found with topic '" << *topicFilter << "'\n";
129+
return false;
130+
}
131+
if (selected.empty()) {
132+
// Not a hard error if no channels exist at all; but nothing to replay
133+
std::cerr << "Error: MCAP file contains no channels to replay\n";
134+
return false;
135+
}
136+
137+
// Validate and build each selected channel
138+
for (const auto& ch : selected) {
139+
if (ch->messageEncoding != "cdr") {
140+
std::cerr << "Error: channel '" << ch->topic << "' has encoding '" << ch->messageEncoding
141+
<< "', expected 'cdr'\n";
142+
return false;
143+
}
144+
auto it = ch->metadata.find("topic_descriptor");
145+
if (it == ch->metadata.end()) {
146+
std::cerr << "Error: channel '" << ch->topic
147+
<< "' missing required metadata 'topic_descriptor'\n";
148+
return false;
149+
}
150+
151+
// The metadata value is a std::string that contains raw bytes of the serialized topic descriptor
152+
const std::string& descBytes = it->second;
153+
if (descBytes.empty()) {
154+
std::cerr << "Error: channel '" << ch->topic << "' has empty topic_descriptor\n";
155+
return false;
156+
}
157+
158+
// Deserialize topic descriptor
159+
dds_topic_descriptor_t* desc =
160+
dds_topic_descriptor_deserialize(descBytes.data(), descBytes.size());
161+
if (!desc) {
162+
std::cerr << "Error: failed to deserialize topic descriptor for topic '" << ch->topic << "'\n";
163+
return false;
164+
}
165+
166+
// Create topic & writer
167+
dds_entity_t topic = dds_create_topic(participant, desc, ch->topic.c_str(), nullptr, nullptr);
168+
dds_delete_topic_descriptor(desc);
169+
if (topic < 0) {
170+
std::cerr << "Error: dds_create_topic failed for topic '" << ch->topic
171+
<< "': " << dds_strretcode(topic) << "\n";
172+
return false;
173+
}
174+
175+
dds_entity_t writer = dds_create_writer(participant, topic, nullptr, nullptr);
176+
if (writer < 0) {
177+
std::cerr << "Error: dds_create_writer failed for topic '" << ch->topic
178+
<< "': " << dds_strretcode(writer) << "\n";
179+
dds_delete(topic);
180+
return false;
181+
}
182+
183+
const struct ddsi_sertype* st = nullptr;
184+
dds_return_t rc = dds_get_entity_sertype(writer, &st);
185+
if (rc != DDS_RETCODE_OK || st == nullptr) {
186+
std::cerr << "Error: dds_get_entity_sertype failed for topic '" << ch->topic
187+
<< "': " << dds_strretcode(rc) << "\n";
188+
dds_delete(writer);
189+
dds_delete(topic);
190+
return false;
191+
}
192+
193+
ChannelCtx ctx;
194+
ctx.topic = topic;
195+
ctx.writer = writer;
196+
ctx.sertype = st;
197+
198+
out.emplace(ch->id, std::move(ctx));
199+
}
200+
201+
return true;
202+
}
203+
204+
void sleepUntilSteady(std::chrono::steady_clock::time_point target) {
205+
using namespace std::chrono;
206+
while (true) {
207+
auto now = steady_clock::now();
208+
if (now >= target) break;
209+
auto remaining = target - now;
210+
211+
// Sleep in chunks to be resilient against spurious wakeups
212+
if (remaining > milliseconds(1)) {
213+
std::this_thread::sleep_for(milliseconds(1));
214+
} else {
215+
std::this_thread::sleep_for(remaining);
216+
}
217+
}
218+
}
219+
220+
bool publishCdr(dds_entity_t writer, const struct ddsi_sertype* sertype,
221+
const std::byte* data, size_t size) {
222+
// Create serdata from the raw CDR with encapsulation header included
223+
ddsrt_iovec_t iov{};
224+
iov.iov_base = const_cast<std::byte*>(data);
225+
iov.iov_len = static_cast<ddsrt_iov_len_t>(size);
226+
227+
struct ddsi_serdata* sd = ddsi_serdata_from_ser_iov(sertype, SDK_DATA, 1, &iov, size);
228+
if (!sd) {
229+
std::cerr << "Error: ddsi_serdata_from_ser_iov returned null\n";
230+
return false;
231+
}
232+
233+
dds_return_t rc = dds_writecdr(writer, sd);
234+
// The writer keeps/makes its own ref/copy; we now drop our ref
235+
ddsi_serdata_unref(sd);
236+
237+
if (rc != DDS_RETCODE_OK) {
238+
std::cerr << "Error: dds_writecdr failed: " << dds_strretcode(rc) << "\n";
239+
return false;
240+
}
241+
return true;
242+
}
243+
244+
bool replayOnce(mcap::McapReader& reader,
245+
const Config& cfg,
246+
const std::unordered_map<mcap::ChannelId, ChannelCtx>& channelMap) {
247+
using namespace std::chrono;
248+
249+
const auto onProblem = [](const mcap::Status& s) {
250+
std::cerr << "MCAP read warning/error: [" << static_cast<int>(s.code) << "] " << s.message << "\n";
251+
};
252+
253+
// iterate in global logTime order
254+
// Note: If the file lacks summary, the reader handles ordering as best as possible.
255+
auto messages = reader.readMessages(onProblem);
256+
257+
bool firstPublished = false;
258+
mcap::Timestamp firstLogTime = 0;
259+
steady_clock::time_point t0_steady;
260+
261+
for (const auto& view : messages) {
262+
const auto chId = view.channel->id;
263+
264+
// Filter to selected channels (by topic or all)
265+
auto it = channelMap.find(chId);
266+
if (it == channelMap.end())
267+
continue;
268+
269+
// Determine sleep based on mode
270+
if (cfg.mode == Mode::Realtime) {
271+
if (!firstPublished) {
272+
firstPublished = true;
273+
firstLogTime = view.message.logTime;
274+
t0_steady = steady_clock::now();
275+
} else {
276+
// relative to first logTime, global
277+
const uint64_t dt_ns = (view.message.logTime >= firstLogTime)
278+
? (view.message.logTime - firstLogTime)
279+
: 0;
280+
auto target = t0_steady + nanoseconds(dt_ns);
281+
sleepUntilSteady(target);
282+
}
283+
} else if (cfg.mode == Mode::Period) {
284+
if (!firstPublished) {
285+
firstPublished = true;
286+
} else {
287+
std::this_thread::sleep_for(milliseconds(cfg.periodMs));
288+
}
289+
} else {
290+
// Mode::Fast -> no sleep
291+
}
292+
293+
// Publish
294+
const auto& msg = view.message;
295+
const auto& ctx = it->second;
296+
if (!publishCdr(ctx.writer, ctx.sertype, msg.data, static_cast<size_t>(msg.dataSize))) {
297+
return false;
298+
}
299+
}
300+
301+
return true;
302+
}
303+
304+
} // namespace
305+
306+
int main(int argc, char** argv) {
307+
Config cfg;
308+
if (!parseArgs(argc, argv, cfg)) {
309+
return 2;
310+
}
311+
312+
// Open MCAP
313+
mcap::McapReader reader;
314+
auto st = reader.open(cfg.mcapPath);
315+
if (!st.ok()) {
316+
std::cerr << "Failed to open MCAP '" << cfg.mcapPath << "': " << st.message << "\n";
317+
return 2;
318+
}
319+
320+
// Parse summary (to collect channels, schemas etc.)
321+
st = reader.readSummary(mcap::ReadSummaryMethod::AllowFallbackFileScan);
322+
if (!st.ok()) {
323+
std::cerr << "Failed to read MCAP summary: " << st.message
324+
<< " (continuing; reader may still iterate messages)\n";
325+
// not fatal; but channels() might be incomplete until iteration progresses
326+
}
327+
328+
// Create participant (default domain, no QoS)
329+
dds_entity_t participant = dds_create_participant(DDS_DOMAIN_DEFAULT, nullptr, nullptr);
330+
if (participant < 0) {
331+
std::cerr << "dds_create_participant failed: " << dds_strretcode(participant) << "\n";
332+
reader.close();
333+
return 2;
334+
}
335+
336+
// Build writers from channels (+validate)
337+
std::unordered_map<mcap::ChannelId, ChannelCtx> channelMap;
338+
if (!buildWritersFromChannels(participant, reader.channels(), cfg.topicFilter, channelMap)) {
339+
dds_delete(participant);
340+
reader.close();
341+
return 2;
342+
}
343+
344+
// Wait before start to allow subscribers to match
345+
if (cfg.waitSec > 0) {
346+
std::cout << "Waiting " << cfg.waitSec << "s before start...\n";
347+
std::this_thread::sleep_for(std::chrono::seconds(cfg.waitSec));
348+
}
349+
350+
// Replay (possibly loop)
351+
bool ok = true;
352+
do {
353+
// Re-open for a fresh pass if looping (messages' data pointers are ephemeral)
354+
// For the first iteration we already opened the reader above.
355+
if (!ok || (&reader == nullptr)) {
356+
// unreachable; kept for clarity
357+
}
358+
359+
ok = replayOnce(reader, cfg, channelMap);
360+
if (!ok) break;
361+
362+
if (cfg.loop) {
363+
// Close and reopen to reset iteration state cleanly
364+
reader.close();
365+
auto rst = reader.open(cfg.mcapPath);
366+
if (!rst.ok()) {
367+
std::cerr << "Failed to reopen MCAP for loop: " << rst.message << "\n";
368+
ok = false;
369+
break;
370+
}
371+
rst = reader.readSummary(mcap::ReadSummaryMethod::AllowFallbackFileScan);
372+
if (!rst.ok()) {
373+
std::cerr << "Failed to re-read MCAP summary on loop: " << rst.message << "\n";
374+
}
375+
}
376+
} while (cfg.loop);
377+
378+
// Cleanup
379+
for (auto& kv : channelMap) {
380+
if (kv.second.writer != DDS_ENTITY_NIL) dds_delete(kv.second.writer);
381+
if (kv.second.topic != DDS_ENTITY_NIL) dds_delete(kv.second.topic);
382+
}
383+
dds_delete(participant);
384+
reader.close();
385+
386+
return ok ? 0 : 1;
387+
}

0 commit comments

Comments
 (0)