Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions Examples/XdpExample-FilterTraffic/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ struct PacketCaptureArgs
PacketStats* packetStats;
PacketMatchingEngine* matchingEngine;
std::unordered_map<uint32_t, bool> flowTable;
pcpp::XdpDevice* sendPacketsTo;
pcpp::XdpDevice::XdpSocket* sendPacketsTo;
pcpp::PcapFileWriterDevice* pcapWriter;
bool stopCapture;

Expand Down Expand Up @@ -128,14 +128,14 @@ static struct option XdpFilterTrafficOptions[] = {
/**
* A callback to handle packets that were received on the AF_XDP socket
*/
void onPacketsArrive(pcpp::RawPacket packets[], uint32_t packetCount, pcpp::XdpDevice* device, void* userCookie)
void onPacketsArrive(pcpp::RawPacket packets[], uint32_t packetCount, pcpp::XdpDevice::XdpSocket* socket, void* userCookie)
{
auto args = reinterpret_cast<PacketCaptureArgs*>(userCookie);

// if the user asked to interrupt the app, stop receiving packets
if (args->stopCapture)
{
device->stopReceivePackets();
socket->stopReceivePackets();
return;
}

Expand Down Expand Up @@ -268,8 +268,10 @@ void collectStats(std::future<void> futureObj, PacketStats* packetStats, pcpp::X
// run in an endless loop until the signal is received and print stats every 1 sec
while (futureObj.wait_for(std::chrono::milliseconds(1000)) == std::future_status::timeout)
{
// collect RX stats
auto rxStats = dev->getStatistics();
// collect RX stats on socket 0
auto socket = dev->getSocket(0);
auto rxStats = socket->getStatistics();
auto sendsocket = sendDev->getSocket(0);

pcpp::XdpDevice::XdpDeviceStats* txStats = nullptr;

Expand All @@ -278,7 +280,7 @@ void collectStats(std::future<void> futureObj, PacketStats* packetStats, pcpp::X
// if send socket is different from receive socket, collect stats from the send socket
if (sendDev != dev)
{
txStats = new pcpp::XdpDevice::XdpDeviceStats(sendDev->getStatistics());
txStats = new pcpp::XdpDevice::XdpDeviceStats(sendsocket->getStatistics());
}
else // send and receive sockets are the same
{
Expand Down Expand Up @@ -546,7 +548,7 @@ int main(int argc, char* argv[])
PacketCaptureArgs args;
args.packetStats = &packetStats;
args.matchingEngine = &matchingEngine;
args.sendPacketsTo = sendDev;
args.sendPacketsTo = sendDev->getSocket(0);
args.pcapWriter = pcapWriter;

// create future and promise instances to signal the stats collection threads when to stop
Expand All @@ -561,7 +563,8 @@ int main(int argc, char* argv[])
[](void* args) { reinterpret_cast<PacketCaptureArgs*>(args)->stopCapture = true; }, &args);

// start receiving packets on the AF_XDP socket
auto res = dev.receivePackets(onPacketsArrive, &args, -1);
auto recvsocket = dev.getSocket(0);
auto res = recvsocket->receivePackets(onPacketsArrive, &args, -1);

// user clicked ctrl+c, prepare to shut the app down

Expand All @@ -587,7 +590,8 @@ int main(int argc, char* argv[])
if (sendDev != nullptr)
{
// collect final TX stats
txStats = new pcpp::XdpDevice::XdpDeviceStats(sendDev->getStatistics());
auto sendSocket = sendDev->getSocket(0);
txStats = new pcpp::XdpDevice::XdpDeviceStats(sendSocket->getStatistics());

// if the send and receive devices are the same - no need to close the device again
if (sendInterfaceName != interfaceName)
Expand All @@ -598,7 +602,7 @@ int main(int argc, char* argv[])
}

// collect final RX stats
pcpp::XdpDevice::XdpDeviceStats rxStats = dev.getStatistics();
pcpp::XdpDevice::XdpDeviceStats rxStats = recvsocket->getStatistics();

// close the XDP device
dev.close();
Expand Down
200 changes: 132 additions & 68 deletions Pcap++/header/XdpDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
#include "Device.h"
#include <utility>
#include <functional>
#include <array>

/// @namespace pcpp
/// @
namespace pcpp
{

// used to dimension sockets
#define PCPP_MAXIMUM_NUMBER_QUEUES 8

/// @class XdpDevice
/// A class wrapping the main functionality of using AF_XDP (XSK) sockets
/// which are optimized for high performance packet processing.
Expand All @@ -19,13 +24,6 @@ namespace pcpp
class XdpDevice : public IDevice
{
public:
/// @typedef OnPacketsArrive
/// The callback that is called whenever packets are received on the socket
/// @param[in] packets An array of the raw packets received
/// @param[in] packetCount The number of packets received
/// @param[in] device The XdpDevice packets are received from (represents the AF_XDP socket)
/// @param[in] userCookie A pointer to an object set by the user when receivePackets() started
typedef void (*OnPacketsArrive)(RawPacket packets[], uint32_t packetCount, XdpDevice* device, void* userCookie);

/// @struct XdpDeviceConfiguration
/// A struct containing the configuration parameters available for opening an XDP device
Expand All @@ -47,6 +45,10 @@ namespace pcpp
/// AF_XDP operation mode
AttachMode attachMode;

/// number of queues. Should be less than or equal to the number of hardware queues supported by the device
// the queue ids are inferred as consecutive starting at zero
uint32_t numQueues;

/// UMEM is a region of virtual contiguous memory, divided into equal-sized frames.
/// This parameter determines the number of frames that will be allocated as pert of the UMEM.
uint16_t umemNumFrames;
Expand Down Expand Up @@ -89,7 +91,7 @@ namespace pcpp
explicit XdpDeviceConfiguration(AttachMode attachMode = AutoMode, uint16_t umemNumFrames = 0,
uint16_t umemFrameSize = 0, uint32_t fillRingSize = 0,
uint32_t completionRingSize = 0, uint32_t rxSize = 0, uint32_t txSize = 0,
uint16_t rxTxBatchSize = 0)
uint16_t rxTxBatchSize = 0, uint32_t numQueues = 0)
{
this->attachMode = attachMode;
this->umemNumFrames = umemNumFrames;
Expand All @@ -99,6 +101,7 @@ namespace pcpp
this->rxSize = rxSize;
this->txSize = txSize;
this->rxTxBatchSize = rxTxBatchSize;
this->numQueues = numQueues;
}
};

Expand Down Expand Up @@ -184,58 +187,12 @@ namespace pcpp
return m_DeviceOpened;
}

/// Start receiving packets. In order to use this method the device should be open. Note that this method is
/// blocking and will return if:
/// - stopReceivePackets() was called from within the user callback
/// - timeoutMS passed without receiving any packets
/// - Some error occurred (an error log will be printed)
/// @param[in] onPacketsArrive A callback to be called when packets are received
/// @param[in] onPacketsArriveUserCookie The callback is invoked with this cookie as a parameter. It can be used
/// to pass information from the user application to the callback
/// @param[in] timeoutMS Timeout in milliseconds to stop if no packets are received. The default value is 5000
/// ms
/// @return True if stopped receiving packets because stopReceivePackets() was called or because timeoutMS
/// passed, or false if an error occurred.
bool receivePackets(OnPacketsArrive onPacketsArrive, void* onPacketsArriveUserCookie, int timeoutMS = 5000);

/// Stop receiving packets. Call this method from within the callback passed to receivePackets() whenever you
/// want to stop receiving packets.
void stopReceivePackets();

/// Send a vector of packet pointers.
/// @param[in] packets A vector of packet pointers to send
/// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true
/// this method will wait until the number of packets in the completion ring is equal or greater to the number
/// of packets that were sent. The default value is false
/// @param[in] waitForTxCompletionTimeoutMS If waitForTxCompletion is set to true, poll the completion ring with
/// this timeout. The default value is 5000 ms
/// @return True if all packets were sent, or if waitForTxCompletion is true - all sent packets were confirmed.
/// Returns false if an error occurred or if poll timed out.
bool sendPackets(const RawPacketVector& packets, bool waitForTxCompletion = false,
int waitForTxCompletionTimeoutMS = 5000);

/// Send an array of packets.
/// @param[in] packets An array of raw packets to send
/// @param[in] packetCount The length of the packet array
/// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true
/// this method will wait until the number of packets in the completion ring is equal or greater to the number
/// of packets sent. The default value is false
/// @param[in] waitForTxCompletionTimeoutMS If waitForTxCompletion is set to true, poll the completion ring with
/// this timeout. The default value is 5000 ms
/// @return True if all packets were sent, or if waitForTxCompletion is true - all sent packets were confirmed.
/// Returns false if an error occurred or if poll timed out.
bool sendPackets(RawPacket packets[], size_t packetCount, bool waitForTxCompletion = false,
int waitForTxCompletionTimeoutMS = 5000);

Comment on lines 188 to 230
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep this API, but forward to XdpSocket m_Sockets[0] if called to keep the existing compatibility?

They can still fail if the device is not opened and there are no sockets.

/// @return A pointer to the current device configuration. If the device is not open this method returns nullptr
XdpDeviceConfiguration* getConfig() const
{
return m_Config;
}

/// @return Current device statistics
XdpDeviceStats getStatistics();
Comment on lines 238 to 239
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, API compat.


private:
class XdpUmem
{
Expand Down Expand Up @@ -290,25 +247,132 @@ namespace pcpp
uint64_t txCompletedPackets;
};




public:

class XdpSocket
{
public:
XdpSocket(XdpDevice *device, uint32_t qid);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: Private / Protected accessibility and friend class XdpDevice.
Code outside of XdpDevice should not be able to create an XdpSocket.

~XdpSocket();

const XdpDevice *getDevice() { return m_Device; }

/// @typedef OnPacketsArrive
/// The callback that is called whenever packets are received on the socket
/// @param[in] packets An array of the raw packets received
/// @param[in] packetCount The number of packets received
/// @param[in] device The XdpDevice packets are received from (represents the AF_XDP socket)
/// @param[in] userCookie A pointer to an object set by the user when receivePackets() started
typedef void (*OnPacketsArrive)(RawPacket packets[], uint32_t packetCount, XdpSocket* socket, void* userCookie);
Copy link
Collaborator

@Dimi1010 Dimi1010 Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Not critical, but having socket nested to the class might make the signature a bit long? 🤔

External code would have to define it as:
void handler(pcpp::RawPacket packets[], uint32_t packetCount, pcpp::XdpDevice::XdpSocket* socket, void* userCookie).

Anyway, its not critical and can be decided later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah---I was beginning to think that too. But it also suggests that the socket is subordinate to the device which it kind of is, and I thought that was a good idea. But the order of things in the XdpDevice class got messy. I also thought it should be in its own file set which might make things tidier; I'll have to figure that out if you agree.

Copy link
Collaborator

@Dimi1010 Dimi1010 Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I don't think we need a new file for now.

I think the following order would work:

root (namespace pcpp)
- XdpDevice - forward declare.
- XdpSocket - full declaration.
  - XdpUmem - nested, private visibility.
- XdpDevice - full declaration.

I think this should be fine since XdpUmem was currently only used inside XdpSocket? (Famous last words)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, breaking change in the signature with the existing callback API, but not much that can be done here.

If we are keeping the old XdpDevice API that works on Queue 0, we can probably provide a wrapper for code that essentially does:

void handler(RawPacket packets[], uint32_t packetCount, XdpSocket* socket, void* userCookie)
{
  oldCallback(packets, packetCount, socket->getDevice(), userCookie);
}

to allow code using the old API to work seamlessly if it does not need extra queues.

It would probably require storing the callback in std::function instead of raw function pointer, but that has the secondary effect of allowing user code to use lambdas. For function pointers the difference between std::function and a raw function pointer is pretty negligible as most mainstream implementations have that as an optimization.



/// Start receiving packets. In order to use this method the device should be open. Note that this method is
/// blocking and will return if:
/// - stopReceivePackets() was called from within the user callback
/// - timeoutMS passed without receiving any packets
/// - Some error occurred (an error log will be printed)
/// @param[in] onPacketsArrive A callback to be called when packets are received
/// @param[in] onPacketsArriveUserCookie The callback is invoked with this cookie as a parameter. It can be used
/// to pass information from the user application to the callback
/// @param[in] timeoutMS Timeout in milliseconds to stop if no packets are received. The default value is 5000
/// ms
/// @return True if stopped receiving packets because stopReceivePackets() was called or because timeoutMS
/// passed, or false if an error occurred.
bool receivePackets(OnPacketsArrive onPacketsArrive, void* onPacketsArriveUserCookie, int timeoutMS = 5000);

/// Stop receiving packets. Call this method from within the callback passed to receivePackets() whenever you
/// want to stop receiving packets.
void stopReceivePackets();

/// Send a vector of packet pointers.
/// @param[in] packets A vector of packet pointers to send
/// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true
/// this method will wait until the number of packets in the completion ring is equal or greater to the number
/// of packets that were sent. The default value is false
/// @param[in] waitForTxCompletionTimeoutMS If waitForTxCompletion is set to true, poll the completion ring with
/// this timeout. The default value is 5000 ms
/// @return True if all packets were sent, or if waitForTxCompletion is true - all sent packets were confirmed.
/// Returns false if an error occurred or if poll timed out.
bool sendPackets(const RawPacketVector& packets, bool waitForTxCompletion = false,
int waitForTxCompletionTimeoutMS = 5000);

/// Send an array of packets.
/// @param[in] packets An array of raw packets to send
/// @param[in] packetCount The length of the packet array
/// @param[in] waitForTxCompletion Wait for confirmation from the kernel that packets were sent. If set to true
/// this method will wait until the number of packets in the completion ring is equal or greater to the number
/// of packets sent. The default value is false
/// @param[in] waitForTxCompletionTimeoutMS If waitForTxCompletion is set to true, poll the completion ring with
/// this timeout. The default value is 5000 ms
/// @return True if all packets were sent, or if waitForTxCompletion is true - all sent packets were confirmed.
/// Returns false if an error occurred or if poll timed out.
bool sendPackets(RawPacket packets[], size_t packetCount, bool waitForTxCompletion = false,
int waitForTxCompletionTimeoutMS = 5000);

/// @return Current device statistics
XdpDeviceStats getStatistics();

bool configure();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This generally should not be accessible by public user code.
It is better to move it to private and add XdpDevice as a friend of XdpSocket.


private:

void initialize()
{
m_Device = nullptr;
m_Queueid = 0;

m_ReceivingPackets = false;
m_Umem = nullptr;
m_SocketInfo = nullptr;
memset(&m_Stats, 0, sizeof(XdpDeviceStats));
memset(&m_PrevStats, 0, sizeof(XdpPrevDeviceStats));
}

// point to the device that has this socket
XdpDevice *m_Device;
uint32_t m_Queueid;

bool m_ReceivingPackets = false;
XdpUmem* m_Umem = nullptr;
void* m_SocketInfo = nullptr;
XdpDeviceStats m_Stats;
XdpPrevDeviceStats m_PrevStats;

bool sendPackets(const std::function<RawPacket(uint32_t)>& getPacketAt,
const std::function<uint32_t()>& getPacketCount, bool waitForTxCompletion = false,
int waitForTxCompletionTimeoutMS = 5000);
bool populateFillRing(uint32_t count, uint32_t rxId = 0);
bool populateFillRing(const std::vector<uint64_t>& addresses, uint32_t rxId);
uint32_t checkCompletionRing();
bool initUmem();
bool getSocketStats();
};

const std::string& getInterfaceName() const { return m_InterfaceName; }
XdpSocket *getSocket(uint32_t queueid)
{
if(queueid < m_NumQueues)
{
return m_Socket[queueid];
}

return nullptr;
}

private:

bool m_DeviceOpened = false;

std::string m_InterfaceName;
XdpDeviceConfiguration* m_Config;
bool m_ReceivingPackets;
XdpUmem* m_Umem;
void* m_SocketInfo;
XdpDeviceStats m_Stats;
XdpPrevDeviceStats m_PrevStats;

bool sendPackets(const std::function<RawPacket(uint32_t)>& getPacketAt,
const std::function<uint32_t()>& getPacketCount, bool waitForTxCompletion = false,
int waitForTxCompletionTimeoutMS = 5000);
bool populateFillRing(uint32_t count, uint32_t rxId = 0);
bool populateFillRing(const std::vector<uint64_t>& addresses, uint32_t rxId);
uint32_t checkCompletionRing();
bool configureSocket();
bool initUmem();
uint32_t m_NumQueues; // number of queues
std::array<XdpSocket*, PCPP_MAXIMUM_NUMBER_QUEUES> m_Socket;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, any specific reason std::array<XdpSocket*, N> was chosen instead of std::vector<XdpSocket>?

Vector would allow keeping only the active sockets, without having to deal with nullptr or manually deleting them. It would also remove a level of indirection when accessing XdpSocket through the container.

Also, since we are mapping thread queue to socket 1:1, using a std::vector<XdpSocket> we can use m_Socket.size() to fetch the number of active queues, removing the need for m_NumQueues.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: It should probably allow further changes to XdpSocket to be fully RAII compliant, eliminating the need for configure and having it happen in the constructor, but that can be done later.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also removes the MAX_QUEUES limitation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about these things. Because the index into the array is also the queueid I thought the array was more natural but we could do the same thing with vector. I also thought of configure as part of constructor but the current code has these as separate phases. I will give this another go---this was the initial concept of XdpSocket.


bool initConfig();
bool getSocketStats();

};
} // namespace pcpp
Loading
Loading