diff --git a/Pcap++/header/PcapLiveDevice.h b/Pcap++/header/PcapLiveDevice.h index 3bf89912f..c0fda947c 100644 --- a/Pcap++/header/PcapLiveDevice.h +++ b/Pcap++/header/PcapLiveDevice.h @@ -82,44 +82,6 @@ namespace pcpp bool isLoopback; }; - /// @brief A worker thread that periodically calls the provided callback with updated statistics. - class StatisticsUpdateWorker - { - public: - /// @brief Constructs and starts a worker thread that periodically calls the provided callback with updated - /// statistics. - /// @param pcapDevice A pointer to the PcapLiveDevice instance to be monitored. - /// @param onStatsUpdateCallback A callback function to be called with updated statistics. - /// @param onStatsUpdateUserCookie A user-defined pointer that is passed to the callback function. - /// @param updateIntervalMs The interval in milliseconds between each callback invocation. - StatisticsUpdateWorker(PcapLiveDevice const& pcapDevice, OnStatsUpdateCallback onStatsUpdateCallback, - void* onStatsUpdateUserCookie = nullptr, unsigned int updateIntervalMs = 1000); - - /// @brief Stops the worker thread. - void stopWorker(); - - private: - struct ThreadData - { - PcapLiveDevice const* pcapDevice = nullptr; - OnStatsUpdateCallback cbOnStatsUpdate; - void* cbOnStatsUpdateUserCookie = nullptr; - unsigned int updateIntervalMs = 1000; // Default update interval is 1 second - }; - - struct SharedThreadData - { - std::atomic_bool stopRequested{ false }; - }; - - /// @brief Main function for the worker thread. - /// @remarks This function is static to allow the worker class to be movable. - static void workerMain(std::shared_ptr sharedThreadData, ThreadData threadData); - - std::shared_ptr m_SharedThreadData; - std::thread m_WorkerThread; - }; - // This is a second descriptor for the same device. It is needed because of a bug // that occurs in libpcap on Linux (on Windows using WinPcap/Npcap it works well): // It's impossible to capture packets sent by the same descriptor @@ -131,22 +93,15 @@ namespace pcpp uint32_t m_DeviceMtu; MacAddress m_MacAddress; IPv4Address m_DefaultGateway; - std::thread m_CaptureThread; - // TODO: Cpp17 Using std::optional might be better here - std::unique_ptr m_StatisticsUpdateWorker; + std::thread m_CaptureThread; + std::thread m_StatsThread; // Should be set to true by the Caller for the Callee std::atomic m_StopThread; // Should be set to true by the Callee for the Caller std::atomic m_CaptureThreadStarted; - OnPacketArrivesCallback m_cbOnPacketArrives; - void* m_cbOnPacketArrivesUserCookie; - OnPacketArrivesStopBlocking m_cbOnPacketArrivesBlockingMode; - void* m_cbOnPacketArrivesBlockingModeUserCookie; - RawPacketVector* m_CapturedPackets; - bool m_CaptureCallbackMode; LinkLayerType m_LinkType; bool m_UsePoll; @@ -162,13 +117,6 @@ namespace pcpp void setDeviceMacAddress(); void setDefaultGateway(); - // threads - void captureThreadMain(); - - static void onPacketArrives(uint8_t* user, const struct pcap_pkthdr* pkthdr, const uint8_t* packet); - static void onPacketArrivesNoCallback(uint8_t* user, const struct pcap_pkthdr* pkthdr, const uint8_t* packet); - static void onPacketArrivesBlockingMode(uint8_t* user, const struct pcap_pkthdr* pkthdr, const uint8_t* packet); - public: /// The type of the live device enum LiveDeviceType diff --git a/Pcap++/src/PcapLiveDevice.cpp b/Pcap++/src/PcapLiveDevice.cpp index 973b6de8a..a912256cf 100644 --- a/Pcap++/src/PcapLiveDevice.cpp +++ b/Pcap++/src/PcapLiveDevice.cpp @@ -230,68 +230,6 @@ namespace pcpp } } - PcapLiveDevice::StatisticsUpdateWorker::StatisticsUpdateWorker(PcapLiveDevice const& pcapDevice, - OnStatsUpdateCallback onStatsUpdateCallback, - void* onStatsUpdateUserCookie, - unsigned int updateIntervalMs) - { - // Setup thread data - m_SharedThreadData = std::make_shared(); - - ThreadData threadData; - threadData.pcapDevice = &pcapDevice; - threadData.cbOnStatsUpdate = onStatsUpdateCallback; - threadData.cbOnStatsUpdateUserCookie = onStatsUpdateUserCookie; - threadData.updateIntervalMs = updateIntervalMs; - - // Start the thread - m_WorkerThread = std::thread(&StatisticsUpdateWorker::workerMain, m_SharedThreadData, std::move(threadData)); - } - - void PcapLiveDevice::StatisticsUpdateWorker::stopWorker() - { - m_SharedThreadData->stopRequested = true; - if (m_WorkerThread.joinable()) - { - m_WorkerThread.join(); - } - } - - void PcapLiveDevice::StatisticsUpdateWorker::workerMain(std::shared_ptr sharedThreadData, - ThreadData threadData) - { - if (sharedThreadData == nullptr) - { - PCPP_LOG_ERROR("Shared thread data is null"); - return; - } - - if (threadData.pcapDevice == nullptr) - { - PCPP_LOG_ERROR("Pcap device is null"); - return; - } - - if (threadData.cbOnStatsUpdate == nullptr) - { - PCPP_LOG_ERROR("Statistics Callback is null"); - return; - } - - PCPP_LOG_DEBUG("Started statistics thread"); - - PcapStats stats; - auto sleepDuration = std::chrono::milliseconds(threadData.updateIntervalMs); - while (!sharedThreadData->stopRequested) - { - threadData.pcapDevice->getStatistics(stats); - threadData.cbOnStatsUpdate(stats, threadData.cbOnStatsUpdateUserCookie); - std::this_thread::sleep_for(sleepDuration); - } - - PCPP_LOG_DEBUG("Stopped statistics thread"); - } - PcapLiveDevice::PcapLiveDevice(DeviceInterfaceDetails interfaceDetails, bool calculateMTU, bool calculateMacAddress, bool calculateDefaultGateway) : IPcapDevice(), m_PcapSendDescriptor(nullptr), m_PcapSelectableFd(-1), @@ -327,12 +265,6 @@ namespace pcpp m_CaptureThreadStarted = false; m_StopThread = false; m_CaptureThread = {}; - m_cbOnPacketArrives = nullptr; - m_cbOnPacketArrivesBlockingMode = nullptr; - m_cbOnPacketArrivesBlockingModeUserCookie = nullptr; - m_cbOnPacketArrivesUserCookie = nullptr; - m_CaptureCallbackMode = true; - m_CapturedPackets = nullptr; if (calculateMacAddress) { setDeviceMacAddress(); @@ -340,85 +272,205 @@ namespace pcpp } } - void PcapLiveDevice::onPacketArrives(uint8_t* user, const struct pcap_pkthdr* pkthdr, const uint8_t* packet) + namespace { - PcapLiveDevice* pThis = reinterpret_cast(user); - if (pThis == nullptr) + struct StatisticsUpdateContext { - PCPP_LOG_ERROR("Unable to extract PcapLiveDevice instance"); - return; + OnStatsUpdateCallback cbOnStatsUpdate; + void* cbOnStatsUpdateUserCookie = nullptr; + unsigned int updateIntervalMs = 1000; // Default update interval is 1 second + }; + + void statsThreadMain(std::atomic_bool& stopFlag, internal::PcapHandle const& pcapDescriptor, + StatisticsUpdateContext context) + { + if (context.cbOnStatsUpdate == nullptr) + { + PCPP_LOG_ERROR("No callback provided for statistics updates"); + return; + } + + PCPP_LOG_DEBUG("Started statistics thread"); + + IPcapDevice::PcapStats stats; + auto sleepDuration = std::chrono::milliseconds(context.updateIntervalMs); + while (!stopFlag.load()) + { + try + { + pcapDescriptor.getStatistics(stats); + context.cbOnStatsUpdate(stats, context.cbOnStatsUpdateUserCookie); + } + catch (const std::exception& ex) + { + PCPP_LOG_ERROR("Exception occurred while invoking statistics update callback: " << ex.what()); + break; + } + catch (...) + { + PCPP_LOG_ERROR("Unknown exception occurred while invoking statistics update callback"); + break; + } + std::this_thread::sleep_for(sleepDuration); + } + PCPP_LOG_DEBUG("Ended statistics thread"); } - RawPacket rawPacket(packet, pkthdr->caplen, pkthdr->ts, false, pThis->getLinkType()); + struct CaptureContext + { + PcapLiveDevice* device = nullptr; + OnPacketArrivesCallback callback; + void* userCookie = nullptr; + }; - if (pThis->m_cbOnPacketArrives != nullptr) - pThis->m_cbOnPacketArrives(&rawPacket, pThis, pThis->m_cbOnPacketArrivesUserCookie); - } + struct AccumulatorCaptureContext + { + PcapLiveDevice* device = nullptr; + RawPacketVector* capturedPackets = nullptr; + }; - void PcapLiveDevice::onPacketArrivesNoCallback(uint8_t* user, const struct pcap_pkthdr* pkthdr, - const uint8_t* packet) - { - PcapLiveDevice* pThis = reinterpret_cast(user); - if (pThis == nullptr) + struct CaptureContextST { - PCPP_LOG_ERROR("Unable to extract PcapLiveDevice instance"); - return; - } + PcapLiveDevice* device = nullptr; + OnPacketArrivesStopBlocking callback; + void* userCookie = nullptr; + bool requestStop = false; + }; - uint8_t* packetData = new uint8_t[pkthdr->caplen]; - memcpy(packetData, packet, pkthdr->caplen); - RawPacket* rawPacketPtr = new RawPacket(packetData, pkthdr->caplen, pkthdr->ts, true, pThis->getLinkType()); - pThis->m_CapturedPackets->pushBack(rawPacketPtr); - } + void onPacketArrivesNoop(uint8_t* user, const pcap_pkthdr* pkthdr, const uint8_t* packet) + {} - void PcapLiveDevice::onPacketArrivesBlockingMode(uint8_t* user, const struct pcap_pkthdr* pkthdr, - const uint8_t* packet) - { - PcapLiveDevice* pThis = reinterpret_cast(user); - if (pThis == nullptr) + void onPacketArrivesCallback(uint8_t* user, const pcap_pkthdr* pkthdr, const uint8_t* packet) { - PCPP_LOG_ERROR("Unable to extract PcapLiveDevice instance"); - return; + CaptureContext* context = reinterpret_cast(user); + if (context == nullptr || context->device == nullptr || context->callback == nullptr) + { + PCPP_LOG_ERROR("Unable to extract PcapLiveDevice instance or callback"); + return; + } + + try + { + RawPacket rawPacket(packet, pkthdr->caplen, pkthdr->ts, false, context->device->getLinkType()); + context->callback(&rawPacket, context->device, context->userCookie); + } + catch (const std::exception& ex) + { + PCPP_LOG_ERROR("Exception occurred while invoking packet arrival callback: " << ex.what()); + } + catch (...) + { + PCPP_LOG_ERROR("Unknown exception occurred while invoking packet arrival callback"); + } } - RawPacket rawPacket(packet, pkthdr->caplen, pkthdr->ts, false, pThis->getLinkType()); + void onPacketArrivesAccumulator(uint8_t* user, const pcap_pkthdr* pkthdr, const uint8_t* packet) + { + AccumulatorCaptureContext* context = reinterpret_cast(user); + if (context == nullptr || context->device == nullptr || context->capturedPackets == nullptr) + { + PCPP_LOG_ERROR("Unable to extract PcapLiveDevice instance or captured packets vector"); + return; + } - if (pThis->m_cbOnPacketArrivesBlockingMode != nullptr) - if (pThis->m_cbOnPacketArrivesBlockingMode(&rawPacket, pThis, - pThis->m_cbOnPacketArrivesBlockingModeUserCookie)) - pThis->m_StopThread = true; - } + try + { + uint8_t* packetData = new uint8_t[pkthdr->caplen]; + std::memcpy(packetData, packet, pkthdr->caplen); + auto rawPacket = std::make_unique(packetData, pkthdr->caplen, pkthdr->ts, true, + context->device->getLinkType()); + context->capturedPackets->pushBack(std::move(rawPacket)); + } + catch (const std::exception& ex) + { + PCPP_LOG_ERROR("Exception occurred while invoking packet arrival callback: " << ex.what()); + } + catch (...) + { + PCPP_LOG_ERROR("Unknown exception occurred while invoking packet arrival callback"); + } + } - void PcapLiveDevice::captureThreadMain() - { - PCPP_LOG_DEBUG("Started capture thread for device '" << m_InterfaceDetails.name << "'"); - m_CaptureThreadStarted = true; + void onPacketArrivesCallbackWithCancellation(uint8_t* user, const pcap_pkthdr* pkthdr, const uint8_t* packet) + { + CaptureContextST* context = reinterpret_cast(user); - if (m_CaptureCallbackMode) + if (context == nullptr || context->device == nullptr || context->callback == nullptr) + { + PCPP_LOG_ERROR("Unable to extract PcapLiveDevice instance or callback"); + return; + } + + if (context->requestStop) + { + // If requestStop is true, there is no need to process the packet + PCPP_LOG_DEBUG("Capture request stop is set, skipping packet processing"); + return; + } + + RawPacket rawPacket(packet, pkthdr->caplen, pkthdr->ts, false, context->device->getLinkType()); + + try + { + if (context->callback(&rawPacket, context->device, context->userCookie)) + { + // If the callback returns true, it means that the user wants to stop the capture + context->requestStop = true; + } + } + catch (const std::exception& ex) + { + PCPP_LOG_ERROR("Exception occurred while invoking packet arrival callback: " << ex.what()); + context->requestStop = true; // Stop capture on exception + } + catch (...) + { + PCPP_LOG_ERROR("Unknown exception occurred while invoking packet arrival callback"); + context->requestStop = true; // Stop capture on unknown exception + } + } + + void captureThreadMain(std::atomic_bool& stopFlag, std::atomic_bool& hasStarted, + internal::PcapHandle const& pcapDescriptor, CaptureContext context) { - while (!m_StopThread) + PCPP_LOG_DEBUG("Started capture thread for device '" << context.device->getName() << "'"); + hasStarted.store(true); + + // If the callback is null, we use a no-op handler to avoid unnecessary overhead + // Statistics only capture still requires pcap_dispatch to be called, but we don't need to process packets. + pcap_handler callbackHandler = context.callback ? onPacketArrivesCallback : onPacketArrivesNoop; + + while (!stopFlag.load()) { - if (pcap_dispatch(m_PcapDescriptor.get(), -1, onPacketArrives, reinterpret_cast(this)) == -1) + if (pcap_dispatch(pcapDescriptor.get(), -1, callbackHandler, reinterpret_cast(&context)) == + -1) { - PCPP_LOG_ERROR("pcap_dispatch returned an error: " << m_PcapDescriptor.getLastError()); - m_StopThread = true; + PCPP_LOG_ERROR("pcap_dispatch returned an error: " << pcapDescriptor.getLastError()); + stopFlag.store(true); } } + + PCPP_LOG_DEBUG("Ended capture thread for device '" << context.device->getName() << "'"); } - else + + void captureThreadMainAccumulator(std::atomic_bool& stopFlag, std::atomic_bool& hasStarted, + internal::PcapHandle const& pcapDescriptor, AccumulatorCaptureContext context) { - while (!m_StopThread) + PCPP_LOG_DEBUG("Started capture thread for device '" << context.device->getName() << "'"); + hasStarted.store(true); + while (!stopFlag.load()) { - if (pcap_dispatch(m_PcapDescriptor.get(), 100, onPacketArrivesNoCallback, - reinterpret_cast(this)) == -1) + if (pcap_dispatch(pcapDescriptor.get(), 100, onPacketArrivesAccumulator, + reinterpret_cast(&context)) == -1) { - PCPP_LOG_ERROR("pcap_dispatch returned an error: " << m_PcapDescriptor.getLastError()); - m_StopThread = true; + PCPP_LOG_ERROR("pcap_dispatch returned an error: " << pcapDescriptor.getLastError()); + stopFlag.store(true); } } + + PCPP_LOG_DEBUG("Ended capture thread for device '" << context.device->getName() << "'"); } - PCPP_LOG_DEBUG("Ended capture thread for device '" << m_InterfaceDetails.name << "'"); - } + } // namespace internal::PcapHandle PcapLiveDevice::doOpen(const DeviceConfiguration& config) { @@ -669,11 +721,13 @@ namespace pcpp return false; } - m_CaptureCallbackMode = true; - m_cbOnPacketArrives = std::move(onPacketArrives); - m_cbOnPacketArrivesUserCookie = onPacketArrivesUserCookie; + CaptureContext context; + context.device = this; + context.callback = std::move(onPacketArrives); + context.userCookie = onPacketArrivesUserCookie; - m_CaptureThread = std::thread(&pcpp::PcapLiveDevice::captureThreadMain, this); + m_CaptureThread = std::thread(&captureThreadMain, std::ref(m_StopThread), std::ref(m_CaptureThreadStarted), + std::ref(m_PcapDescriptor), std::move(context)); // Wait thread to be start // C++20 = m_CaptureThreadStarted.wait(true); @@ -686,10 +740,13 @@ namespace pcpp if (onStatsUpdate != nullptr && intervalInSecondsToUpdateStats > 0) { - // Due to passing a 'this' pointer, the current device object shouldn't be relocated, while the worker is - // active. - m_StatisticsUpdateWorker = std::make_unique( - *this, std::move(onStatsUpdate), onStatsUpdateUserCookie, intervalInSecondsToUpdateStats * 1000); + StatisticsUpdateContext statsContext; + statsContext.cbOnStatsUpdate = std::move(onStatsUpdate); + statsContext.cbOnStatsUpdateUserCookie = onStatsUpdateUserCookie; + statsContext.updateIntervalMs = intervalInSecondsToUpdateStats * 1000; // Convert seconds to milliseconds + + m_StatsThread = std::thread(statsThreadMain, std::ref(m_StopThread), std::ref(m_PcapDescriptor), + std::move(statsContext)); PCPP_LOG_DEBUG("Successfully created stats thread for device '" << m_InterfaceDetails.name << "'."); } @@ -721,11 +778,13 @@ namespace pcpp return false; } - m_CapturedPackets = &capturedPacketsVector; - m_CapturedPackets->clear(); + capturedPacketsVector.clear(); - m_CaptureCallbackMode = false; - m_CaptureThread = std::thread(&pcpp::PcapLiveDevice::captureThreadMain, this); + AccumulatorCaptureContext context; + context.device = this; + context.capturedPackets = &capturedPacketsVector; + m_CaptureThread = std::thread(&captureThreadMainAccumulator, std::ref(m_StopThread), + std::ref(m_CaptureThreadStarted), std::ref(m_PcapDescriptor), std::move(context)); // Wait thread to be start // C++20 = m_CaptureThreadStarted.wait(true); while (m_CaptureThreadStarted != true) @@ -763,11 +822,6 @@ namespace pcpp PCPP_LOG_ERROR("Failed to prepare capture: " << ex.what()); return 0; } - m_cbOnPacketArrives = nullptr; - m_cbOnPacketArrivesUserCookie = nullptr; - - m_cbOnPacketArrivesBlockingMode = std::move(onPacketArrives); - m_cbOnPacketArrivesBlockingModeUserCookie = userCookie; m_CaptureThreadStarted = true; m_StopThread = false; @@ -785,17 +839,28 @@ namespace pcpp bool shouldReturnError = false; + CaptureContextST context; + context.device = this; + context.callback = std::move(onPacketArrives); + context.userCookie = userCookie; + context.requestStop = false; + if (timeoutMs <= 0) { while (!m_StopThread) { - if (pcap_dispatch(m_PcapDescriptor.get(), -1, onPacketArrivesBlockingMode, - reinterpret_cast(this)) == -1) + if (pcap_dispatch(m_PcapDescriptor.get(), -1, onPacketArrivesCallbackWithCancellation, + reinterpret_cast(&context)) == -1) { PCPP_LOG_ERROR("pcap_dispatch returned an error: " << m_PcapDescriptor.getLastError()); shouldReturnError = true; m_StopThread = true; } + else if (context.requestStop) + { + // If the callback requested to stop the capture, we break the loop + m_StopThread = true; + } } } else @@ -817,13 +882,18 @@ namespace pcpp if (ready > 0) { - if (pcap_dispatch(m_PcapDescriptor.get(), -1, onPacketArrivesBlockingMode, - reinterpret_cast(this)) == -1) + if (pcap_dispatch(m_PcapDescriptor.get(), -1, onPacketArrivesCallbackWithCancellation, + reinterpret_cast(&context)) == -1) { PCPP_LOG_ERROR("pcap_dispatch returned an error: " << m_PcapDescriptor.getLastError()); shouldReturnError = true; m_StopThread = true; } + else if (context.requestStop) + { + // If the callback requested to stop the capture, we break the loop + m_StopThread = true; + } } else if (ready < 0) { @@ -839,13 +909,18 @@ namespace pcpp } else { - if (pcap_dispatch(m_PcapDescriptor.get(), -1, onPacketArrivesBlockingMode, - reinterpret_cast(this)) == -1) + if (pcap_dispatch(m_PcapDescriptor.get(), -1, onPacketArrivesCallbackWithCancellation, + reinterpret_cast(&context)) == -1) { PCPP_LOG_ERROR("pcap_dispatch returned an error: " << m_PcapDescriptor.getLastError()); shouldReturnError = true; m_StopThread = true; } + else if (context.requestStop) + { + // If the callback requested to stop the capture, we break the loop + m_StopThread = true; + } } currentTime = std::chrono::steady_clock::now(); } @@ -853,8 +928,6 @@ namespace pcpp m_CaptureThreadStarted = false; m_StopThread = false; - m_cbOnPacketArrivesBlockingMode = nullptr; - m_cbOnPacketArrivesBlockingModeUserCookie = nullptr; if (shouldReturnError) { @@ -870,8 +943,8 @@ namespace pcpp void PcapLiveDevice::stopCapture() { - // in blocking mode stop capture isn't relevant - if (m_cbOnPacketArrivesBlockingMode != nullptr) + // In blocking mode, there is no capture thread, so we don't need to stop it + if (!m_CaptureThread.joinable()) return; if (m_CaptureThread.get_id() != std::thread::id{} && m_CaptureThread.get_id() == std::this_thread::get_id()) @@ -890,11 +963,10 @@ namespace pcpp } PCPP_LOG_DEBUG("Capture thread stopped for device '" << m_InterfaceDetails.name << "'"); - if (m_StatisticsUpdateWorker != nullptr) + if (m_StatsThread.joinable()) { PCPP_LOG_DEBUG("Stopping stats thread, waiting for it to join..."); - m_StatisticsUpdateWorker->stopWorker(); - m_StatisticsUpdateWorker.reset(); + m_StatsThread.join(); PCPP_LOG_DEBUG("Stats thread stopped for device '" << m_InterfaceDetails.name << "'"); }