Skip to content

Network burst on multiple participants, partitions and computers #2306

@erezdamari-bagira

Description

@erezdamari-bagira

Hi,

in our system we use CycloneDDS implementation, commit hash - 16928eb.

Above the cyclone implementation we have our own cpp layer and above this we C# layer
In the system we ENV with around 100 computers and ~500 processes that uses DDS.

during the system run we create 20 partition - each partition has a round 20-25 processes
we experiencing the following issues:

  1. on system star - we have some application that responsible to launch all applications and have watchdog on them to restart them if they crush - when we start all our applications we have a huge network burst on all out applications of around 50-100 MB/s
  2. then on every time we create a new partition (notifying the relevant applications to subscribe to a specific partition - nothing was happening on this partition before), we have the same burst of network
  3. after we have 15-16 partitions, the network use stays high - around ~50MB/s and even more on every app that uses DDS
  4. after we new partition we load a scenario on the system - basically start using the system and sending DDS samples - in one of the application, the memory use keep rising up and up - in some cases it riches to 45 GB - sometimes it stays steady on 1-2 GB of RAM

we tried changing the domain id, we made sure we dispose Readers and Writers on partition change

this is our cyclonedds.xml

default 65500B 500kB severe config cyclonedds.log
	    <Partitioning>
  <NetworkPartitions>
    <!-- Empty DCPS partition stays on Cyclone's default multicast -->
    <NetworkPartition name="ex_empty" Address="239.255.0.1"/>

    <!-- One multicast group per exercise partition, starting at 239.255.0.2 -->
    <NetworkPartition name="ex1"  Address="239.255.0.2"/>
    <NetworkPartition name="ex2"  Address="239.255.0.3"/>
    <NetworkPartition name="ex3"  Address="239.255.0.4"/>
    <NetworkPartition name="ex4"  Address="239.255.0.5"/>
    <NetworkPartition name="ex5"  Address="239.255.0.6"/>
    <NetworkPartition name="ex6"  Address="239.255.0.7"/>
    <NetworkPartition name="ex7"  Address="239.255.0.8"/>
    <NetworkPartition name="ex8"  Address="239.255.0.9"/>
    <NetworkPartition name="ex9"  Address="239.255.0.10"/>
    <NetworkPartition name="ex10" Address="239.255.0.11"/>
    <NetworkPartition name="ex11" Address="239.255.0.12"/>
    <NetworkPartition name="ex12" Address="239.255.0.13"/>
    <NetworkPartition name="ex13" Address="239.255.0.14"/>
    <NetworkPartition name="ex14" Address="239.255.0.15"/>
    <NetworkPartition name="ex15" Address="239.255.0.16"/>
    <NetworkPartition name="ex16" Address="239.255.0.17"/>
    <NetworkPartition name="ex17" Address="239.255.0.18"/>
    <NetworkPartition name="ex18" Address="239.255.0.19"/>
    <NetworkPartition name="ex19" Address="239.255.0.20"/>
    <NetworkPartition name="ex20" Address="239.255.0.21"/>
    <NetworkPartition name="ex21" Address="239.255.0.22"/>
    <NetworkPartition name="ex22" Address="239.255.0.23"/>
    <NetworkPartition name="ex23" Address="239.255.0.24"/>
    <NetworkPartition name="ex24" Address="239.255.0.25"/>
    <NetworkPartition name="ex25" Address="239.255.0.26"/>
  </NetworkPartitions>

  <PartitionMappings>
    <!-- Specific partitions 1..25 -->
    <PartitionMapping DCPSPartitionTopic="1.*"  NetworkPartition="ex1"/>
    <PartitionMapping DCPSPartitionTopic="2.*"  NetworkPartition="ex2"/>
    <PartitionMapping DCPSPartitionTopic="3.*"  NetworkPartition="ex3"/>
    <PartitionMapping DCPSPartitionTopic="4.*"  NetworkPartition="ex4"/>
    <PartitionMapping DCPSPartitionTopic="5.*"  NetworkPartition="ex5"/>
    <PartitionMapping DCPSPartitionTopic="6.*"  NetworkPartition="ex6"/>
    <PartitionMapping DCPSPartitionTopic="7.*"  NetworkPartition="ex7"/>
    <PartitionMapping DCPSPartitionTopic="8.*"  NetworkPartition="ex8"/>
    <PartitionMapping DCPSPartitionTopic="9.*"  NetworkPartition="ex9"/>
    <PartitionMapping DCPSPartitionTopic="10.*" NetworkPartition="ex10"/>
    <PartitionMapping DCPSPartitionTopic="11.*" NetworkPartition="ex11"/>
    <PartitionMapping DCPSPartitionTopic="12.*" NetworkPartition="ex12"/>
    <PartitionMapping DCPSPartitionTopic="13.*" NetworkPartition="ex13"/>
    <PartitionMapping DCPSPartitionTopic="14.*" NetworkPartition="ex14"/>
    <PartitionMapping DCPSPartitionTopic="15.*" NetworkPartition="ex15"/>
    <PartitionMapping DCPSPartitionTopic="16.*" NetworkPartition="ex16"/>
    <PartitionMapping DCPSPartitionTopic="17.*" NetworkPartition="ex17"/>
    <PartitionMapping DCPSPartitionTopic="18.*" NetworkPartition="ex18"/>
    <PartitionMapping DCPSPartitionTopic="19.*" NetworkPartition="ex19"/>
    <PartitionMapping DCPSPartitionTopic="20.*" NetworkPartition="ex20"/>
    <PartitionMapping DCPSPartitionTopic="21.*" NetworkPartition="ex21"/>
    <PartitionMapping DCPSPartitionTopic="22.*" NetworkPartition="ex22"/>
    <PartitionMapping DCPSPartitionTopic="23.*" NetworkPartition="ex23"/>
    <PartitionMapping DCPSPartitionTopic="24.*" NetworkPartition="ex24"/>
    <PartitionMapping DCPSPartitionTopic="25.*" NetworkPartition="ex25"/>

    <!-- Catch-all for EMPTY DCPS partition (must be last) -->
    <PartitionMapping DCPSPartitionTopic=".*" NetworkPartition="ex_empty"/>
  </PartitionMappings>
</Partitioning>
</Domain>

this is our TopicReade.cpp file

#include <stdio.h>
#include "TopicReader.h"
#include "DataListener.h"
#include <memory.h>
#include "dds/dds.h"
#include "DdsGuid.h"

namespace Dds::Base
{

TopicReader::TopicReader( Dds::Ptr<Participant> participant, TTopicSettings* topicSettings, DataListener* listener, const char* partitionName )
{
	//printf("TopicReader()\n");

	_topicSettings = topicSettings;
	_participant = participant;

	_topic = dds_create_topic( _participant->GetNativeHandle(), topicSettings->descriptor, topicSettings->name, NULL, NULL);
	if( _topic < 0 )
	    DDS_FATAL("dds_create_topic %s: %s\n", topicSettings->name, dds_strretcode(-_topic));

	dds_qos_t *qos = dds_create_qos ();
	dds_qset_reliability( qos, topicSettings->reliability.kind, topicSettings->reliability.blocking_time );
	dds_qset_durability( qos, topicSettings->durability.kind );
	dds_qset_history( qos, topicSettings->history.kind, topicSettings->history.depth );

	const char* finalPartitionName = partitionName;
	if( !finalPartitionName ) finalPartitionName = participant->GetDefaultPartitionName();
	if( finalPartitionName )
	{
		dds_qset_partition1( qos, finalPartitionName );
	}

	_reader = dds_create_reader( _participant->GetNativeHandle(), _topic, qos, NULL );
	if (_reader < 0)
		DDS_FATAL("dds_create_reader %s: %s\n", topicSettings->name, dds_strretcode(-_reader));

    dds_delete_qos(qos);

	SetListener( listener );
}

TopicReader::~TopicReader()
{
	// dispose writer
	// dispose topic...
	printf("~TopicReader()\n");
	
	// unregister potential listener
	dds_set_listener( _reader, NULL );

	// free the sample buffer
	free( _samples );

	// delete reader & topic handles
	if( _reader >= 0 )
	{
		auto rc = dds_delete( _reader );
		if (rc != DDS_RETCODE_OK)
			DDS_FATAL("dds_delete(reader) %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
	}

	if( _topic >= 0 )
	{
		auto rc = dds_delete( _topic );
		if (rc != DDS_RETCODE_OK)
			DDS_FATAL("dds_delete(topic): %s %s\n", _topicSettings->name, dds_strretcode(-rc));
	}
}

void TopicReader::ReturnLoan()
{
	if( _numSamplesReturned == 0 )
		return;

	void* sample = _samples;
	for( int i=0; i < _numSamplesReturned; i++ )
	{
		dds_sample_free( sample, _topicSettings->descriptor, DDS_FREE_CONTENTS );

		sample = (void*)((uint8_t*)sample + _topicSettings->descriptor->m_size);
	}

	// zero the used part of the sample buffer to make it ready for next GetSamples()
	memset( _samples, 0, _numSamplesReturned * _topicSettings->descriptor->m_size );

	_numSamplesReturned = 0;
}

int TopicReader::ReserveSamples( int minCapacity )
{
	if( minCapacity > _samplesCapacity ) // the buffer only grows
	{
		void* newBuf = realloc( _samples, minCapacity * _topicSettings->descriptor->m_size );
		if( newBuf )
		{
			// zero the extra allocated memory to make it ready for the GetSamples()
			memset(
				(uint8_t*)newBuf + _samplesCapacity * _topicSettings->descriptor->m_size,
				0,
				(minCapacity - _samplesCapacity) * _topicSettings->descriptor->m_size
			);

			_samplesCapacity = minCapacity;
			_samples = newBuf;
		}
	}
	return _samplesCapacity;
}

dds_return_t TopicReader::GetSamples(
	EAction action,
    void** samples, // [out] preallocated array of sample pointers (contained pointers can be null, will be replaced with pointers to dds-allocated memory)
    dds_sample_info_t* sampleInfos, // [out] preallocated array of SampleInfo structures
    int maxSamples,
    dds_instance_handle_t instanceHandle,
    uint32_t sampleStateMask
	)
{
	// free previously taken samples (if any)
	// and zero the used part of the sample buffer (mandatory - dds_read/take expects it !)
	ReturnLoan();

	// limit the request to the available memory we can get
	maxSamples = std::min( maxSamples, ReserveSamples( maxSamples ) );

	if ( maxSamples <= 0 )
		return DDS_RETCODE_ERROR;  // either bad parameter or failed to allocate buffer for samples

	// return the pointers to our internal buffer
	for(int i=0; i < maxSamples; i++ ) 
		samples[i] = (void*)((uint8_t*)_samples + i*_topicSettings->descriptor->m_size);

	// No need for zeroing here as the sample bufer is guaranteed to be already zeroed by ReturnLoan & ReserveSamples
	//// zero the sample memory so that dds does not try to reuse any dynamic structures
	//memset( _samples, 0, maxSamples * _topicSettings->descriptor->m_size );

	if( sampleStateMask == 0 )
		sampleStateMask = DDS_ANY_STATE;

	dds_return_t rc;
	switch( action )
	{
		case EAction::Read:
			rc = dds_read_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, sampleStateMask );
			break;

		case EAction::Take:
			rc = dds_take_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, sampleStateMask );
			break;

		case EAction::ReadInstance:
			rc = dds_read_instance_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, instanceHandle, sampleStateMask );
			break;

		case EAction::TakeInstance:
			rc = dds_take_instance_mask( _reader, samples, sampleInfos, maxSamples, maxSamples, instanceHandle, sampleStateMask );
			break;

		default:
			rc = DDS_RETCODE_UNSUPPORTED;
			break;
	}

    if (rc < 0)
	{
		if ( rc != DDS_RETCODE_PRECONDITION_NOT_MET || action == EAction::Read || action == EAction::Take )
		{  
			// read/take_instance before writer is matched results DDS_RETCODE_PRECONDITION_NOT_MET.
			// However, this is a normal use case and the error stops once a writer is matched.
			dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds read/take", "dds read/take %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
		}
	}
	else
	{
		_numSamplesReturned = rc;

		//{
		//	for(int i=0; i < rc; i++ ) 
		//	{
		//		auto* info = sampleInfos + i;
		//		printf("[DDS READ]  topic=%s, sender=%s\n",
		//			GetTopicSettings()->GetTopicName(),
		//			GetSenderGuid(info->publication_handle).c_str()
		//		);
		//	}
		//}
	}

	return rc;
}

dds_instance_handle_t TopicReader::LookupInstance( void* sample )
{
	return dds_lookup_instance( _reader, sample );
}

void TopicReader::SetListener( DataListener* listener )
{
	if( listener ) listener->SetReader( this );

	dds_listener_t* ddsList = listener ? listener->GetNativeHandle() : NULL;

	dds_set_listener( _reader, ddsList );
}

int TopicReader::GetSenderGuid(dds_instance_handle_t publicationHandle, char* buf, int bufsize)
{
	dds_builtintopic_endpoint_t* publData = dds_get_matched_publication_data(_reader, publicationHandle);
	if (!publData)
	{
		dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds_get_matched_publication_data", "dds_get_matched_publication_data could not be found from publication handle %0llx in reader of %s\n", publicationHandle, _topicSettings->name);
		return 0;
	}

	dds_guid_t senderGuid = publData->participant_key;
	int res = dds_guid_to_string(senderGuid, buf, bufsize );

	dds_free(publData); publData = nullptr;

	return res;
}


std::string TopicReader::GetSenderGuid( dds_instance_handle_t publicationHandle )
{
	char guidStr[100] = {0};
	GetSenderGuid( publicationHandle, guidStr, sizeof(guidStr) );
	return guidStr;
}


dds_return_t TopicReader::WaitForSubscriptionMatched(int timeoutMsec)
{
	auto ret = dds_set_status_mask(_reader, DDS_SUBSCRIPTION_MATCHED_STATUS);
	if (ret != DDS_RETCODE_OK)
		return ret;

	const int quantumMsec = 20;
	int timeRemaining = timeoutMsec;
	while (true)
	{
		uint32_t status;
		ret = dds_get_status_changes(_reader, &status);
		if (ret != DDS_RETCODE_OK)
			return ret;

		if (status == DDS_SUBSCRIPTION_MATCHED_STATUS) {
			break;
		}

		if (timeoutMsec >= 0) // do we wait for timeout?
		{
			if (timeRemaining <= 0)
				return DDS_RETCODE_TIMEOUT;
		}

		dds_sleepfor(DDS_MSECS(quantumMsec));

		if (timeoutMsec >= 0) // do we wait for timeout?
		{
			timeRemaining -= quantumMsec;

			if (timeRemaining <= 0)
				return DDS_RETCODE_TIMEOUT;
		}

	}
	return DDS_RETCODE_OK;
}

}

TopicWriter.cpp

#include <stdio.h>
#include "TopicWriter.h"
#include "dds/dds.h"
#include "SenderId/ParticInfoSender.h"

namespace Dds::Base
{

TopicWriter::TopicWriter( Participant* participant, TTopicSettings* topicSettings, const char* partitionName )
{
	//printf("TopicWriter()\n");

	_topicSettings = topicSettings;
	_participant = participant;

	_topic = dds_create_topic( _participant->GetNativeHandle(), topicSettings->descriptor, topicSettings->name, NULL, NULL);
	if( _topic < 0 )
	    DDS_FATAL("dds_create_topic %s: %s\n", topicSettings->name, dds_strretcode(-_topic));

	dds_qos_t *qos = dds_create_qos ();
	dds_qset_reliability( qos, topicSettings->reliability.kind, topicSettings->reliability.blocking_time );
	dds_qset_durability( qos, topicSettings->durability.kind );
	dds_qset_history( qos, topicSettings->history.kind, topicSettings->history.depth );
	dds_qset_writer_data_lifecycle( qos, topicSettings->writer_data_lifecycle.autodispose );

	const char* finalPartitionName = partitionName;
	if( !finalPartitionName ) finalPartitionName = participant->GetDefaultPartitionName();
	if( finalPartitionName )
	{
		dds_qset_partition1( qos, finalPartitionName );
	}

	_writer = dds_create_writer( _participant->GetNativeHandle(), _topic, qos, NULL );
	if (_writer < 0)
		DDS_FATAL("dds_create_writer %s: %s\n", topicSettings->name, dds_strretcode(-_writer));

    dds_delete_qos(qos);

	// publish the participant info
	ParticInfoSender::GetInstance()->RegisterParticipant( _participant );
}

TopicWriter::~TopicWriter()
{
	// dispose writer
	// dispose topic...
	printf("~TopicWriter()\n");

	// unpublish the participant info
	ParticInfoSender::GetInstance()->UnregisterParticipant(_participant);

	if( _writer >= 0 )
	{
		auto rc = dds_delete( _writer );
		if (rc != DDS_RETCODE_OK)
			DDS_FATAL("dds_delete(writer) %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
	}

	if( _topic >= 0 )
	{
		auto rc = dds_delete( _topic );
		if (rc != DDS_RETCODE_OK)
			DDS_FATAL("dds_delete(topic) %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
	}
}

int TopicWriter::Write( void* sample, bool dispose )
{
	//{
	//	printf("[DDS WRITE] topic=%s, sender=%s\n",
	//		_topicSettings->GetTopicName(),
	//		_participant->GetGuid()
	//	);
	//}

	if( dispose )
	{
		auto rc = dds_writedispose( _writer, sample );
		if( rc != DDS_RETCODE_OK )
		{
			dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds writedispose", "dds writedispose %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
		}
		return rc;
	}
	else
	{
		auto rc = dds_write( _writer, sample );
		if (rc != DDS_RETCODE_OK)
		{
			dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds write", "dds write %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
		}
		return rc;
	}
}

int TopicWriter::WriteWithTimeStamp( void* sample, dds_time_t timeStamp, bool dispose )
{
	if( dispose )
	{
		auto rc = dds_writedispose_ts( _writer, sample, timeStamp );
		if (rc != DDS_RETCODE_OK)
		{
			dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds writedispose_ts", "dds writedispose_ts %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
		}
		return rc;
	}
	else
	{
		auto rc = dds_write_ts( _writer, sample, timeStamp );
		if (rc != DDS_RETCODE_OK)
		{
			dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "dds write_ts", "dds write_ts %s: %s\n", _topicSettings->name, dds_strretcode(-rc));
		}
		return rc;
	}
}

dds_instance_handle_t TopicWriter::LookupInstance( void* sample )
{
	return dds_lookup_instance(_writer, sample);
}

dds_instance_handle_t TopicWriter::RegisterInstance( void* sample )
{
	dds_instance_handle_t handle;

	dds_return_t rc = dds_register_instance( _writer, &handle, sample );
	if( rc == DDS_RETCODE_OK )
		return handle;
	return 0;
}


dds_return_t TopicWriter::UnregisterInstance( void* sample )
{
	return dds_unregister_instance( _writer, sample );
}

dds_return_t TopicWriter::UnregisterInstance( dds_instance_handle_t instanceHandle )
{
	return dds_unregister_instance_ih( _writer, instanceHandle );
}

dds_return_t TopicWriter::WaitForPublicationMatched( int timeoutMsec )
{
	auto ret = dds_set_status_mask( _writer, DDS_PUBLICATION_MATCHED_STATUS);
	if( ret != DDS_RETCODE_OK )
		return ret;
	
	const int quantumMsec = 20;
	int timeRemaining = timeoutMsec;
	while( true )
	{
		uint32_t status;
		ret = dds_get_status_changes (_writer, &status);
		if( ret != DDS_RETCODE_OK )
			return ret;
	
		if (status == DDS_PUBLICATION_MATCHED_STATUS) {
			break;
		}
		
		if( timeoutMsec >= 0 ) // do we wait for timeout?
		{
			if( timeRemaining <= 0 )
				return DDS_RETCODE_TIMEOUT;
		}

		dds_sleepfor (DDS_MSECS (quantumMsec));

		if( timeoutMsec >= 0 ) // do we wait for timeout?
		{
			timeRemaining -= quantumMsec;

			if( timeRemaining <= 0 )
				return DDS_RETCODE_TIMEOUT;
		}

	}
	return DDS_RETCODE_OK;
}


};

Participant.cpp

#include <stdio.h>
#include "Participant.h"
#include "dds/dds.h"
#include "DdsGuid.h"
#include <ostream>
#include <sstream>

namespace Dds
{

	Participant::Participant(const char* partitionName, int appDomainId, int appNodeId)
	{
		printf("Participant()\n");

		if (partitionName)
		{
			_partitionNameSet = true;
			_partitionName = partitionName;
		}

		_participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
		if (_participant < 0)
		{
			_error = -_participant;
			dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "Participant::ctor", "dds_create_participant: %s\n", dds_strretcode(-_participant));
		}

		// get the guid
		dds_guid_t nativeGuid;
		auto guid_rc = dds_get_guid(_participant, &nativeGuid);
		if (guid_rc < 0)
		{
			_error = -_participant;
			dds_log(DDS_LC_ERROR, __FILE__, __LINE__, "Participant::ctor", "dds_get_guid: %s\n", dds_strretcode(-guid_rc));
		}

		char guidStr[100]; dds_guid_to_string(nativeGuid, guidStr, sizeof(guidStr));
		_guid = guidStr;

		_appDomainId = appDomainId;
		_appNodeId = appNodeId;
	}

	Participant::~Participant()
	{
		printf("~Participant() guid=%s domainId=%d nodeId=%d\n", _guid.c_str(), _appDomainId, _appNodeId);

		if (_participant == 0) return;

		/* Deleting the participant will delete all its children recursively as well. */
		auto rc = dds_delete(_participant);
		if (rc != DDS_RETCODE_OK)
			DDS_FATAL("dds_delete(participant): %s\n", dds_strretcode(-rc));
	}
};

this is our API for the C# layer

using Communication.CustomException;
using Communication.CustomFilter;
using Dds;
using Dds.Base;
using Microsoft.Extensions.Logging;
using System;

namespace Communication
{
    public class DDS : Disposable, ICommunicationLayer
    {
        private int _readingPause;
        private bool _changeCPUClockResolution;
        private TopicDataManager _topicDataManager;
        private TopicReader _topicReader;
        private TopicWriter _topicWriter;
        private ILogger _logger;
        private bool _isDisposing;

        /// <summary>
        /// WARNING: Not thread safe!!! Topic samples delivered in static reader thread. One thread for App.
        /// </summary>
        /// <param name="provideSenderInfo">If true, SenderInfo will be provided in OnTopicChange</param>
        public DDS()
        {
            _readingPause = 15;
            _topicDataManager = new TopicDataManager();
            _topicReader = new TopicReader(_topicDataManager, _readingPause);
            _topicWriter = new TopicWriter(_topicDataManager);
        }

        public DDS(ILogger logger) : this()
        {
            _logger = logger;
            _topicReader.Logger = _logger;
            _topicWriter.Logger = _logger;
        }

        public DDS(bool changeCPUClockResolution) : this()
        {
            _changeCPUClockResolution = changeCPUClockResolution;
            if (_changeCPUClockResolution)
            {
                Win32API.SetCPUClockResolution(_logger);

                _readingPause = 1;
                _topicReader.UpdateReadingPause(_readingPause);
            }
        }

        public DDS(bool changeCPUClockResolution, bool provideSenderInfo) : this(changeCPUClockResolution)
        {
            _topicReader.ProvideSenderInfo = provideSenderInfo;
        }

        public DDS(bool changeCPUClockResolution, bool provideSenderInfo, ILogger logger) : this(changeCPUClockResolution, provideSenderInfo)
        {
            _logger = logger;
            _topicReader.Logger = _logger;
            _topicWriter.Logger = _logger;
        }

        /// <summary>
        /// Connect correct writer nad reader with given topic.
        /// </summary>
        /// <typeparam name="T">TopicBase</typeparam>
        /// <typeparam name="W">TopicWriter</typeparam>
        /// <typeparam name="R">TopicReader</typeparam>
        /// <exception cref="TopicNotRegisteredException">Throws if Topic is not registered yet.</exception>
        public void RegisterTopic<T, W, R, S>()
            where T : TopicBase, new()
            where W : Dds.Base.TopicWriter, new()
            where R : Dds.Base.TopicReader, new()
            where S : Dds.Base.TopicSettings, new()
        {
            _topicDataManager.RegisterHandlersForTopic<T, W, R, S>();
        }

        public void Subscribe<T>(Action<T, SampleInfo> onTopicChange, Func<T, bool> filter) where T : TopicBase
        {
            Subscribe(onTopicChange, string.Empty, filter);
        }

        /// <summary>
        /// Subscribe an action, with specific filter on topic data update. 
        /// An action is subscribed just once.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="onTopicChange"></param>
        /// <param name="partition"></param>
        /// <param name="filter">Null if empty.</param>
        /// <exception cref="TopicRegistrationException">Throws if TopicWriter, or TopicReader are incorrect.</exception>
        public void Subscribe<T>(Action<T, SampleInfo> onTopicChange, IFilterHandler<T> filter, string partition = "") where T : TopicBase
        {
            Subscribe(typeof(T), onTopicChange, filter, partition);
        }

        public void Subscribe<T>(Action<T, SampleInfo> onTopicChange, string partition = "", Func<T, bool> filter = null) where T : TopicBase
        {
            Subscribe(onTopicChange, new GenericFilterHandler<T>(filter), partition);
        }

        /// <summary>
        /// Subscribe an action, with specific filter on topic data update. 
        /// An action is subscribed just once.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="topicType"></param>
        /// <param name="onTopicChange"></param>
        /// <param name="partition"></param>
        /// <param name="filter">Null if empty.</param>
        /// <exception cref="TopicRegistrationException">Throws if TopicWriter, or TopicReader are incorrect.</exception>
        public void Subscribe<T>(Type topicType, Action<T, SampleInfo> onTopicChange, IFilterHandler<T> filter, string partition = "") where T : TopicBase
        {
            _topicReader.SubscribeTopic(topicType, (T sample, SampleInfo info, ParticInfo sender) => onTopicChange(sample, info), onTopicChange.Target, onTopicChange.Method.MetadataToken, partition, filter);
            _logger?.LogDebug("Topic = {topicName}, method = {methodName} Subscribed", typeof(T).Name, nameof(onTopicChange));
        }

        public void Subscribe<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, IFilterHandler<T> filter, string partition = "") where T : TopicBase
        {
            _topicReader.SubscribeTopic(topicType, onTopicChange, onTopicChange.Target, onTopicChange.Method.MetadataToken, partition, filter);
            _logger?.LogDebug("Topic = {topicName}, method = {methodName} Subscribed", typeof(T).Name, nameof(onTopicChange));
        }


        /// <summary>
        /// Subscribe an action, with specific filter on topic data update. 
        /// An action is subscribed just once.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="topicType"></param>
        /// <param name="onTopicChange"></param>
        /// <param name="partition"></param>
        /// <param name="filter">Null if empty.</param>
        /// <exception cref="TopicRegistrationException">Throws if TopicWriter, or TopicReader are incorrect.</exception>
        public void Subscribe<T>(Type topicType, Action<T, SampleInfo> onTopicChange, string partition = "", Func<T, bool> filter = null) where T : TopicBase
        {
            Subscribe(topicType, onTopicChange, new GenericFilterHandler<T>(filter), partition);
        }

        /// <summary>
        /// Remove subscription of an action on given topic.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="onTopicChange"></param>
        /// <param name="partition"></param>
        public void Unsubscribe<T>(Action<T, SampleInfo> onTopicChange, string partition = "") where T : TopicBase
        {
            Unsubscribe(typeof(T), onTopicChange, partition);
        }

        /// <summary>
        /// Remove subscription of an action on given topic.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="topicType"></param>
        /// <param name="onTopicChange"></param>
        /// <param name="partition"></param>
        public void Unsubscribe<T>(Type topicType, Action<T, SampleInfo> onTopicChange, string partition = "") where T : TopicBase
        {
            _topicReader.UnsubscribeTopic(topicType, (T sample, SampleInfo info, ParticInfo sender) => onTopicChange(sample, info), onTopicChange.Target, onTopicChange.Method.MetadataToken, partition);
            _logger?.LogDebug("Topic = {topicName}, method = {methodName} Unsubscribed", typeof(T).Name, nameof(onTopicChange));
        }

        public void Unsubscribe<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, string partition = "") where T : TopicBase
        {
            _topicReader.UnsubscribeTopic(topicType, onTopicChange, onTopicChange.Target, onTopicChange.Method.MetadataToken, partition);
            _logger?.LogDebug("Topic = {topicName}, method = {methodName} Unsubscribed", typeof(T).Name, nameof(onTopicChange));
        }

        /// <summary>
        /// Publish topic change.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="sample"></param>
        /// <param name="partition"></param>
        /// <exception cref="Exception">Thrown when return code is not Ok.</exception>
        /// <exception cref="TopicNotRegisteredException">Throws when topic is not registered yet</exception>
        public void Publish<T>(T sample, string partition = "") where T : TopicBase
        {
            Publish(sample.GetType(), sample, partition);
        }

        public void Publish(Type topicType, TopicBase sample, string partition = "")
        {
            ProcessPublish(topicType, sample, partition, false);
            _logger?.LogTrace("Sample = {topicName}, partition = {partition} Published", topicType.Name, partition);
        }

        private void ProcessPublish(Type topicType, TopicBase sample, string partition, bool dispose)
        {
            if (_isDisposing)
                throw new ObjectDisposedException("ICommunicationLayer; DDS");

            var result = _topicWriter.PublishTopicChange(topicType, sample, dispose, partition);
            if (result != ReturnCode.Ok)
            {
                string currentProcess = dispose ? "Dispose" : "Publish";
                throw new Exception($"{currentProcess} of topic={sample.GetType().Name} failed with returnCode={result}");
            }
        }

        /// <summary>
        /// Dispose concrete topic sample.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="sample"></param>
        /// <param name="partition"></param>
        /// <exception cref="Exception">Throws if dispose failed</exception>
        public void DisposeTopicSample<T>(T sample, string partition = "") where T : TopicBase
        {
            ProcessPublish(sample.GetType(), sample, partition, true);
            _logger?.LogTrace("Sample = {topicName}, partition = {partition} Disposed", typeof(T).Name, partition);
        }

        /// <summary>
        /// Dispose the internal dds objects (reading thread, participants...)
        /// You need to call this explicitly when you no longer need to work with the DDS messages, latest at the end of the app.
        /// WARNING: Not thread safe!!! DO NOT call from multiple tasks/threads at the same time!
        /// </summary>
		protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (!disposing)
            {
                return;
            }
            _isDisposing = disposing;

            _topicReader?.Dispose();
            _topicWriter?.Dispose();
            _topicDataManager?.Dispose();
            if (_changeCPUClockResolution)
            {
                Win32API.ResetCPUClockResolution();
            }

            _logger?.LogDebug($"DDS communication Disposed");
        }

        /// <summary>
        /// Check if a type already subscribed with specific action
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="topicType"></param>
        /// <param name="onTopicChange"></param>
        /// <param name="partition"></param>
        public bool IsTopicSubscribedToAction<T>(Type topicType, Action<T, SampleInfo> onTopicChange, string partition = "") where T : TopicBase
        {
            return _topicReader.IsTopicSubscribedToAction(topicType, (T sample, SampleInfo info, ParticInfo sender) => onTopicChange(sample, info), onTopicChange.Target, onTopicChange.Method.MetadataToken, partition);
        }

        public void DisposeAllWrites()
        {
            _topicWriter.ResetAllWriters();
        }
    }
}

and here are some of the classes that may be relevant

TopicReader.cs

using Communication.CustomException;
using Dds.Base;
using Dds;
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using System.Collections.Immutable;

namespace Communication
{
    internal class TopicReader : Disposable
    {
        private TopicDataManager _topicDataManager;
        internal ParticInfoMonitor _particInfoMonitor;
        internal bool ProvideSenderInfo
        {
            set
            {
                if (value)
                {
                    _particInfoMonitor = new ParticInfoMonitor();
                }
            }
        }

        private ILogger _logger;
        internal ILogger Logger
        {
            set
            {
                _logger = value;
            }
        }

        internal TopicReader(TopicDataManager topicDataManager, int readingPause)
        {
            _topicDataManager = topicDataManager;
            // note: Max one thread runs per application, stopped on TopicDataManager disposal
            _topicDataManager.StartTopicReaderThreadIfNotAlreadyRunning(readingPause, ReadTopics);
        }


        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _particInfoMonitor?.Dispose();
                _particInfoMonitor = null;
            }
        }

        /// <summary>
        /// Polls incoming samples of all subscribed topics, for those passing the filter calls their action handlers
        /// </summary>
        protected void ReadTopics()
        {
            foreach (var topicCallbacks in _topicDataManager.GetActionDataCollection()) // iterate all subscriptions
            {
                ITopicHandler topicHandler = _topicDataManager.GetTopicHandlerOrDefault(topicCallbacks.Key);
                if (topicHandler != null)
                {
                    while(true) // read all msgs buffered in the dds reader
                    // Intentionally there is no limit on how many msgs to read per iteration.
                    // If we keep getting more messages than what we can process quickly enough,
                    // reading thread will freeze and no other msgs will be received,
                    // indicating the problem very clearly. Maybe better than if dying slowly,
                    // showing just increasing delays in msg reception.
                    {
                        var sample = topicHandler.Topic;
                        var sampleInfo = topicHandler.SampleInfo;
                        var result = topicHandler.Read(ref sample, ref sampleInfo);
                        if (result != ReturnCode.Ok)
                            break;

                        ParticInfo sender = null;
                        if (_particInfoMonitor != null)
                        {
                            string senderGuid = topicHandler.GetSenderGuid(sampleInfo.PublicationHandle);
                            sender = _particInfoMonitor.FindByGuid(senderGuid);
                        }

                        topicHandler.Topic = _topicDataManager.GetNewTopicInstance(topicHandler.TopicName);
                        foreach (var callbackData in topicCallbacks.Value)
                        {
                            if (callbackData.Filter(sample))
                            {
                                callbackData.OnTopicChange(sample, sampleInfo, sender);
                            }
                        }
                    }
                }
            }
        }

        internal void SubscribeTopic<T>(Type topicType, Action<T, SampleInfo, ParticInfo> OnTopicChange, object id1, int id2, string partition, IFilterHandler<T> filter) where T : TopicBase
        {
            var subscriptionKey = _topicDataManager.GetDataKey(topicType.FullName, partition);
            InitializeTopicHandler(topicType, _topicDataManager.GetParticipant(partition), subscriptionKey);
            SubscribeTopic(OnTopicChange, id1, id2, filter, subscriptionKey);
        }

        private void InitializeTopicHandler(Type topicType, Participant participant, string subscriptionKey)
        {
            if (TopicDataManager.SubscribedTopicHandlers.ContainsKey(subscriptionKey))
            {
                if (!TopicDataManager.TopicActionData.ContainsKey(subscriptionKey))
                {
                    TopicDataManager.TopicActionData[subscriptionKey] = ImmutableArray.Create<IActionData>();
                }

                return;
            }

            if (_topicDataManager.TryGetTopicHandler(topicType, participant, out ITopicHandler topicHandler))
            {
                TopicDataManager.TopicActionData[subscriptionKey] = ImmutableArray.Create<IActionData>();
                TopicDataManager.SubscribedTopicHandlers[subscriptionKey] = topicHandler;
                TopicDataManager.ExplicitlyUnsubscribedKeys.TryRemove(subscriptionKey, out _);
            }
            else
                throw new TopicNotRegisteredException($"TOPIC NOT REGISTERED - Subscribed topic={topicType.Name} has to be registered first.");
        }

        private void SubscribeTopic<T>(Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, IFilterHandler<T> filter, string subscriptionKey) where T : TopicBase
        {
            IActionData subscribeData = new ActionData<T>(onTopicChange, id1, id2, filter);
            if (
                    TopicDataManager.TopicActionData.TryGetValue(subscriptionKey, out var actions) &&
                    !IsSubscribed(subscribeData, actions)
                )
            {
                TopicDataManager.TopicActionData[subscriptionKey] = actions.Add(subscribeData);
            }
        }

        private bool IsSubscribed(IActionData actionData, IEnumerable<IActionData> actions)
        {
          var isSubscribed = actions.Any(x => actionData.Equals(x));
            return isSubscribed;
        }

        internal void UnsubscribeTopic<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, string partition) where T : TopicBase
        {
            var key = _topicDataManager.GetDataKey(topicType.FullName, partition);
            if (TopicDataManager.TopicActionData.TryGetValue(key, out var actions))
            {
                TopicDataManager.TopicActionData[key] = UnsubscribeTopic(onTopicChange, id1, id2, key, actions);
                if(TopicDataManager.TopicActionData[key].Count() == 0 )
                {
                    if(TopicDataManager.TopicActionData.TryRemove(key, out _))
                    {
                        if(TopicDataManager.SubscribedTopicHandlers.TryRemove(key, out var topicHandler))
                        {
                            topicHandler.Dispose();
                            TopicDataManager.ExplicitlyUnsubscribedKeys.TryAdd(key, 0);
                        }
                    }
                }
            }
        }

        private ImmutableArray<IActionData> UnsubscribeTopic<T>(Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, string key, ImmutableArray<IActionData> actions) where T : TopicBase
        {
            IActionData actionData = new ActionData<T>(onTopicChange, id1, id2);
            var subscriptionData = actions.Where(x => actionData.Equals(x)).FirstOrDefault();
            if (subscriptionData != null)
            {
                return actions.Remove(subscriptionData);
            }

            return actions;
        }

        internal bool IsTopicSubscribedToAction<T>(Type topicType, Action<T, SampleInfo, ParticInfo> onTopicChange, object id1, int id2, string partition) where T : TopicBase
        {
            var subscriptionKey = _topicDataManager.GetDataKey(topicType.FullName, partition);
            IActionData subscribeData = new ActionData<T>(onTopicChange, id1, id2);
            bool subscribed = false;
            if (TopicDataManager.TopicActionData.TryGetValue(subscriptionKey, out var actions))
            {
                subscribed = IsSubscribed(subscribeData, actions);
            }

            return subscribed;
        }
        
        internal void UpdateReadingPause(int readingPause)
        {
            _topicDataManager.UpdateReadingPause(readingPause);
        }
    }
}

TopicDataManager.cs

using Dds;
using Dds.Base;
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Threading;

namespace Communication
{
    /// <summary>
    /// Data storage for information on subscribed/published topics, keyed by topicName+partitionName. No logic.
    /// Data is static, shared shared by TopicReader & TopicWriter. Dispose() should be called in pair with the ctor
    /// to disposed the shared static data when no one is using it anymore.
    /// Maintains various info like
    ///   - instances od participant for each individual partition (one per partition)
    ///   - instances of internal readers and writers (one per topic type)
    ///   - actions to call on topic arrival and associated filter method 
    /// </summary>
    public class TopicDataManager : Disposable
    {
        public static ConcurrentDictionary<string, ImmutableArray<IActionData>> TopicActionData => TopicData.TopicActionData;
        public static ConcurrentDictionary<string, ITopicHandlerInitializer> TopicHandlerInitializers => TopicData.TopicHandlerInitializers;
        public static ConcurrentDictionary<string, ITopicHandler> SubscribedTopicHandlers => TopicData.SubscribedTopicHandlers;
        public static string CurrentParticipantPartition => _currentPartition;

        public static Participant Participant;        // single live participant
        private static string _currentPartition = "";   // last partition used for _participant
        public static ConcurrentDictionary<string, byte> ExplicitlyUnsubscribedKeys = new ConcurrentDictionary<string, byte>();
        public TopicDataManager()
        {
            TopicData.AddRef();

            // We found a bug that in some cases the first volative message sent is not being received correctly.
            // The problem seems to be fixed by creating a participant (any partition) a few ms before actually publishing the message.
            GetParticipant(string.Empty);
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (!disposing) return;

            // note: TopicData is shared between TopicReader & TopicWriter, we never know who is still using it => disposes once refcnt drops to zero
            TopicData.RemoveRef();
        }

        internal void RegisterHandlersForTopic<T, W, R, S>()
            where T : TopicBase, new()
            where W : Dds.Base.TopicWriter, new()
            where R : Dds.Base.TopicReader, new()
            where S : Dds.Base.TopicSettings, new()
        {
            var key = GetDataKey(typeof(T).FullName, string.Empty);
            if (!TopicData.TopicHandlerInitializers.ContainsKey(key))
            {
                var topicBaseType = typeof(T).Name;
                var topicWriter = typeof(W).Name;
                var topicReader = typeof(R).Name;
                var topicSettings = typeof(S).Name;

                CheckCorrectTypes(topicBaseType, topicWriter, topicReader, topicSettings);
                TopicData.TopicHandlerInitializers[key] = new TopicHandlerInitializer<T, W, R, S>();
            }
        }

        public string GetDataKey<T>(string partition) where T : TopicBase
        {
            return GetDataKey(typeof(T).FullName, partition);
        }

        public string GetDataKey(string topicName, string partition)
        {
            if (string.IsNullOrEmpty(partition))
                return $"{topicName}";
            else
                return $"{topicName}_{partition}";
        }

        private void CheckCorrectTypes(string topicName, string writerName, string readerName, string settingsName)
        {
            string exceptionMessage = string.Empty;

            bool correctWriterType = writerName.StartsWith(topicName);
            if (!correctWriterType)
                exceptionMessage = $"Incorrect TopicWriter type={writerName}, for topic={topicName}";

            bool correctReaderType = readerName.StartsWith(topicName);
            if (!correctReaderType)
                exceptionMessage = $"{exceptionMessage}\n Incorrect TopicReader type={readerName}, for topic={topicName}";

            bool correctSettingsType = settingsName.StartsWith(topicName);
            if (!correctSettingsType)
                exceptionMessage = $"{exceptionMessage}\n Incorrect TopicSettings type={settingsName}, for topic={topicName}";

            if (!correctWriterType || !correctReaderType || !correctSettingsType)
                throw new Exception(exceptionMessage);
        }

        internal ITopicHandler GetTopicHandlerOrDefault<T>(T sample, string partition) where T : TopicBase
        {
            var key = GetDataKey(sample.GetType().FullName, partition);
            return GetTopicHandlerOrDefault(key);
        }

        internal ITopicHandler GetTopicHandlerOrDefault(string TopicHandlerKey)
        {
            if (TopicData.SubscribedTopicHandlers.TryGetValue(TopicHandlerKey, out var handler))
                return handler;

            return null;
        }

        // Disposes ONLY handlers that were explicitly unsubscribed by the application
        // and that belong to the specified partition.
        private void DisposeHandlersForPartition_ExplicitOnly(string partition)
        {
            // keys look like:
            //   empty partition: "Topic"
            //   non-empty:       "Topic_<partition>"
            string suffix = string.IsNullOrEmpty(partition) ? null : "_" + partition;

            foreach (var kv in TopicData.SubscribedTopicHandlers)
            {
                string key = kv.Key;

                // Must be explicitly unsubscribed:
                if (!ExplicitlyUnsubscribedKeys.ContainsKey(key))
                    continue;

                // Must belong to the old partition we are leaving:
                bool match =
                    suffix == null
                        ? !key.Contains("_")             // empty partition keys have no suffix
                        : key.EndsWith(suffix, StringComparison.Ordinal);

                if (!match)
                    continue;

                if (TopicData.SubscribedTopicHandlers.TryRemove(key, out var handler))
                {
                    try { handler.Dispose(); } catch { /* keep current behavior; swallow or log as you already do */ }
                }
            }
        }

        internal Participant GetParticipant(string partition)
        {
            if (partition == null)
            {
                partition = string.Empty;
            }

            // If we already have a participant for this exact partition, reuse it
            if (Participant != null && string.Equals(_currentPartition, partition, StringComparison.Ordinal))
            {
                return Participant;
            }

            // If there is an old participant (for a different partition), dispose it first
            if (Participant != null)
            {
                DisposeHandlersForPartition_ExplicitOnly(_currentPartition);

                try
                {
                    Participant.Dispose(); // IMPORTANT: this should delete contained entities as your wrapper already does
                }
                catch
                {
                    // swallow to keep behavior robust; logging is ok if you prefer
                }

                Participant = null;
            }

            // Create a fresh participant for the requested partition
            Participant = new Participant(partition);

            // Remember which partition this participant represents
            _currentPartition = partition;

            return Participant;
        }


        internal TopicBase GetNewTopicInstance(string topicName)
        {
            return TopicData.TopicHandlerInitializers[topicName].GetTopicInstance();
        }

        internal ConcurrentDictionary<string, ImmutableArray<IActionData>> GetActionDataCollection()
        {
            return TopicData.TopicActionData;
        }

        public bool IsRegistered(Type TopicType)
        {
            var key = GetDataKey(TopicType.FullName, string.Empty);
            return TopicData.TopicHandlerInitializers.ContainsKey(key);
        }

        public ITopicHandler InitializeTopicHandler(Type topicType, Participant participant)
        {
            if (TryGetTopicHandler(topicType, participant, out ITopicHandler topicHandler))
            {
                var topicHandlerKey = GetDataKey(topicType.FullName, participant.DefaultPartitionName);
                TopicData.SubscribedTopicHandlers[topicHandlerKey] = topicHandler;
                return topicHandler;
            }
            else
            {
                throw new Exception($"InitializeTopicHandler for {topicType.Name} and partition ={participant.DefaultPartitionName} FAILED.");
            }
        }
        internal bool TryGetTopicHandler(Type topicType, Participant participant, out ITopicHandler newTopicHandler)
        {
            newTopicHandler = null;
            var registrationKey = GetDataKey(topicType.FullName, string.Empty);
            if (TopicData.TopicHandlerInitializers.TryGetValue(registrationKey, out var topicInitializer))
            {
                newTopicHandler = topicInitializer.InitializeTopicHandler(participant);
                return true;
            }
            return false;
        }

        /// <summary>
        /// Starts the thread periodically reading incoming topics.
        /// Only one thread shall run per application, only first request is accepted, successive requests (coming from another instances od DDS class) are ignored.
        /// </summary>
        /// <param name="readingPause">number of msecs to sleep between ticks of the reading thread</param>
        /// <param name="topicReaderThreadTickCallback">delegate that should read the topics</param>
        internal void StartTopicReaderThreadIfNotAlreadyRunning(int readingPause, Action topicReaderThreadTickCallback)
        {
            TopicData.StartReaderThreadIfNotAlreadyRunning(readingPause, topicReaderThreadTickCallback);
        }

        internal void UpdateReadingPause(int readingPause)
        {
            TopicData.ReadingPauseMsec = readingPause;
        }

        /// <summary>
        /// Globally shared topic data including the one and only topic reading thread per app.
        /// Reference counting based disposal only when no one uses it anymore.
        /// 
        /// </summary>
        private static class TopicData
        {
            /// <summary>
            /// ActionData added each subscribe
            /// </summary>
            internal static ConcurrentDictionary<string, ImmutableArray<IActionData>> TopicActionData { get; }
            internal static ConcurrentDictionary<string, ITopicHandlerInitializer> TopicHandlerInitializers { get; }
            internal static ConcurrentDictionary<string, ITopicHandler> SubscribedTopicHandlers { get; }
            /// <summary> One per partition </summary>
            internal static ConcurrentDictionary<string, Participant> Participants { get; }

            internal static int ReadingPauseMsec { get; set; }

            // The one and only topic reading thread per app
            // Managed here (and not in the TopicReader) to ensure stopping it on the last Dispose (when no one needs it anymore).
            private static Thread _readerThread;
            private static volatile bool _stopRequired;
            private static Action _topicReaderThreadTickCallback;

            private static int _refCount;

            static TopicData()
            {
                _refCount = 0;
                TopicActionData = new ConcurrentDictionary<string, ImmutableArray<IActionData>>();
                TopicHandlerInitializers = new ConcurrentDictionary<string, ITopicHandlerInitializer>();
                SubscribedTopicHandlers = new ConcurrentDictionary<string, ITopicHandler>();
                Participants = new ConcurrentDictionary<string, Participant>();
            }

            /// <summary>
            /// This MUST be called from the ctor of anyone using the TopicData
            /// NOT thread safe!
            /// </summary>
            internal static void AddRef()
            {
                _refCount++;
            }

            /// <summary>
            /// This MUST be called from the dispose handler of anyone using the TopicData
            /// NOT thread safe!
            /// </summary>
            internal static void RemoveRef()
            {
                if (_refCount <= 0) throw new Exception("Trying to derefence TopicData which were already deleted.");
                _refCount--;

                // clear and dispose only if no one else is using the data anymore
                if (_refCount > 0) return;
                Clear();
            }

            // Do not Clear TopicHandlerInitializers, registration is processed once at start of application.
            private static void Clear()
            {
                StopReaderThread();

                TopicActionData.Clear();

                foreach (var subscribedTopicHandler in SubscribedTopicHandlers.Values)
                    subscribedTopicHandler.Dispose(); // properly dispose the low level dds readers/writes
                SubscribedTopicHandlers.Clear();

                foreach (var participant in Participants.Values)
                    participant.Dispose(); // properly dispose the low level dds participant
                Participants.Clear();
            }

            /// <summary>
            /// Starts the thread if not already existing. Periodically calls given callback until the thread is stopped.
            /// </summary>
		    internal static void StartReaderThreadIfNotAlreadyRunning(int readingPauseMsec, Action topicReaderThreadTickCallback)
            {
                if (_topicReaderThreadTickCallback != null)
                    return; // topic reading already requested, this must come from ctor of another instance of DDS class

                StartReaderThread(readingPauseMsec, topicReaderThreadTickCallback);
            }

            private static void StartReaderThread(int readingPauseMsec, Action topicReaderThreadTickCallback)
            {
                _topicReaderThreadTickCallback = topicReaderThreadTickCallback;
                ReadingPauseMsec = readingPauseMsec;

                _readerThread = new Thread(delegate () { TopicReaderLoop(); })
                {
                    Priority = ThreadPriority.Normal,
                    Name = "Topic_Reader_Thread",
                    IsBackground = true
                };
                _readerThread.Start();
            }

            private static void StopReaderThread()
            {
                if (_readerThread != null)
                {
                    _stopRequired = true;
                    _readerThread.Join();
                    _readerThread = null;
                    _topicReaderThreadTickCallback = null;
                }
            }

            private static void TopicReaderLoop()
            {
                while (!_stopRequired)
                {
                    _topicReaderThreadTickCallback(); // typically pointing to TopicReader.ReadTopics();
                    Thread.Sleep(ReadingPauseMsec);
                }
                System.Diagnostics.Debug.WriteLine("TopicReaderThread stopped.");

                _stopRequired = false;
            }

        }
    }
}

TopicHandler.cs

using Dds;
using Dds.Base;

namespace Communication
{
    internal class TopicHandler<T, W, R, S> : ITopicHandler
        where T : TopicBase, new()
        where W : Dds.Base.TopicWriter, new()
        where R : Dds.Base.TopicReader, new()
        where S : Dds.Base.TopicSettings, new()
    {
        public TopicBase Topic
        {
            get => _topic;
            set
            {
                if (value is T newTopic)
                    _topic = newTopic;
            }
        }
        public string TopicName { get; }
        private T _topic;
        private W _topicWriter;
        private R _topicReader;
        private S _topicSettings;
        private SampleInfo _sampleInfo;
        public SampleInfo SampleInfo => _sampleInfo;

        internal TopicHandler(T topic, W topicWriter, R topicReader, S topicSettings)
        {
            _topic = topic;
            _topicWriter = topicWriter;
            _topicReader = topicReader;
            _topicSettings = topicSettings;
            _sampleInfo = new SampleInfo();
            TopicName = typeof(T).FullName;
        }

        public ReturnCode Publish(TopicBase sample, bool dispose = false)
        {
            if (_topicWriter.IsDisposed)
            {
                return ReturnCode.AlreadyDeleted;
            }
            else
            {
                return _topicWriter.Write(sample, dispose);
            }
        }

        public ReturnCode Read(ref TopicBase sample, ref SampleInfo sampleInfo)
        {
            if (_topicReader.IsDisposed)
            {
                return ReturnCode.AlreadyDeleted;
            }
            else
            {
                return _topicReader.Take(ref sample, ref sampleInfo);
            }
        }

        public string GetSenderGuid( InstanceHandle publicationHandle )
        {
            return _topicReader.GetSenderGuid( publicationHandle );
        }

        public void ResetWriter()
        {
            if (_topicWriter != null && !_topicWriter.IsDisposed)
            {
                var participant = _topicWriter.Participant;
                _topicWriter.Dispose();
                _topicWriter = new W();
                _topicWriter.Initialize(participant);
            }
        }

        public void Dispose()
        {
            _topicReader?.Dispose();
            _topicWriter?.Dispose();
        }
    }
}

do you have any suggestions on what or where our problem is? how can we handle those network and memory bursts?

Thanks

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions