Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions include/gz/transport/SubscriptionHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ namespace gz::transport
/// \param[in] _topic The topic.
public: void CreateGenericZenohSubscriber(
std::shared_ptr<zenoh::Session> _session,
const std::string &_topic);
const FullyQualifiedTopic &_fullyQualifiedTopic);
#endif
};

Expand Down Expand Up @@ -225,10 +225,10 @@ namespace gz::transport
/// \param[in] _topic The topic associated to this callback.
public: void SetCallback(const MsgCallback<T> &_cb,
std::shared_ptr<zenoh::Session> _session,
const std::string &_topic)
const FullyQualifiedTopic &_fullyQualifiedTopic)
{
this->SetCallback(std::move(_cb));
this->CreateGenericZenohSubscriber(_session, _topic);
this->CreateGenericZenohSubscriber(_session, _fullyQualifiedTopic);
}
#endif

Expand Down Expand Up @@ -356,10 +356,10 @@ namespace gz::transport
/// \param[in] _topic The topic associated to this callback.
public: void SetCallback(const MsgCallback<ProtoMsg> &_cb,
std::shared_ptr<zenoh::Session> _session,
const std::string &_topic)
const FullyQualifiedTopic &_fullyQualifiedTopic)
{
this->SetCallback(std::move(_cb));
this->CreateGenericZenohSubscriber(_session, _topic);
this->CreateGenericZenohSubscriber(_session, _fullyQualifiedTopic);
}
#endif

Expand Down
11 changes: 6 additions & 5 deletions include/gz/transport/detail/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ namespace gz::transport
std::string topic = _topic;
this->Options().TopicRemap(_topic, topic);

std::string fullyQualifiedTopic;
if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
this->Options().NameSpace(), topic, fullyQualifiedTopic))
FullyQualifiedTopic fullyQualifiedTopic(
this->Options().Partition(), this->Options().NameSpace(), topic);

if (!fullyQualifiedTopic.FullTopic())
{
std::cerr << "Topic [" << topic << "] is not valid." << std::endl;
return nullptr;
Expand Down Expand Up @@ -184,9 +185,9 @@ namespace gz::transport
// it will recover the subscription handler associated to the topic and
// will invoke the callback.
this->Shared()->localSubscribers.normal.AddHandler(
fullyQualifiedTopic, this->NodeUuid(), subscrHandlerPtr);
*fullyQualifiedTopic.FullTopic(), this->NodeUuid(), subscrHandlerPtr);

if (!this->SubscribeHelper(fullyQualifiedTopic))
if (!this->SubscribeHelper(*fullyQualifiedTopic.FullTopic()))
return nullptr;

return subscrHandlerPtr;
Expand Down
5 changes: 4 additions & 1 deletion src/Node_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,15 @@ void rawCbInfo(const char *_msgData, const size_t _size,

//////////////////////////////////////////////////
/// \brief A generic callback.
void genericCb(const transport::ProtoMsg &_msg)
void genericCb(const transport::ProtoMsg &_msg,
const transport::MessageInfo &_info)
{
std::string content;
ASSERT_TRUE(google::protobuf::TextFormat::PrintToString(_msg, &content));
EXPECT_TRUE(content.find(std::to_string(data)) != std::string::npos);
genericCbExecuted = true;

EXPECT_EQ(_info.Topic().find("@"), std::string::npos);
}

//////////////////////////////////////////////////
Expand Down
9 changes: 5 additions & 4 deletions src/SubscriptionHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ namespace gz::transport
/////////////////////////////////////////////////
void ISubscriptionHandler::CreateGenericZenohSubscriber(
std::shared_ptr<zenoh::Session> _session,
const std::string &_topic)
const FullyQualifiedTopic &_fullyQualifiedTopic)
{
MessageInfo msgInfo;
msgInfo.SetTopic(_topic);
msgInfo.SetTopic(_fullyQualifiedTopic.Topic());
msgInfo.SetType(this->TypeName());
auto dataHandler = [this, msgInfo](const zenoh::Sample &_sample)
{
Expand Down Expand Up @@ -186,10 +186,11 @@ namespace gz::transport

this->dataPtr->zSub = std::make_unique<zenoh::Subscriber<void>>(
_session->declare_subscriber(
_topic, dataHandler, zenoh::closures::none));
*_fullyQualifiedTopic.FullTopic(), dataHandler, zenoh::closures::none));

std::string token = TopicUtils::CreateLivelinessToken(
_topic, this->ProcUuid(), this->NodeUuid(), "MS", this->TypeName());
*_fullyQualifiedTopic.FullTopic(), this->ProcUuid(), this->NodeUuid(),
"MS", this->TypeName());

if (token.empty())
return;
Expand Down
8 changes: 6 additions & 2 deletions test/integration/twoProcsPubSub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@ void cbInfo(const msgs::Int32 &_msg,

//////////////////////////////////////////////////
/// \brief A generic callback.
void genericCb(const transport::ProtoMsg &/*_msg*/)
void genericCb(const transport::ProtoMsg &/*_msg*/,
const transport::MessageInfo &_info)
{
genericCbExecuted = true;
++counter;

EXPECT_EQ(_info.Topic().find("@"), std::string::npos);
}

//////////////////////////////////////////////////
Expand Down Expand Up @@ -210,9 +213,10 @@ TEST(twoProcPubSub, PubSubWrongTypesTwoRawSubscribers)
};

auto genericCb = [&](const char *, const size_t /*_size*/,
const transport::MessageInfo &)
const transport::MessageInfo &_info)
{
genericRawCbExecuted = true;
EXPECT_EQ(_info.Topic().find("@"), std::string::npos);
};

transport::Node node1;
Expand Down
Loading