-
Notifications
You must be signed in to change notification settings - Fork 413
Description
Hi,
in our system we use CycloneDDS implementation, commit hash - 16928eb.
Above the cyclone implementation we have our own cpp layer and above this we C# layer
In the system we ENV with around 100 computers and ~500 processes that uses DDS.
during the system run we create 20 partition - each partition has a round 20-25 processes
we experiencing the following issues:
- on system star - we have some application that responsible to launch all applications and have watchdog on them to restart them if they crush - when we start all our applications we have a huge network burst on all out applications of around 50-100 MB/s
- then on every time we create a new partition (notifying the relevant applications to subscribe to a specific partition - nothing was happening on this partition before), we have the same burst of network
- after we have 15-16 partitions, the network use stays high - around ~50MB/s and even more on every app that uses DDS
- after we new partition we load a scenario on the system - basically start using the system and sending DDS samples - in one of the application, the memory use keep rising up and up - in some cases it riches to 45 GB - sometimes it stays steady on 1-2 GB of RAM
we tried changing the domain id, we made sure we dispose Readers and Writers on partition change
this is our cyclonedds.xml
default 65500B 500kB severe config cyclonedds.log <Partitioning>
<NetworkPartitions>
<!-- Empty DCPS partition stays on Cyclone's default multicast -->
<NetworkPartition name="ex_empty" Address="239.255.0.1"/>
<!-- One multicast group per exercise partition, starting at 239.255.0.2 -->
<NetworkPartition name="ex1" Address="239.255.0.2"/>
<NetworkPartition name="ex2" Address="239.255.0.3"/>
<NetworkPartition name="ex3" Address="239.255.0.4"/>
<NetworkPartition name="ex4" Address="239.255.0.5"/>
<NetworkPartition name="ex5" Address="239.255.0.6"/>
<NetworkPartition name="ex6" Address="239.255.0.7"/>
<NetworkPartition name="ex7" Address="239.255.0.8"/>
<NetworkPartition name="ex8" Address="239.255.0.9"/>
<NetworkPartition name="ex9" Address="239.255.0.10"/>
<NetworkPartition name="ex10" Address="239.255.0.11"/>
<NetworkPartition name="ex11" Address="239.255.0.12"/>
<NetworkPartition name="ex12" Address="239.255.0.13"/>
<NetworkPartition name="ex13" Address="239.255.0.14"/>
<NetworkPartition name="ex14" Address="239.255.0.15"/>
<NetworkPartition name="ex15" Address="239.255.0.16"/>
<NetworkPartition name="ex16" Address="239.255.0.17"/>
<NetworkPartition name="ex17" Address="239.255.0.18"/>
<NetworkPartition name="ex18" Address="239.255.0.19"/>
<NetworkPartition name="ex19" Address="239.255.0.20"/>
<NetworkPartition name="ex20" Address="239.255.0.21"/>
<NetworkPartition name="ex21" Address="239.255.0.22"/>
<NetworkPartition name="ex22" Address="239.255.0.23"/>
<NetworkPartition name="ex23" Address="239.255.0.24"/>
<NetworkPartition name="ex24" Address="239.255.0.25"/>
<NetworkPartition name="ex25" Address="239.255.0.26"/>
</NetworkPartitions>
<PartitionMappings>
<!-- Specific partitions 1..25 -->
<PartitionMapping DCPSPartitionTopic="1.*" NetworkPartition="ex1"/>
<PartitionMapping DCPSPartitionTopic="2.*" NetworkPartition="ex2"/>
<PartitionMapping DCPSPartitionTopic="3.*" NetworkPartition="ex3"/>
<PartitionMapping DCPSPartitionTopic="4.*" NetworkPartition="ex4"/>
<PartitionMapping DCPSPartitionTopic="5.*" NetworkPartition="ex5"/>
<PartitionMapping DCPSPartitionTopic="6.*" NetworkPartition="ex6"/>
<PartitionMapping DCPSPartitionTopic="7.*" NetworkPartition="ex7"/>
<PartitionMapping DCPSPartitionTopic="8.*" NetworkPartition="ex8"/>
<PartitionMapping DCPSPartitionTopic="9.*" NetworkPartition="ex9"/>
<PartitionMapping DCPSPartitionTopic="10.*" NetworkPartition="ex10"/>
<PartitionMapping DCPSPartitionTopic="11.*" NetworkPartition="ex11"/>
<PartitionMapping DCPSPartitionTopic="12.*" NetworkPartition="ex12"/>
<PartitionMapping DCPSPartitionTopic="13.*" NetworkPartition="ex13"/>
<PartitionMapping DCPSPartitionTopic="14.*" NetworkPartition="ex14"/>
<PartitionMapping DCPSPartitionTopic="15.*" NetworkPartition="ex15"/>
<PartitionMapping DCPSPartitionTopic="16.*" NetworkPartition="ex16"/>
<PartitionMapping DCPSPartitionTopic="17.*" NetworkPartition="ex17"/>
<PartitionMapping DCPSPartitionTopic="18.*" NetworkPartition="ex18"/>
<PartitionMapping DCPSPartitionTopic="19.*" NetworkPartition="ex19"/>
<PartitionMapping DCPSPartitionTopic="20.*" NetworkPartition="ex20"/>
<PartitionMapping DCPSPartitionTopic="21.*" NetworkPartition="ex21"/>
<PartitionMapping DCPSPartitionTopic="22.*" NetworkPartition="ex22"/>
<PartitionMapping DCPSPartitionTopic="23.*" NetworkPartition="ex23"/>
<PartitionMapping DCPSPartitionTopic="24.*" NetworkPartition="ex24"/>
<PartitionMapping DCPSPartitionTopic="25.*" NetworkPartition="ex25"/>
<!-- Catch-all for EMPTY DCPS partition (must be last) -->
<PartitionMapping DCPSPartitionTopic=".*" NetworkPartition="ex_empty"/>
</PartitionMappings>
</Partitioning>
</Domain>
this is our TopicReade.cpp file
#include <stdio.h>
#include "TopicReader.h"
#include "DataListener.h"
#include <memory.h>
#include "dds/dds.h"
#include "DdsGuid.h"
namespace Dds::Base
{
TopicReader::TopicReader( Dds::Ptr<Participant> participant, TTopicSettings* topicSettings, DataListener* listener, const char* partitionName )
{
//printf("TopicReader()\n");
_topicSettings = topicSettings;
_participant = participant;
_topic = dds_create_topic( _participant->GetNativeHandle(), topicSettings->descriptor, topicSettings->name, NULL, NULL);
if( _topic < 0 )
DDS_FATAL("dds_create_topic %s: %s\n", topicSettings->name, dds_strretcode(-_topic));
dds_qos_t *qos = dds_create_qos ();
dds_qset_reliability( qos, topicSettings->reliability.kind, topicSettings->reliability.blocking_time );
dds_qset_durability( qos, topicSettings->durability.kind );
dds_qset_history( qos, topicSettings->history.kind, topicSettings->history.depth );
const char* finalPartitionName = partitionName;
if( !finalPartitionName ) finalPartitionName = participant->GetDefaultPartitionName();
if( finalPartitionName )
{
dds_qset_partition1( qos, finalPartitionName );
}
_reader = dds_create_reader( _participant->GetNativeHandle(), _topic, qos, NULL );
if (_reader < 0)
DDS_FATAL("dds_create_reader %s: %s\n", topicSettings->name, dds_strretcode(-_reader));
dds_delete_qos(qos);
SetListener( listener );
}
TopicReader::~TopicReader()
{
// dispose writer
// dispose topic...
printf("~TopicReader()\n");
// unregister potential listener
dds_set_listener( _reader, NULL );
// free the sample buffer
free( _samples );
// delete reader & topic handles
if( _reader >= 0 )
{
auto rc = dds_delete( _reader );
if (rc != DDS_RETCODE_OK)
DDS_FATAL("dds_delete(reader) %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
if( _topic >= 0 )
{
auto rc = dds_delete( _topic );
if (rc != DDS_RETCODE_OK)
DDS_FATAL("dds_delete(topic): %s %s\n", _topicSettings->name, dds_strretcode(-rc));
}
}
void TopicReader::ReturnLoan()
{
if( _numSamplesReturned == 0 )
return;
void* sample = _samples;
for( int i=0; i < _numSamplesReturned; i++ )
{
dds_sample_free( sample, _topicSettings->descriptor, DDS_FREE_CONTENTS );
sample = (void*)((uint8_t*)sample + _topicSettings->descriptor->m_size);
}
// zero the used part of the sample buffer to make it ready for next GetSamples()
memset( _samples, 0, _numSamplesReturned * _topicSettings->descriptor->m_size );
_numSamplesReturned = 0;
}
int TopicReader::ReserveSamples( int minCapacity )
{
if( minCapacity > _samplesCapacity ) // the buffer only grows
{
void* newBuf = realloc( _samples, minCapacity * _topicSettings->descriptor->m_size );
if( newBuf )
{
// zero the extra allocated memory to make it ready for the GetSamples()
memset(
(uint8_t*)newBuf + _samplesCapacity * _topicSettings->descriptor->m_size,
0,
(minCapacity - _samplesCapacity) * _topicSettings->descriptor->m_size
);
_samplesCapacity = minCapacity;
_samples = newBuf;
}
}
return _samplesCapacity;
}
dds_return_t TopicReader::GetSamples(
EAction action,
void** samples, // [out] preallocated array of sample pointers (contained pointers can be null, will be replaced with pointers to dds-allocated memory)
dds_sample_info_t* sampleInfos, // [out] preallocated array of SampleInfo structures
int maxSamples,
dds_instance_handle_t instanceHandle,
uint32_t sampleStateMask
)
{
// free previously taken samples (if any)
// and zero the used part of the sample buffer (mandatory - dds_read/take expects it !)
ReturnLoan();
// limit the request to the available memory we can get
maxSamples = std::min( maxSamples, ReserveSamples( maxSamples ) );
if ( maxSamples <= 0 )
return DDS_RETCODE_ERROR; // either bad parameter or failed to allocate buffer for samples
// return the pointers to our internal buffer
for(int i=0; i < maxSamples; i++ )
samples[i] = (void*)((uint8_t*)_samples + i*_topicSettings->descriptor->m_size);
// No need for zeroing here as the sample bufer is guaranteed to be already zeroed by ReturnLoan & ReserveSamples
//// zero the sample memory so that dds does not try to reuse any dynamic structures
//memset( _samples, 0, maxSamples * _topicSettings->descriptor->m_size );
if( sampleStateMask == 0 )
sampleStateMask = DDS_ANY_STATE;
dds_return_t rc;
switch( action )
{
case EAction::Read:
rc = dds_read_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, sampleStateMask );
break;
case EAction::Take:
rc = dds_take_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, sampleStateMask );
break;
case EAction::ReadInstance:
rc = dds_read_instance_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, instanceHandle, sampleStateMask );
break;
case EAction::TakeInstance:
rc = dds_take_instance_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, instanceHandle, sampleStateMask );
break;
default:
rc = DDS_RETCODE_UNSUPPORTED;
break;
}
if (rc < 0)
{
if ( rc != DDS_RETCODE_PRECONDITION_NOT_MET || action == EAction::Read || action == EAction::Take )
{
// read/take_instance before writer is matched results DDS_RETCODE_PRECONDITION_NOT_MET.
// However, this is a normal use case and the error stops once a writer is matched.
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds read/take", "dds read/take %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
}
else
{
_numSamplesReturned = rc;
//{
// for(int i=0; i < rc; i++ )
// {
// auto* info = sampleInfos + i;
// printf("[DDS READ] topic=%s, sender=%s\n",
// GetTopicSettings()->GetTopicName(),
// GetSenderGuid(info->publication_handle).c_str()
// );
// }
//}
}
return rc;
}
dds_instance_handle_t TopicReader::LookupInstance( void* sample )
{
return dds_lookup_instance( _reader, sample );
}
void TopicReader::SetListener( DataListener* listener )
{
if( listener ) listener->SetReader( this );
dds_listener_t* ddsList = listener ? listener->GetNativeHandle() : NULL;
dds_set_listener( _reader, ddsList );
}
int TopicReader::GetSenderGuid(dds_instance_handle_t publicationHandle, char* buf, int bufsize)
{
dds_builtintopic_endpoint_t* publData = dds_get_matched_publication_data(_reader, publicationHandle);
if (!publData)
{
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds_get_matched_publication_data", "dds_get_matched_publication_data could not be found from publication handle %0llx in reader of %s\n", publicationHandle, _topicSettings->name);
return 0;
}
dds_guid_t senderGuid = publData->participant_key;
int res = dds_guid_to_string(senderGuid, buf, bufsize );
dds_free(publData); publData = nullptr;
return res;
}
std::string TopicReader::GetSenderGuid( dds_instance_handle_t publicationHandle )
{
char guidStr[100] = {0};
GetSenderGuid( publicationHandle, guidStr, sizeof(guidStr) );
return guidStr;
}
dds_return_t TopicReader::WaitForSubscriptionMatched(int timeoutMsec)
{
auto ret = dds_set_status_mask(_reader, DDS_SUBSCRIPTION_MATCHED_STATUS);
if (ret != DDS_RETCODE_OK)
return ret;
const int quantumMsec = 20;
int timeRemaining = timeoutMsec;
while (true)
{
uint32_t status;
ret = dds_get_status_changes(_reader, &status);
if (ret != DDS_RETCODE_OK)
return ret;
if (status == DDS_SUBSCRIPTION_MATCHED_STATUS) {
break;
}
if (timeoutMsec >= 0) // do we wait for timeout?
{
if (timeRemaining <= 0)
return DDS_RETCODE_TIMEOUT;
}
dds_sleepfor(DDS_MSECS(quantumMsec));
if (timeoutMsec >= 0) // do we wait for timeout?
{
timeRemaining -= quantumMsec;
if (timeRemaining <= 0)
return DDS_RETCODE_TIMEOUT;
}
}
return DDS_RETCODE_OK;
}
}
TopicWriter.cpp
#include <stdio.h>
#include "TopicWriter.h"
#include "dds/dds.h"
#include "SenderId/ParticInfoSender.h"
namespace Dds::Base
{
TopicWriter::TopicWriter( Participant* participant, TTopicSettings* topicSettings, const char* partitionName )
{
//printf("TopicWriter()\n");
_topicSettings = topicSettings;
_participant = participant;
_topic = dds_create_topic( _participant->GetNativeHandle(), topicSettings->descriptor, topicSettings->name, NULL, NULL);
if( _topic < 0 )
DDS_FATAL("dds_create_topic %s: %s\n", topicSettings->name, dds_strretcode(-_topic));
dds_qos_t *qos = dds_create_qos ();
dds_qset_reliability( qos, topicSettings->reliability.kind, topicSettings->reliability.blocking_time );
dds_qset_durability( qos, topicSettings->durability.kind );
dds_qset_history( qos, topicSettings->history.kind, topicSettings->history.depth );
dds_qset_writer_data_lifecycle( qos, topicSettings->writer_data_lifecycle.autodispose );
const char* finalPartitionName = partitionName;
if( !finalPartitionName ) finalPartitionName = participant->GetDefaultPartitionName();
if( finalPartitionName )
{
dds_qset_partition1( qos, finalPartitionName );
}
_writer = dds_create_writer( _participant->GetNativeHandle(), _topic, qos, NULL );
if (_writer < 0)
DDS_FATAL("dds_create_writer %s: %s\n", topicSettings->name, dds_strretcode(-_writer));
dds_delete_qos(qos);
// publish the participant info
ParticInfoSender::GetInstance()->RegisterParticipant( _participant );
}
TopicWriter::~TopicWriter()
{
// dispose writer
// dispose topic...
printf("~TopicWriter()\n");
// unpublish the participant info
ParticInfoSender::GetInstance()->UnregisterParticipant(_participant);
if( _writer >= 0 )
{
auto rc = dds_delete( _writer );
if (rc != DDS_RETCODE_OK)
DDS_FATAL("dds_delete(writer) %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
if( _topic >= 0 )
{
auto rc = dds_delete( _topic );
if (rc != DDS_RETCODE_OK)
DDS_FATAL("dds_delete(topic) %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
}
int TopicWriter::Write( void* sample, bool dispose )
{
//{
// printf("[DDS WRITE] topic=%s, sender=%s\n",
// _topicSettings->GetTopicName(),
// _participant->GetGuid()
// );
//}
if( dispose )
{
auto rc = dds_writedispose( _writer, sample );
if( rc != DDS_RETCODE_OK )
{
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds writedispose", "dds writedispose %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
return rc;
}
else
{
auto rc = dds_write( _writer, sample );
if (rc != DDS_RETCODE_OK)
{
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds write", "dds write %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
return rc;
}
}
int TopicWriter::WriteWithTimeStamp( void* sample, dds_time_t timeStamp, bool dispose )
{
if( dispose )
{
auto rc = dds_writedispose_ts( _writer, sample, timeStamp );
if (rc != DDS_RETCODE_OK)
{
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds writedispose_ts", "dds writedispose_ts %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
return rc;
}
else
{
auto rc = dds_write_ts( _writer, sample, timeStamp );
if (rc != DDS_RETCODE_OK)
{
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds write_ts", "dds write_ts %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
}
return rc;
}
}
dds_instance_handle_t TopicWriter::LookupInstance( void* sample )
{
return dds_lookup_instance(_writer, sample);
}
dds_instance_handle_t TopicWriter::RegisterInstance( void* sample )
{
dds_instance_handle_t handle;
dds_return_t rc = dds_register_instance( _writer, &handle, sample );
if( rc == DDS_RETCODE_OK )
return handle;
return 0;
}
dds_return_t TopicWriter::UnregisterInstance( void* sample )
{
return dds_unregister_instance( _writer, sample );
}
dds_return_t TopicWriter::UnregisterInstance( dds_instance_handle_t instanceHandle )
{
return dds_unregister_instance_ih( _writer, instanceHandle );
}
dds_return_t TopicWriter::WaitForPublicationMatched( int timeoutMsec )
{
auto ret = dds_set_status_mask( _writer, DDS_PUBLICATION_MATCHED_STATUS);
if( ret != DDS_RETCODE_OK )
return ret;
const int quantumMsec = 20;
int timeRemaining = timeoutMsec;
while( true )
{
uint32_t status;
ret = dds_get_status_changes (_writer, &status);
if( ret != DDS_RETCODE_OK )
return ret;
if (status == DDS_PUBLICATION_MATCHED_STATUS) {
break;
}
if( timeoutMsec >= 0 ) // do we wait for timeout?
{
if( timeRemaining <= 0 )
return DDS_RETCODE_TIMEOUT;
}
dds_sleepfor (DDS_MSECS (quantumMsec));
if( timeoutMsec >= 0 ) // do we wait for timeout?
{
timeRemaining -= quantumMsec;
if( timeRemaining <= 0 )
return DDS_RETCODE_TIMEOUT;
}
}
return DDS_RETCODE_OK;
}
};
Participant.cpp
#include <stdio.h>
#include "Participant.h"
#include "dds/dds.h"
#include "DdsGuid.h"
#include <ostream>
#include <sstream>
namespace Dds
{
Participant::Participant(const char* partitionName, int appDomainId, int appNodeId)
{
printf("Participant()\n");
if (partitionName)
{
_partitionNameSet = true;
_partitionName = partitionName;
}
_participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
if (_participant < 0)
{
_error = -_participant;
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "Participant::ctor", "dds_create_participant: %s\n", dds_strretcode(-_participant));
}
// get the guid
dds_guid_t nativeGuid;
auto guid_rc = dds_get_guid(_participant, &nativeGuid);
if (guid_rc < 0)
{
_error = -_participant;
dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "Participant::ctor", "dds_get_guid: %s\n", dds_strretcode(-guid_rc));
}
char guidStr[100]; dds_guid_to_string(nativeGuid, guidStr, sizeof(guidStr));
_guid = guidStr;
_appDomainId = appDomainId;
_appNodeId = appNodeId;
}
Participant::~Participant()
{
printf("~Participant() guid=%s domainId=%d nodeId=%d\n", _guid.c_str(), _appDomainId, _appNodeId);
if (_participant == 0) return;
/* Deleting the participant will delete all its children recursively as well. */
auto rc = dds_delete(_participant);
if (rc != DDS_RETCODE_OK)
DDS_FATAL("dds_delete(participant): %s\n", dds_strretcode(-rc));
}
};
this is our API for the C# layer
using Communication.CustomException;
using Communication.CustomFilter;
using Dds;
using Dds.Base;
using Microsoft.Extensions.Logging;
using System;
namespace Communication
{
public class DDS : Disposable, ICommunicationLayer
{
private int _readingPause;
private bool _changeCPUClockResolution;
private TopicDataManager _topicDataManager;
private TopicReader _topicReader;
private TopicWriter _topicWriter;
private ILogger _logger;
private bool _isDisposing;
/// <summary>
/// WARNING: Not thread safe!!! Topic samples delivered in static reader thread. One thread for App.
/// </summary>
/// <param name="provideSenderInfo">If true, SenderInfo will be provided in OnTopicChange</param>
public DDS()
{
_readingPause = 15;
_topicDataManager = new TopicDataManager();
_topicReader = new TopicReader(_topicDataManager, _readingPause);
_topicWriter = new TopicWriter(_topicDataManager);
}
public DDS(ILogger logger) : this()
{
_logger = logger;
_topicReader.Logger = _logger;
_topicWriter.Logger = _logger;
}
public DDS(bool changeCPUClockResolution) : this()
{
_changeCPUClockResolution = changeCPUClockResolution;
if (_changeCPUClockResolution)
{
Win32API.SetCPUClockResolution(_logger);
_readingPause = 1;
_topicReader.UpdateReadingPause(_readingPause);
}
}
public DDS(bool changeCPUClockResolution, bool provideSenderInfo) : this(changeCPUClockResolution)
{
_topicReader.ProvideSenderInfo = provideSenderInfo;
}
public DDS(bool changeCPUClockResolution, bool provideSenderInfo, ILogger logger) : this(changeCPUClockResolution, provideSenderInfo)
{
_logger = logger;
_topicReader.Logger = _logger;
_topicWriter.Logger = _logger;
}
/// <summary>
/// Connect correct writer nad reader with given topic.
/// </summary>
/// <typeparam name="T">TopicBase</typeparam>
/// <typeparam name="W">TopicWriter</typeparam>
/// <typeparam name="R">TopicReader</typeparam>
/// <exception cref="TopicNotRegisteredException">Throws if Topic is not registered yet.</exception>
public void RegisterTopic<T, W, R, S>()
where T : TopicBase, new()
where W : Dds.Base.TopicWriter, new()
where R : Dds.Base.TopicReader, new()
where S : Dds.Base.TopicSettings, new()
{
_topicDataManager.RegisterHandlersForTopic<T, W, R, S>();
}
public void Subscribe<T>(Action<T, SampleInfo> onTopicChange, Func<T, bool> filter) where T : TopicBase
{
Subscribe(onTopicChange, string.Empty, filter);
}
/// <summary>
/// Subscribe an action, with specific filter on topic data update.
/// An action is subscribed just once.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="onTopicChange"></param>
/// <param name="partition"></param>
/// <param name="filter">Null if empty.</param>
/// <exception cref="TopicRegistrationException">Throws if TopicWriter, or TopicReader are incorrect.</exception>
public void Subscribe<T>(Action<T, SampleInfo> onTopicChange, IFilterHandler<T> filter, string partition = "") where T : TopicBase
{
Subscribe(typeof(T), onTopicChange, filter, partition);
}
public void Subscribe<T>(Action<T, SampleInfo> onTopicChange, string partition = "", Func<T, bool> filter = null) where T : TopicBase
{
Subscribe(onTopicChange, new GenericFilterHandler<T>(filter), partition);
}
/// <summary>
/// Subscribe an action, with specific filter on topic data update.
/// An action is subscribed just once.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topicType"></param>
/// <param name="onTopicChange"></param>
/// <param name="partition"></param>
/// <param name="filter">Null if empty.</param>
/// <exception cref="TopicRegistrationException">Throws if TopicWriter, or TopicReader are incorrect.</exception>
public void Subscribe<T>(Type topicType, Action<T, SampleInfo> onTopicChange, IFilterHandler<T> filter, string partition = "") where T : TopicBase
{
_topicReader.SubscribeTopic(topicType, (T sample, SampleInfo info, ParticInfo sender) => onTopicChange(sample, info), onTopicChange.Target, onTopicChange.Method.MetadataToken, partition, filter);
_logger?.LogDebug("Topic = {topicName}, method = {methodName} Subscribed", typeof(T).Name, nameof(onTopicChange));
}
public void Subscribe<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, IFilterHandler<T> filter, string partition = "") where T : TopicBase
{
_topicReader.SubscribeTopic(topicType, onTopicChange, onTopicChange.Target, onTopicChange.Method.MetadataToken, partition, filter);
_logger?.LogDebug("Topic = {topicName}, method = {methodName} Subscribed", typeof(T).Name, nameof(onTopicChange));
}
/// <summary>
/// Subscribe an action, with specific filter on topic data update.
/// An action is subscribed just once.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topicType"></param>
/// <param name="onTopicChange"></param>
/// <param name="partition"></param>
/// <param name="filter">Null if empty.</param>
/// <exception cref="TopicRegistrationException">Throws if TopicWriter, or TopicReader are incorrect.</exception>
public void Subscribe<T>(Type topicType, Action<T, SampleInfo> onTopicChange, string partition = "", Func<T, bool> filter = null) where T : TopicBase
{
Subscribe(topicType, onTopicChange, new GenericFilterHandler<T>(filter), partition);
}
/// <summary>
/// Remove subscription of an action on given topic.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="onTopicChange"></param>
/// <param name="partition"></param>
public void Unsubscribe<T>(Action<T, SampleInfo> onTopicChange, string partition = "") where T : TopicBase
{
Unsubscribe(typeof(T), onTopicChange, partition);
}
/// <summary>
/// Remove subscription of an action on given topic.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topicType"></param>
/// <param name="onTopicChange"></param>
/// <param name="partition"></param>
public void Unsubscribe<T>(Type topicType, Action<T, SampleInfo> onTopicChange, string partition = "") where T : TopicBase
{
_topicReader.UnsubscribeTopic(topicType, (T sample, SampleInfo info, ParticInfo sender) => onTopicChange(sample, info), onTopicChange.Target, onTopicChange.Method.MetadataToken, partition);
_logger?.LogDebug("Topic = {topicName}, method = {methodName} Unsubscribed", typeof(T).Name, nameof(onTopicChange));
}
public void Unsubscribe<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, string partition = "") where T : TopicBase
{
_topicReader.UnsubscribeTopic(topicType, onTopicChange, onTopicChange.Target, onTopicChange.Method.MetadataToken, partition);
_logger?.LogDebug("Topic = {topicName}, method = {methodName} Unsubscribed", typeof(T).Name, nameof(onTopicChange));
}
/// <summary>
/// Publish topic change.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="sample"></param>
/// <param name="partition"></param>
/// <exception cref="Exception">Thrown when return code is not Ok.</exception>
/// <exception cref="TopicNotRegisteredException">Throws when topic is not registered yet</exception>
public void Publish<T>(T sample, string partition = "") where T : TopicBase
{
Publish(sample.GetType(), sample, partition);
}
public void Publish(Type topicType, TopicBase sample, string partition = "")
{
ProcessPublish(topicType, sample, partition, false);
_logger?.LogTrace("Sample = {topicName}, partition = {partition} Published", topicType.Name, partition);
}
private void ProcessPublish(Type topicType, TopicBase sample, string partition, bool dispose)
{
if (_isDisposing)
throw new ObjectDisposedException("ICommunicationLayer; DDS");
var result = _topicWriter.PublishTopicChange(topicType, sample, dispose, partition);
if (result != ReturnCode.Ok)
{
string currentProcess = dispose ? "Dispose" : "Publish";
throw new Exception($"{currentProcess} of topic={sample.GetType().Name} failed with returnCode={result}");
}
}
/// <summary>
/// Dispose concrete topic sample.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="sample"></param>
/// <param name="partition"></param>
/// <exception cref="Exception">Throws if dispose failed</exception>
public void DisposeTopicSample<T>(T sample, string partition = "") where T : TopicBase
{
ProcessPublish(sample.GetType(), sample, partition, true);
_logger?.LogTrace("Sample = {topicName}, partition = {partition} Disposed", typeof(T).Name, partition);
}
/// <summary>
/// Dispose the internal dds objects (reading thread, participants...)
/// You need to call this explicitly when you no longer need to work with the DDS messages, latest at the end of the app.
/// WARNING: Not thread safe!!! DO NOT call from multiple tasks/threads at the same time!
/// </summary>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (!disposing)
{
return;
}
_isDisposing = disposing;
_topicReader?.Dispose();
_topicWriter?.Dispose();
_topicDataManager?.Dispose();
if (_changeCPUClockResolution)
{
Win32API.ResetCPUClockResolution();
}
_logger?.LogDebug($"DDS communication Disposed");
}
/// <summary>
/// Check if a type already subscribed with specific action
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="topicType"></param>
/// <param name="onTopicChange"></param>
/// <param name="partition"></param>
public bool IsTopicSubscribedToAction<T>(Type topicType, Action<T, SampleInfo> onTopicChange, string partition = "") where T : TopicBase
{
return _topicReader.IsTopicSubscribedToAction(topicType, (T sample, SampleInfo info, ParticInfo sender) => onTopicChange(sample, info), onTopicChange.Target, onTopicChange.Method.MetadataToken, partition);
}
public void DisposeAllWrites()
{
_topicWriter.ResetAllWriters();
}
}
}
and here are some of the classes that may be relevant
TopicReader.cs
using Communication.CustomException;
using Dds.Base;
using Dds;
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using System.Collections.Immutable;
namespace Communication
{
internal class TopicReader : Disposable
{
private TopicDataManager _topicDataManager;
internal ParticInfoMonitor _particInfoMonitor;
internal bool ProvideSenderInfo
{
set
{
if (value)
{
_particInfoMonitor = new ParticInfoMonitor();
}
}
}
private ILogger _logger;
internal ILogger Logger
{
set
{
_logger = value;
}
}
internal TopicReader(TopicDataManager topicDataManager, int readingPause)
{
_topicDataManager = topicDataManager;
// note: Max one thread runs per application, stopped on TopicDataManager disposal
_topicDataManager.StartTopicReaderThreadIfNotAlreadyRunning(readingPause, ReadTopics);
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_particInfoMonitor?.Dispose();
_particInfoMonitor = null;
}
}
/// <summary>
/// Polls incoming samples of all subscribed topics, for those passing the filter calls their action handlers
/// </summary>
protected void ReadTopics()
{
foreach (var topicCallbacks in _topicDataManager.GetActionDataCollection()) // iterate all subscriptions
{
ITopicHandler topicHandler = _topicDataManager.GetTopicHandlerOrDefault(topicCallbacks.Key);
if (topicHandler != null)
{
while(true) // read all msgs buffered in the dds reader
// Intentionally there is no limit on how many msgs to read per iteration.
// If we keep getting more messages than what we can process quickly enough,
// reading thread will freeze and no other msgs will be received,
// indicating the problem very clearly. Maybe better than if dying slowly,
// showing just increasing delays in msg reception.
{
var sample = topicHandler.Topic;
var sampleInfo = topicHandler.SampleInfo;
var result = topicHandler.Read(ref sample, ref sampleInfo);
if (result != ReturnCode.Ok)
break;
ParticInfo sender = null;
if (_particInfoMonitor != null)
{
string senderGuid = topicHandler.GetSenderGuid(sampleInfo.PublicationHandle);
sender = _particInfoMonitor.FindByGuid(senderGuid);
}
topicHandler.Topic = _topicDataManager.GetNewTopicInstance(topicHandler.TopicName);
foreach (var callbackData in topicCallbacks.Value)
{
if (callbackData.Filter(sample))
{
callbackData.OnTopicChange(sample, sampleInfo, sender);
}
}
}
}
}
}
internal void SubscribeTopic<T>(Type topicType, Action<T, SampleInfo, ParticInfo> OnTopicChange, object id1, int id2, string partition, IFilterHandler<T> filter) where T : TopicBase
{
var subscriptionKey = _topicDataManager.GetDataKey(topicType.FullName, partition);
InitializeTopicHandler(topicType, _topicDataManager.GetParticipant(partition), subscriptionKey);
SubscribeTopic(OnTopicChange, id1, id2, filter, subscriptionKey);
}
private void InitializeTopicHandler(Type topicType, Participant participant, string subscriptionKey)
{
if (TopicDataManager.SubscribedTopicHandlers.ContainsKey(subscriptionKey))
{
if (!TopicDataManager.TopicActionData.ContainsKey(subscriptionKey))
{
TopicDataManager.TopicActionData[subscriptionKey] = ImmutableArray.Create<IActionData>();
}
return;
}
if (_topicDataManager.TryGetTopicHandler(topicType, participant, out ITopicHandler topicHandler))
{
TopicDataManager.TopicActionData[subscriptionKey] = ImmutableArray.Create<IActionData>();
TopicDataManager.SubscribedTopicHandlers[subscriptionKey] = topicHandler;
TopicDataManager.ExplicitlyUnsubscribedKeys.TryRemove(subscriptionKey, out _);
}
else
throw new TopicNotRegisteredException($"TOPIC NOT REGISTERED - Subscribed topic={topicType.Name} has to be registered first.");
}
private void SubscribeTopic<T>(Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, IFilterHandler<T> filter, string subscriptionKey) where T : TopicBase
{
IActionData subscribeData = new ActionData<T>(onTopicChange, id1, id2, filter);
if (
TopicDataManager.TopicActionData.TryGetValue(subscriptionKey, out var actions) &&
!IsSubscribed(subscribeData, actions)
)
{
TopicDataManager.TopicActionData[subscriptionKey] = actions.Add(subscribeData);
}
}
private bool IsSubscribed(IActionData actionData, IEnumerable<IActionData> actions)
{
var isSubscribed = actions.Any(x => actionData.Equals(x));
return isSubscribed;
}
internal void UnsubscribeTopic<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, string partition) where T : TopicBase
{
var key = _topicDataManager.GetDataKey(topicType.FullName, partition);
if (TopicDataManager.TopicActionData.TryGetValue(key, out var actions))
{
TopicDataManager.TopicActionData[key] = UnsubscribeTopic(onTopicChange, id1, id2, key, actions);
if(TopicDataManager.TopicActionData[key].Count() == 0 )
{
if(TopicDataManager.TopicActionData.TryRemove(key, out _))
{
if(TopicDataManager.SubscribedTopicHandlers.TryRemove(key, out var topicHandler))
{
topicHandler.Dispose();
TopicDataManager.ExplicitlyUnsubscribedKeys.TryAdd(key, 0);
}
}
}
}
}
private ImmutableArray<IActionData> UnsubscribeTopic<T>(Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, string key, ImmutableArray<IActionData> actions) where T : TopicBase
{
IActionData actionData = new ActionData<T>(onTopicChange, id1, id2);
var subscriptionData = actions.Where(x => actionData.Equals(x)).FirstOrDefault();
if (subscriptionData != null)
{
return actions.Remove(subscriptionData);
}
return actions;
}
internal bool IsTopicSubscribedToAction<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, string partition) where T : TopicBase
{
var subscriptionKey = _topicDataManager.GetDataKey(topicType.FullName, partition);
IActionData subscribeData = new ActionData<T>(onTopicChange, id1, id2);
bool subscribed = false;
if (TopicDataManager.TopicActionData.TryGetValue(subscriptionKey, out var actions))
{
subscribed = IsSubscribed(subscribeData, actions);
}
return subscribed;
}
internal void UpdateReadingPause(int readingPause)
{
_topicDataManager.UpdateReadingPause(readingPause);
}
}
}
TopicDataManager.cs
using Dds;
using Dds.Base;
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Threading;
namespace Communication
{
/// <summary>
/// Data storage for information on subscribed/published topics, keyed by topicName+partitionName. No logic.
/// Data is static, shared shared by TopicReader & TopicWriter. Dispose() should be called in pair with the ctor
/// to disposed the shared static data when no one is using it anymore.
/// Maintains various info like
/// - instances od participant for each individual partition (one per partition)
/// - instances of internal readers and writers (one per topic type)
/// - actions to call on topic arrival and associated filter method
/// </summary>
public class TopicDataManager : Disposable
{
public static ConcurrentDictionary<string, ImmutableArray<IActionData>> TopicActionData => TopicData.TopicActionData;
public static ConcurrentDictionary<string, ITopicHandlerInitializer> TopicHandlerInitializers => TopicData.TopicHandlerInitializers;
public static ConcurrentDictionary<string, ITopicHandler> SubscribedTopicHandlers => TopicData.SubscribedTopicHandlers;
public static string CurrentParticipantPartition => _currentPartition;
public static Participant Participant; // single live participant
private static string _currentPartition = ""; // last partition used for _participant
public static ConcurrentDictionary<string, byte> ExplicitlyUnsubscribedKeys = new ConcurrentDictionary<string, byte>();
public TopicDataManager()
{
TopicData.AddRef();
// We found a bug that in some cases the first volative message sent is not being received correctly.
// The problem seems to be fixed by creating a participant (any partition) a few ms before actually publishing the message.
GetParticipant(string.Empty);
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (!disposing) return;
// note: TopicData is shared between TopicReader & TopicWriter, we never know who is still using it => disposes once refcnt drops to zero
TopicData.RemoveRef();
}
internal void RegisterHandlersForTopic<T, W, R, S>()
where T : TopicBase, new()
where W : Dds.Base.TopicWriter, new()
where R : Dds.Base.TopicReader, new()
where S : Dds.Base.TopicSettings, new()
{
var key = GetDataKey(typeof(T).FullName, string.Empty);
if (!TopicData.TopicHandlerInitializers.ContainsKey(key))
{
var topicBaseType = typeof(T).Name;
var topicWriter = typeof(W).Name;
var topicReader = typeof(R).Name;
var topicSettings = typeof(S).Name;
CheckCorrectTypes(topicBaseType, topicWriter, topicReader, topicSettings);
TopicData.TopicHandlerInitializers[key] = new TopicHandlerInitializer<T, W, R, S>();
}
}
public string GetDataKey<T>(string partition) where T : TopicBase
{
return GetDataKey(typeof(T).FullName, partition);
}
public string GetDataKey(string topicName, string partition)
{
if (string.IsNullOrEmpty(partition))
return $"{topicName}";
else
return $"{topicName}_{partition}";
}
private void CheckCorrectTypes(string topicName, string writerName, string readerName, string settingsName)
{
string exceptionMessage = string.Empty;
bool correctWriterType = writerName.StartsWith(topicName);
if (!correctWriterType)
exceptionMessage = $"Incorrect TopicWriter type={writerName}, for topic={topicName}";
bool correctReaderType = readerName.StartsWith(topicName);
if (!correctReaderType)
exceptionMessage = $"{exceptionMessage}\n Incorrect TopicReader type={readerName}, for topic={topicName}";
bool correctSettingsType = settingsName.StartsWith(topicName);
if (!correctSettingsType)
exceptionMessage = $"{exceptionMessage}\n Incorrect TopicSettings type={settingsName}, for topic={topicName}";
if (!correctWriterType || !correctReaderType || !correctSettingsType)
throw new Exception(exceptionMessage);
}
internal ITopicHandler GetTopicHandlerOrDefault<T>(T sample, string partition) where T : TopicBase
{
var key = GetDataKey(sample.GetType().FullName, partition);
return GetTopicHandlerOrDefault(key);
}
internal ITopicHandler GetTopicHandlerOrDefault(string TopicHandlerKey)
{
if (TopicData.SubscribedTopicHandlers.TryGetValue(TopicHandlerKey, out var handler))
return handler;
return null;
}
// Disposes ONLY handlers that were explicitly unsubscribed by the application
// and that belong to the specified partition.
private void DisposeHandlersForPartition_ExplicitOnly(string partition)
{
// keys look like:
// empty partition: "Topic"
// non-empty: "Topic_<partition>"
string suffix = string.IsNullOrEmpty(partition) ? null : "_" + partition;
foreach (var kv in TopicData.SubscribedTopicHandlers)
{
string key = kv.Key;
// Must be explicitly unsubscribed:
if (!ExplicitlyUnsubscribedKeys.ContainsKey(key))
continue;
// Must belong to the old partition we are leaving:
bool match =
suffix == null
? !key.Contains("_") // empty partition keys have no suffix
: key.EndsWith(suffix, StringComparison.Ordinal);
if (!match)
continue;
if (TopicData.SubscribedTopicHandlers.TryRemove(key, out var handler))
{
try { handler.Dispose(); } catch { /* keep current behavior; swallow or log as you already do */ }
}
}
}
internal Participant GetParticipant(string partition)
{
if (partition == null)
{
partition = string.Empty;
}
// If we already have a participant for this exact partition, reuse it
if (Participant != null && string.Equals(_currentPartition, partition, StringComparison.Ordinal))
{
return Participant;
}
// If there is an old participant (for a different partition), dispose it first
if (Participant != null)
{
DisposeHandlersForPartition_ExplicitOnly(_currentPartition);
try
{
Participant.Dispose(); // IMPORTANT: this should delete contained entities as your wrapper already does
}
catch
{
// swallow to keep behavior robust; logging is ok if you prefer
}
Participant = null;
}
// Create a fresh participant for the requested partition
Participant = new Participant(partition);
// Remember which partition this participant represents
_currentPartition = partition;
return Participant;
}
internal TopicBase GetNewTopicInstance(string topicName)
{
return TopicData.TopicHandlerInitializers[topicName].GetTopicInstance();
}
internal ConcurrentDictionary<string, ImmutableArray<IActionData>> GetActionDataCollection()
{
return TopicData.TopicActionData;
}
public bool IsRegistered(Type TopicType)
{
var key = GetDataKey(TopicType.FullName, string.Empty);
return TopicData.TopicHandlerInitializers.ContainsKey(key);
}
public ITopicHandler InitializeTopicHandler(Type topicType, Participant participant)
{
if (TryGetTopicHandler(topicType, participant, out ITopicHandler topicHandler))
{
var topicHandlerKey = GetDataKey(topicType.FullName, participant.DefaultPartitionName);
TopicData.SubscribedTopicHandlers[topicHandlerKey] = topicHandler;
return topicHandler;
}
else
{
throw new Exception($"InitializeTopicHandler for {topicType.Name} and partition ={participant.DefaultPartitionName} FAILED.");
}
}
internal bool TryGetTopicHandler(Type topicType, Participant participant, out ITopicHandler newTopicHandler)
{
newTopicHandler = null;
var registrationKey = GetDataKey(topicType.FullName, string.Empty);
if (TopicData.TopicHandlerInitializers.TryGetValue(registrationKey, out var topicInitializer))
{
newTopicHandler = topicInitializer.InitializeTopicHandler(participant);
return true;
}
return false;
}
/// <summary>
/// Starts the thread periodically reading incoming topics.
/// Only one thread shall run per application, only first request is accepted, successive requests (coming from another instances od DDS class) are ignored.
/// </summary>
/// <param name="readingPause">number of msecs to sleep between ticks of the reading thread</param>
/// <param name="topicReaderThreadTickCallback">delegate that should read the topics</param>
internal void StartTopicReaderThreadIfNotAlreadyRunning(int readingPause, Action topicReaderThreadTickCallback)
{
TopicData.StartReaderThreadIfNotAlreadyRunning(readingPause, topicReaderThreadTickCallback);
}
internal void UpdateReadingPause(int readingPause)
{
TopicData.ReadingPauseMsec = readingPause;
}
/// <summary>
/// Globally shared topic data including the one and only topic reading thread per app.
/// Reference counting based disposal only when no one uses it anymore.
///
/// </summary>
private static class TopicData
{
/// <summary>
/// ActionData added each subscribe
/// </summary>
internal static ConcurrentDictionary<string, ImmutableArray<IActionData>> TopicActionData { get; }
internal static ConcurrentDictionary<string, ITopicHandlerInitializer> TopicHandlerInitializers { get; }
internal static ConcurrentDictionary<string, ITopicHandler> SubscribedTopicHandlers { get; }
/// <summary> One per partition </summary>
internal static ConcurrentDictionary<string, Participant> Participants { get; }
internal static int ReadingPauseMsec { get; set; }
// The one and only topic reading thread per app
// Managed here (and not in the TopicReader) to ensure stopping it on the last Dispose (when no one needs it anymore).
private static Thread _readerThread;
private static volatile bool _stopRequired;
private static Action _topicReaderThreadTickCallback;
private static int _refCount;
static TopicData()
{
_refCount = 0;
TopicActionData = new ConcurrentDictionary<string, ImmutableArray<IActionData>>();
TopicHandlerInitializers = new ConcurrentDictionary<string, ITopicHandlerInitializer>();
SubscribedTopicHandlers = new ConcurrentDictionary<string, ITopicHandler>();
Participants = new ConcurrentDictionary<string, Participant>();
}
/// <summary>
/// This MUST be called from the ctor of anyone using the TopicData
/// NOT thread safe!
/// </summary>
internal static void AddRef()
{
_refCount++;
}
/// <summary>
/// This MUST be called from the dispose handler of anyone using the TopicData
/// NOT thread safe!
/// </summary>
internal static void RemoveRef()
{
if (_refCount <= 0) throw new Exception("Trying to derefence TopicData which were already deleted.");
_refCount--;
// clear and dispose only if no one else is using the data anymore
if (_refCount > 0) return;
Clear();
}
// Do not Clear TopicHandlerInitializers, registration is processed once at start of application.
private static void Clear()
{
StopReaderThread();
TopicActionData.Clear();
foreach (var subscribedTopicHandler in SubscribedTopicHandlers.Values)
subscribedTopicHandler.Dispose(); // properly dispose the low level dds readers/writes
SubscribedTopicHandlers.Clear();
foreach (var participant in Participants.Values)
participant.Dispose(); // properly dispose the low level dds participant
Participants.Clear();
}
/// <summary>
/// Starts the thread if not already existing. Periodically calls given callback until the thread is stopped.
/// </summary>
internal static void StartReaderThreadIfNotAlreadyRunning(int readingPauseMsec, Action topicReaderThreadTickCallback)
{
if (_topicReaderThreadTickCallback != null)
return; // topic reading already requested, this must come from ctor of another instance of DDS class
StartReaderThread(readingPauseMsec, topicReaderThreadTickCallback);
}
private static void StartReaderThread(int readingPauseMsec, Action topicReaderThreadTickCallback)
{
_topicReaderThreadTickCallback = topicReaderThreadTickCallback;
ReadingPauseMsec = readingPauseMsec;
_readerThread = new Thread(delegate () { TopicReaderLoop(); })
{
Priority = ThreadPriority.Normal,
Name = "Topic_Reader_Thread",
IsBackground = true
};
_readerThread.Start();
}
private static void StopReaderThread()
{
if (_readerThread != null)
{
_stopRequired = true;
_readerThread.Join();
_readerThread = null;
_topicReaderThreadTickCallback = null;
}
}
private static void TopicReaderLoop()
{
while (!_stopRequired)
{
_topicReaderThreadTickCallback(); // typically pointing to TopicReader.ReadTopics();
Thread.Sleep(ReadingPauseMsec);
}
System.Diagnostics.Debug.WriteLine("TopicReaderThread stopped.");
_stopRequired = false;
}
}
}
}
TopicHandler.cs
using Dds;
using Dds.Base;
namespace Communication
{
internal class TopicHandler<T, W, R, S> : ITopicHandler
where T : TopicBase, new()
where W : Dds.Base.TopicWriter, new()
where R : Dds.Base.TopicReader, new()
where S : Dds.Base.TopicSettings, new()
{
public TopicBase Topic
{
get => _topic;
set
{
if (value is T newTopic)
_topic = newTopic;
}
}
public string TopicName { get; }
private T _topic;
private W _topicWriter;
private R _topicReader;
private S _topicSettings;
private SampleInfo _sampleInfo;
public SampleInfo SampleInfo => _sampleInfo;
internal TopicHandler(T topic, W topicWriter, R topicReader, S topicSettings)
{
_topic = topic;
_topicWriter = topicWriter;
_topicReader = topicReader;
_topicSettings = topicSettings;
_sampleInfo = new SampleInfo();
TopicName = typeof(T).FullName;
}
public ReturnCode Publish(TopicBase sample, bool dispose = false)
{
if (_topicWriter.IsDisposed)
{
return ReturnCode.AlreadyDeleted;
}
else
{
return _topicWriter.Write(sample, dispose);
}
}
public ReturnCode Read(ref TopicBase sample, ref SampleInfo sampleInfo)
{
if (_topicReader.IsDisposed)
{
return ReturnCode.AlreadyDeleted;
}
else
{
return _topicReader.Take(ref sample, ref sampleInfo);
}
}
public string GetSenderGuid( InstanceHandle publicationHandle )
{
return _topicReader.GetSenderGuid( publicationHandle );
}
public void ResetWriter()
{
if (_topicWriter != null && !_topicWriter.IsDisposed)
{
var participant = _topicWriter.Participant;
_topicWriter.Dispose();
_topicWriter = new W();
_topicWriter.Initialize(participant);
}
}
public void Dispose()
{
_topicReader?.Dispose();
_topicWriter?.Dispose();
}
}
}
do you have any suggestions on what or where our problem is? how can we handle those network and memory bursts?
Thanks