-
Notifications
You must be signed in to change notification settings - Fork 363
Description
I am currently testing AF_XDP (Address Family eXpress Data Path) functionality on an Orange Pi RV2 board running a custom Linux kernel (version 6.6.63-ky, RISC-V architecture). I have verified that the kernel has both eBPF and XDP support enabled, as shown below:
--- Kernel Configuration Highlights ---
• CONFIG_XDP_SOCKETS=y
• CONFIG_XDP_SOCKETS_DIAG=m
• CONFIG_BPF=y
• CONFIG_BPF_SYSCALL=y
• CONFIG_BPF_JIT=y
• CONFIG_BPF_JIT_ALWAYS_ON=y
• CONFIG_CGROUP_BPF=y
• CONFIG_NETFILTER_BPF_LINK=y
• CONFIG_BPF_EVENTS=y
To test AF_XDP, I used a forwarding application from this GitHub repository:
https://github.com/xdp-project/bpf-examples/tree/main/AF_XDP-forwarding
However, when I attempt to bind an AF_XDP socket to the interface (e.g., end1
or end0
), the bind operation fails. AF_XDP socket creation will be successful but the AF_XDP socket binding operation. Below shows the code and the output of xsk_fwd.c.
Note:- To understand the error clearly I have modified the original xsk_fwd.c for detailed debugging.
Code------------------------------------------------------------>
#define _GNU_SOURCE
#include <linux/if_ether.h>
#include <linux/if_xdp.h>
#include <sys/poll.h>
#include <err.h>
#include <pthread.h>
#include <signal.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <getopt.h>
#include <netinet/ether.h>
#include <net/if.h>
#include <sys/socket.h>
#include <linux/if_xdp.h>
#include <errno.h>
#undef timer_t
#undef fd_set
#undef loff_t
#undef dev_t
#undef int64_t
#undef u_int64_t
#undef blkcnt_t
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
#endif
#include <bpf/libbpf.h>
#include <xdp/libxdp.h>
#include <xdp/xsk.h>
#ifndef XDP_FLAGS_SKB_MODE
#define XDP_FLAGS_SKB_MODE (1 << 2)
#endif
typedef __u64 u64;
typedef __u32 u32;
typedef __u16 u16;
typedef __u8 u8;
struct bpool_params {
u32 n_buffers;
u32 buffer_size;
int mmap_flags;
u32 n_users_max;
u32 n_buffers_per_slab;
};
struct bpool {
struct bpool_params params;
pthread_mutex_t lock;
void *addr;
u64 **slabs;
u64 **slabs_reserved;
u64 *buffers;
u64 *buffers_reserved;
u64 n_slabs;
u64 n_slabs_reserved;
u64 n_buffers;
u64 n_slabs_available;
u64 n_slabs_reserved_available;
struct xsk_umem_config umem_cfg;
struct xsk_ring_prod umem_fq;
struct xsk_ring_cons umem_cq;
struct xsk_umem *umem;
};
static struct bpool *
bpool_init(struct bpool_params *params, struct xsk_umem_config *umem_cfg)
{
struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
u64 slabs_size, slabs_reserved_size, buffers_size, buffers_reserved_size;
u64 total_size, i;
struct bpool *bp;
u8 *p;
int status;
fprintf(stderr, "bpool_init: Checking memory lock limits\n");
if (setrlimit(RLIMIT_MEMLOCK, &r)) {
fprintf(stderr, "bpool_init: Failed to set RLIMIT_MEMLOCK: %s\n", strerror(errno));
return NULL;
}
n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) / params->n_buffers_per_slab;
n_slabs_reserved = params->n_users_max * 2;
n_buffers = n_slabs * params->n_buffers_per_slab;
n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
fprintf(stderr, "bpool_init: n_slabs=%llu, n_slabs_reserved=%llu, n_buffers=%llu, n_buffers_reserved=%llu\n",
n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved);
slabs_size = n_slabs * sizeof(u64 *);
slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
buffers_size = n_buffers * sizeof(u64);
buffers_reserved_size = n_buffers_reserved * sizeof(u64);
total_size = sizeof(struct bpool) + slabs_size + slabs_reserved_size + buffers_size + buffers_reserved_size;
fprintf(stderr, "bpool_init: Total allocation size=%llu bytes\n", total_size);
p = calloc(total_size, sizeof(u8));
if (!p) {
fprintf(stderr, "bpool_init: Failed to allocate %llu bytes: %s\n", total_size, strerror(errno));
return NULL;
}
fprintf(stderr, "bpool_init: Allocated memory at %p\n", p);
bp = (struct bpool *)p;
memcpy(&bp->params, params, sizeof(*params));
bp->params.n_buffers = n_buffers;
bp->slabs = (u64 **)&p[sizeof(struct bpool)];
bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) + slabs_size];
bp->buffers = (u64 *)&p[sizeof(struct bpool) + slabs_size + slabs_reserved_size];
bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) + slabs_size + slabs_reserved_size + buffers_size];
bp->n_slabs = n_slabs;
bp->n_slabs_reserved = n_slabs_reserved;
bp->n_buffers = n_buffers;
fprintf(stderr, "bpool_init: Initialized bpool: slabs=%llu, slabs_reserved=%llu, buffers=%llu\n",
bp->n_slabs, bp->n_slabs_reserved, bp->n_buffers);
for (i = 0; i < n_slabs; i++)
bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
bp->n_slabs_available = n_slabs;
for (i = 0; i < n_slabs_reserved; i++)
bp->slabs_reserved[i] = &bp->buffers_reserved[i * params->n_buffers_per_slab];
bp->n_slabs_reserved_available = n_slabs_reserved;
for (i = 0; i < n_buffers; i++)
bp->buffers[i] = i * params->buffer_size;
fprintf(stderr, "bpool_init: Initialized slabs and buffers\n");
status = pthread_mutex_init(&bp->lock, NULL);
if (status) {
fprintf(stderr, "bpool_init: Failed to initialize mutex: %s\n", strerror(status));
free(p);
return NULL;
}
fprintf(stderr, "bpool_init: Mutex initialized\n");
fprintf(stderr, "bpool_init: Attempting mmap of %llu bytes\n", n_buffers * params->buffer_size);
bp->addr = mmap(NULL, n_buffers * params->buffer_size, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags, -1, 0);
if (bp->addr == MAP_FAILED) {
fprintf(stderr, "bpool_init: mmap failed: %s\n", strerror(errno));
pthread_mutex_destroy(&bp->lock);
free(p);
return NULL;
}
fprintf(stderr, "bpool_init: mmap successful at %p\n", bp->addr);
fprintf(stderr, "bpool_init: Creating UMEM with size=%u, frame_size=%u, headroom=%u\n",
bp->params.n_buffers * bp->params.buffer_size, umem_cfg->frame_size, umem_cfg->frame_headroom);
status = xsk_umem__create(&bp->umem, bp->addr, bp->params.n_buffers * bp->params.buffer_size,
&bp->umem_fq, &bp->umem_cq, umem_cfg);
if (status) {
fprintf(stderr, "bpool_init: Failed to create UMEM: %s\n", strerror(-status));
munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
pthread_mutex_destroy(&bp->lock);
free(p);
return NULL;
}
fprintf(stderr, "bpool_init: UMEM created successfully\n");
memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
return bp;
}
static void
bpool_free(struct bpool *bp)
{
if (!bp)
return;
fprintf(stderr, "bpool_free: Freeing buffer pool\n");
xsk_umem__delete(bp->umem);
munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
pthread_mutex_destroy(&bp->lock);
free(bp);
fprintf(stderr, "bpool_free: Buffer pool freed\n");
}
struct bcache {
struct bpool *bp;
u64 *slab_cons;
u64 *slab_prod;
u64 n_buffers_cons;
u64 n_buffers_prod;
};
static struct bcache *
bcache_init(struct bpool *bp)
{
struct bcache *bc;
fprintf(stderr, "bcache_init: Initializing buffer cache\n");
bc = calloc(1, sizeof(struct bcache));
if (!bc) {
fprintf(stderr, "bcache_init: Failed to allocate bcache: %s\n", strerror(errno));
return NULL;
}
fprintf(stderr, "bcache_init: Allocated bcache at %p\n", bc);
bc->bp = bp;
bc->n_buffers_cons = 0;
bc->n_buffers_prod = 0;
pthread_mutex_lock(&bp->lock);
if (bp->n_slabs_reserved_available == 0) {
fprintf(stderr, "bcache_init: No reserved slabs available\n");
pthread_mutex_unlock(&bp->lock);
free(bc);
return NULL;
}
bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
bp->n_slabs_reserved_available -= 2;
pthread_mutex_unlock(&bp->lock);
fprintf(stderr, "bcache_init: Buffer cache initialized, cons_slab=%p, prod_slab=%p\n",
bc->slab_cons, bc->slab_prod);
return bc;
}
static void
bcache_free(struct bcache *bc)
{
struct bpool *bp;
if (!bc)
return;
fprintf(stderr, "bcache_free: Freeing buffer cache\n");
bp = bc->bp;
pthread_mutex_lock(&bp->lock);
bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
bp->n_slabs_reserved_available += 2;
pthread_mutex_unlock(&bp->lock);
free(bc);
fprintf(stderr, "bcache_free: Buffer cache freed\n");
}
static inline u32
bcache_cons_check(struct bcache *bc, u32 n_buffers)
{
struct bpool *bp = bc->bp;
u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
u64 n_buffers_cons = bc->n_buffers_cons;
u64 n_slabs_available;
u64 *slab_full;
if (n_buffers_cons) {
u32 available = (n_buffers_cons < n_buffers) ? n_buffers_cons : n_buffers;
fprintf(stderr, "bcache_cons_check: Using %u buffers from consumer slab\n", available);
return available;
}
fprintf(stderr, "bcache_cons_check: Consumer slab empty, checking pool\n");
pthread_mutex_lock(&bp->lock);
n_slabs_available = bp->n_slabs_available;
if (!n_slabs_available) {
fprintf(stderr, "bcache_cons_check: No slabs available in pool\n");
pthread_mutex_unlock(&bp->lock);
return 0;
}
n_slabs_available--;
slab_full = bp->slabs[n_slabs_available];
bp->slabs[n_slabs_available] = bc->slab_cons;
bp->n_slabs_available = n_slabs_available;
pthread_mutex_unlock(&bp->lock);
bc->slab_cons = slab_full;
bc->n_buffers_cons = n_buffers_per_slab;
fprintf(stderr, "bcache_cons_check: Swapped consumer slab, now %llu buffers available\n", bc->n_buffers_cons);
return n_buffers;
}
static inline u64
bcache_cons(struct bcache *bc)
{
u64 n_buffers_cons = bc->n_buffers_cons - 1;
u64 buffer;
buffer = bc->slab_cons[n_buffers_cons];
bc->n_buffers_cons = n_buffers_cons;
fprintf(stderr, "bcache_cons: Consumed buffer %llu, remaining %llu\n", buffer, n_buffers_cons);
return buffer;
}
static inline void
bcache_prod(struct bcache *bc, u64 buffer)
{
struct bpool *bp = bc->bp;
u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
u64 n_buffers_prod = bc->n_buffers_prod;
u64 n_slabs_available;
u64 *slab_empty;
if (n_buffers_prod < n_buffers_per_slab) {
bc->slab_prod[n_buffers_prod] = buffer;
bc->n_buffers_prod = n_buffers_prod + 1;
fprintf(stderr, "bcache_prod: Stored buffer %llu, prod count=%llu\n", buffer, bc->n_buffers_prod);
return;
}
fprintf(stderr, "bcache_prod: Producer slab full, swapping with pool\n");
pthread_mutex_lock(&bp->lock);
n_slabs_available = bp->n_slabs_available;
slab_empty = bp->slabs[n_slabs_available];
bp->slabs[n_slabs_available] = bc->slab_prod;
bp->n_slabs_available = n_slabs_available + 1;
pthread_mutex_unlock(&bp->lock);
slab_empty[0] = buffer;
bc->slab_prod = slab_empty;
bc->n_buffers_prod = 1;
fprintf(stderr, "bcache_prod: Swapped producer slab, stored buffer %llu\n", buffer);
}
#ifndef MAX_BURST_RX
#define MAX_BURST_RX 64
#endif
#ifndef MAX_BURST_TX
#define MAX_BURST_TX 64
#endif
struct burst_rx {
u64 addr[MAX_BURST_RX];
u32 len[MAX_BURST_RX];
};
struct burst_tx {
u64 addr[MAX_BURST_TX];
u32 len[MAX_BURST_TX];
u32 n_pkts;
};
struct port_params {
struct xsk_socket_config xsk_cfg;
struct bpool *bp;
const char *iface;
u32 iface_queue;
};
struct port {
struct port_params params;
struct bcache *bc;
int xsk_fd;
struct xsk_ring_cons rxq;
struct xsk_ring_prod txq;
struct xsk_ring_prod umem_fq;
struct xsk_ring_cons umem_cq;
struct xsk_socket *xsk;
int umem_fq_initialized;
u64 n_pkts_rx;
u64 n_pkts_tx;
};
static void
port_free(struct port *p)
{
if (!p) return;
fprintf(stderr, "port_free: Freeing port for %s queue %u\n", p->params.iface, p->params.iface_queue);
if (p->xsk_fd > 0) {
close(p->xsk_fd);
fprintf(stderr, "port_free: Closed socket fd %d\n", p->xsk_fd);
}
if (p->bc) {
bcache_free(p->bc);
p->bc = NULL;
}
free(p);
fprintf(stderr, "port_free: Port freed\n");
}
static struct port *
port_init(struct port_params *params)
{
struct port *p;
u32 umem_fq_size, pos = 0;
int i;
fprintf(stderr, "port_init: Initializing port for interface %s, queue %u\n",
params->iface, params->iface_queue);
p = calloc(1, sizeof(struct port));
if (!p) {
fprintf(stderr, "port_init: Failed to allocate port structure: %s\n", strerror(errno));
return NULL;
}
fprintf(stderr, "port_init: Allocated port structure at %p\n", p);
memcpy(&p->params, params, sizeof(*params));
umem_fq_size = params->bp->umem_cfg.fill_size;
fprintf(stderr, "port_init: Initializing bcache\n");
p->bc = bcache_init(params->bp);
if (!p->bc) {
fprintf(stderr, "port_init: Failed to initialize bcache\n");
port_free(p);
return NULL;
}
fprintf(stderr, "port_init: bcache initialized\n");
fprintf(stderr, "port_init: Creating AF_XDP socket\n");
p->xsk_fd = socket(AF_XDP, SOCK_RAW, 0);
if (p->xsk_fd < 0) {
fprintf(stderr, "port_init: Failed to create AF_XDP socket: %s\n", strerror(errno));
port_free(p);
return NULL;
}
fprintf(stderr, "port_init: AF_XDP socket created, fd=%d\n", p->xsk_fd);
struct xdp_umem_reg umem_cfg = {
.addr = (__u64)params->bp->addr,
.len = params->bp->params.n_buffers * params->bp->params.buffer_size,
.chunk_size = params->bp->umem_cfg.frame_size,
.headroom = params->bp->umem_cfg.frame_headroom,
.flags = 0
};
fprintf(stderr, "port_init: Registering UMEM: addr=%llu, len=%llu, chunk_size=%u, headroom=%u\n",
umem_cfg.addr, umem_cfg.len, umem_cfg.chunk_size, umem_cfg.headroom);
if (setsockopt(p->xsk_fd, SOL_XDP, XDP_UMEM_REG, &umem_cfg, sizeof(umem_cfg)) < 0) {
fprintf(stderr, "port_init: Failed to register UMEM: %s\n", strerror(errno));
port_free(p);
return NULL;
}
fprintf(stderr, "port_init: UMEM registered\n");
u32 rx_ring_size = 256;
u32 tx_ring_size = 256;
fprintf(stderr, "port_init: Setting RX ring size=%u\n", rx_ring_size);
if (setsockopt(p->xsk_fd, SOL_XDP, XDP_RX_RING, &rx_ring_size, sizeof(rx_ring_size)) < 0) {
fprintf(stderr, "port_init: Failed to set RX ring: %s\n", strerror(errno));
port_free(p);
return NULL;
}
fprintf(stderr, "port_init: RX ring set\n");
fprintf(stderr, "port_init: Setting TX ring size=%u\n", tx_ring_size);
if (setsockopt(p->xsk_fd, SOL_XDP, XDP_TX_RING, &tx_ring_size, sizeof(tx_ring_size)) < 0) {
fprintf(stderr, "port_init: Failed to set TX ring: %s\n", strerror(errno));
port_free(p);
return NULL;
}
fprintf(stderr, "port_init: TX ring set\n");
struct sockaddr_xdp sxdp = {
.sxdp_family = AF_XDP,
.sxdp_ifindex = if_nametoindex(params->iface),
.sxdp_queue_id = params->iface_queue,
.sxdp_flags = XDP_COPY | XDP_USE_NEED_WAKEUP
};
fprintf(stderr, "port_init: Binding to interface %s (ifindex=%u), queue %u, flags=%u\n",
params->iface, sxdp.sxdp_ifindex, sxdp.sxdp_queue_id, sxdp.sxdp_flags);
if (sxdp.sxdp_ifindex == 0) {
fprintf(stderr, "port_init: Invalid interface %s: %s\n", params->iface, strerror(errno));
port_free(p);
return NULL;
}
if (bind(p->xsk_fd, (struct sockaddr *)&sxdp, sizeof(sxdp)) < 0) {
fprintf(stderr, "port_init: Failed to bind AF_XDP socket: %s\n", strerror(errno));
fprintf(stderr, "port_init: Driver may not support AF_XDP or interface %s is misconfigured\n", params->iface);
port_free(p);
return NULL;
}
fprintf(stderr, "port_init: Socket bound successfully\n");
fprintf(stderr, "port_init: Initializing UMEM fill queue, size=%u\n", umem_fq_size);
xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
for (i = 0; i < umem_fq_size; i++) {
*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) = bcache_cons(p->bc);
}
xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
p->umem_fq_initialized = 1;
fprintf(stderr, "port_init: UMEM fill queue initialized\n");
fprintf(stderr, "port_init: Successfully initialized port for %s queue %d\n",
params->iface, params->iface_queue);
return p;
}
static inline u32
port_rx_burst(struct port *p, struct burst_rx *b)
{
u32 n_pkts, pos, i;
n_pkts = ARRAY_SIZE(b->addr);
fprintf(stderr, "port_rx_burst: Attempting to receive up to %u packets\n", n_pkts);
n_pkts = bcache_cons_check(p->bc, n_pkts);
if (!n_pkts) {
fprintf(stderr, "port_rx_burst: No buffers available\n");
return 0;
}
n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
if (!n_pkts) {
if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
struct pollfd pollfd = {
.fd = xsk_socket__fd(p->xsk),
.events = POLLIN,
};
fprintf(stderr, "port_rx_burst: No packets, polling fd %d\n", pollfd.fd);
poll(&pollfd, 1, 0);
}
return 0;
}
fprintf(stderr, "port_rx_burst: Received %u packets\n", n_pkts);
for (i = 0; i < n_pkts; i++) {
b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
}
xsk_ring_cons__release(&p->rxq, n_pkts);
p->n_pkts_rx += n_pkts;
for (;;) {
int status;
status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
if (status == n_pkts)
break;
if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
struct pollfd pollfd = {
.fd = xsk_socket__fd(p->xsk),
.events = POLLIN,
};
fprintf(stderr, "port_rx_burst: Reserving UMEM FQ, polling fd %d\n", pollfd.fd);
poll(&pollfd, 1, 0);
}
}
for (i = 0; i < n_pkts; i++)
*xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) = bcache_cons(p->bc);
xsk_ring_prod__submit(&p->umem_fq, n_pkts);
fprintf(stderr, "port_rx_burst: Submitted %u buffers to UMEM FQ\n", n_pkts);
return n_pkts;
}
static inline void
port_tx_burst(struct port *p, struct burst_tx *b)
{
u32 n_pkts, pos, i;
int status;
n_pkts = p->params.bp->umem_cfg.comp_size;
fprintf(stderr, "port_tx_burst: Checking UMEM CQ for %u completions\n", n_pkts);
n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
for (i = 0; i < n_pkts; i++) {
u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
bcache_prod(p->bc, addr);
}
xsk_ring_cons__release(&p->umem_cq, n_pkts);
fprintf(stderr, "port_tx_burst: Processed %u completions\n", n_pkts);
n_pkts = b->n_pkts;
fprintf(stderr, "port_tx_burst: Transmitting %u packets\n", n_pkts);
for (;;) {
status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
if (status == n_pkts)
break;
if (xsk_ring_prod__needs_wakeup(&p->txq)) {
fprintf(stderr, "port_tx_burst: TXQ needs wakeup, sending on fd %d\n", xsk_socket__fd(p->xsk));
sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
}
}
for (i = 0; i < n_pkts; i++) {
xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
}
xsk_ring_prod__submit(&p->txq, n_pkts);
if (xsk_ring_prod__needs_wakeup(&p->txq)) {
fprintf(stderr, "port_tx_burst: TXQ submit needs wakeup, sending on fd %d\n", xsk_socket__fd(p->xsk));
sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
}
p->n_pkts_tx += n_pkts;
fprintf(stderr, "port_tx_burst: Transmitted %u packets\n", n_pkts);
}
#ifndef MAX_PORTS_PER_THREAD
#define MAX_PORTS_PER_THREAD 16
#endif
struct thread_data {
struct port *ports_rx[MAX_PORTS_PER_THREAD];
struct port *ports_tx[MAX_PORTS_PER_THREAD];
u32 n_ports_rx;
struct burst_rx burst_rx;
struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
u32 cpu_core_id;
int quit;
};
static void
swap_mac_addresses(void *data)
{
struct ether_header *eth = (struct ether_header *)data;
struct ether_addr *src_addr = (struct ether_addr *)ð->ether_shost;
struct ether_addr *dst_addr = (struct ether_addr *)ð->ether_dhost;
struct ether_addr tmp;
tmp = *src_addr;
*src_addr = *dst_addr;
*dst_addr = tmp;
}
static void *
thread_func(void *arg)
{
struct thread_data *t = arg;
cpu_set_t cpu_cores;
u32 i;
fprintf(stderr, "thread_func: Starting thread on CPU core %u\n", t->cpu_core_id);
CPU_ZERO(&cpu_cores);
CPU_SET(t->cpu_core_id, &cpu_cores);
if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores)) {
fprintf(stderr, "thread_func: Failed to set thread affinity to CPU %u: %s\n",
t->cpu_core_id, strerror(errno));
} else {
fprintf(stderr, "thread_func: Thread affinity set to CPU %u\n", t->cpu_core_id);
}
for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
struct port *port_rx = t->ports_rx[i];
struct port *port_tx = t->ports_tx[i];
struct burst_rx *brx = &t->burst_rx;
struct burst_tx *btx = &t->burst_tx[i];
u32 n_pkts, j;
n_pkts = port_rx_burst(port_rx, brx);
if (!n_pkts)
continue;
fprintf(stderr, "thread_func: Processing %u packets from %s queue %u\n",
n_pkts, port_rx->params.iface, port_rx->params.iface_queue);
for (j = 0; j < n_pkts; j++) {
u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr, addr);
swap_mac_addresses(pkt);
btx->addr[btx->n_pkts] = brx->addr[j];
btx->len[btx->n_pkts] = brx->len[j];
btx->n_pkts++;
if (btx->n_pkts == MAX_BURST_TX) {
port_tx_burst(port_tx, btx);
btx->n_pkts = 0;
}
}
}
fprintf(stderr, "thread_func: Thread on CPU %u exiting\n", t->cpu_core_id);
return NULL;
}
static const struct bpool_params bpool_params_default = {
.n_buffers = 4 * 1024,
.buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
.mmap_flags = 0,
.n_users_max = 16,
.n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
};
static const struct xsk_umem_config umem_cfg_default = {
.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
.flags = 0,
};
static const struct port_params port_params_default = {
.xsk_cfg = {
.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
.libxdp_flags = 0,
.xdp_flags = XDP_FLAGS_SKB_MODE,
.bind_flags = XDP_USE_NEED_WAKEUP,
},
.bp = NULL,
.iface = NULL,
.iface_queue = 0,
};
#ifndef MAX_PORTS
#define MAX_PORTS 64
#endif
#ifndef MAX_THREADS
#define MAX_THREADS 64
#endif
static struct bpool_params bpool_params;
static struct xsk_umem_config umem_cfg;
static struct bpool *bp;
static struct port_params port_params[MAX_PORTS];
static struct port *ports[MAX_PORTS];
static u64 n_pkts_rx[MAX_PORTS];
static u64 n_pkts_tx[MAX_PORTS];
static int n_ports;
static pthread_t threads[MAX_THREADS];
static struct thread_data thread_data[MAX_THREADS];
static int n_threads;
static void
print_usage(char *prog_name)
{
const char *usage =
"Usage:\n"
"\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
"\n"
"-c CORE CPU core to run a packet forwarding thread\n"
" on. May be invoked multiple times.\n"
"\n"
"-b SIZE Number of buffers in the buffer pool shared\n"
" by all the forwarding threads. Default: %u.\n"
"\n"
"-i INTERFACE Network interface. Each (INTERFACE, QUEUE)\n"
" pair specifies one forwarding port. May be\n"
" invoked multiple times.\n"
"\n"
"-q QUEUE Network interface queue for RX and TX. Each\n"
" (INTERFACE, QUEUE) pair specified one\n"
" forwarding port. Default: %u. May be invoked\n"
" multiple times.\n"
"\n";
printf(usage, prog_name, bpool_params_default.n_buffers, port_params_default.iface_queue);
}
static int
parse_args(int argc, char **argv)
{
struct option lgopts[] = {{NULL, 0, 0, 0}};
int opt, option_index;
fprintf(stderr, "parse_args: Parsing command-line arguments\n");
for (;;) {
opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
if (opt == EOF)
break;
fprintf(stderr, "parse_args: Processing option %c, arg=%s\n", opt, optarg);
switch (opt) {
case 'b':
bpool_params.n_buffers = atoi(optarg);
fprintf(stderr, "parse_args: Set buffer pool size to %u\n", bpool_params.n_buffers);
break;
case 'c':
if (n_threads == MAX_THREADS) {
fprintf(stderr, "parse_args: Max threads (%d) reached\n", MAX_THREADS);
return -1;
}
thread_data[n_threads].cpu_core_id = atoi(optarg);
fprintf(stderr, "parse_args: Added thread on CPU %u\n", thread_data[n_threads].cpu_core_id);
n_threads++;
break;
case 'i':
if (n_ports == MAX_PORTS) {
fprintf(stderr, "parse_args: Max ports (%d) reached\n", MAX_PORTS);
return -1;
}
port_params[n_ports].iface = optarg;
port_params[n_ports].iface_queue = 0;
fprintf(stderr, "parse_args: Added port for interface %s\n", optarg);
n_ports++;
break;
case 'q':
if (n_ports == 0) {
fprintf(stderr, "parse_args: No port specified for queue\n");
return -1;
}
port_params[n_ports - 1].iface_queue = atoi(optarg);
fprintf(stderr, "parse_args: Set queue %u for port %d\n", port_params[n_ports - 1].iface_queue, n_ports - 1);
break;
default:
fprintf(stderr, "parse_args: Illegal argument %c\n", opt);
return -1;
}
}
optind = 1;
if (!n_ports) {
fprintf(stderr, "parse_args: No ports specified\n");
return -1;
}
if (!n_threads) {
fprintf(stderr, "parse_args: No threads specified\n");
return -1;
}
if (n_ports % n_threads) {
fprintf(stderr, "parse_args: Ports (%d) cannot be evenly distributed to threads (%d)\n", n_ports, n_threads);
return -1;
}
fprintf(stderr, "parse_args: Parsed %d ports and %d threads\n", n_ports, n_threads);
return 0;
}
static void
print_port(u32 port_id)
{
struct port *port = ports[port_id];
printf("Port %u: interface = %s, queue = %u\n",
port_id, port->params.iface, port->params.iface_queue);
}
static void
print_thread(u32 thread_id)
{
struct thread_data *t = &thread_data[thread_id];
u32 i;
printf("Thread %u (CPU core %u): ", thread_id, t->cpu_core_id);
for (i = 0; i < t->n_ports_rx; i++) {
struct port *port_rx = t->ports_rx[i];
struct port *port_tx = t->ports_tx[i];
printf("(%s, %u) -> (%s, %u), ",
port_rx->params.iface, port_rx->params.iface_queue,
port_tx->params.iface, port_tx->params.iface_queue);
}
printf("\n");
}
static void
print_port_stats_separator(void)
{
printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
"----", "------------", "-------------", "------------", "-------------");
}
static void
print_port_stats_header(void)
{
print_port_stats_separator();
printf("| %4s | %12s | %13s | %12s | %13s |\n",
"Port", "RX packets", "RX rate (pps)", "TX packets", "TX_rate (pps)");
print_port_stats_separator();
}
static void
print_port_stats_trailer(void)
{
print_port_stats_separator();
printf("\n");
}
static void
print_port_stats(int port_id, u64 ns_diff)
{
struct port *p = ports[port_id];
double rx_pps, tx_pps;
rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
port_id, p->n_pkts_rx, rx_pps, p->n_pkts_tx, tx_pps);
n_pkts_rx[port_id] = p->n_pkts_rx;
n_pkts_tx[port_id] = p->n_pkts_tx;
}
static void
print_port_stats_all(u64 ns_diff)
{
int i;
print_port_stats_header();
for (i = 0; i < n_ports; i++)
print_port_stats(i, ns_diff);
print_port_stats_trailer();
}
static int quit;
static void
signal_handler(int sig)
{
fprintf(stderr, "signal_handler: Received signal %d, quitting\n", sig);
quit = 1;
}
static void
remove_xdp_program(void)
{
struct xdp_multiprog *mp;
int i, err;
fprintf(stderr, "remove_xdp_program: Detaching XDP programs\n");
for (i = 0; i < n_ports; i++) {
fprintf(stderr, "remove_xdp_program: Checking interface %s\n", port_params[i].iface);
mp = xdp_multiprog__get_from_ifindex(if_nametoindex(port_params[i].iface));
if (libxdp_get_error(mp)) {
fprintf(stderr, "remove_xdp_program: No XDP program on interface %s\n", port_params[i].iface);
continue;
}
err = xdp_multiprog__detach(mp);
if (err)
fprintf(stderr, "remove_xdp_program: Failed to detach XDP program on %s: %s\n",
port_params[i].iface, strerror(-err));
else
fprintf(stderr, "remove_xdp_program: Detached XDP program on %s\n", port_params[i].iface);
}
}
int main(int argc, char **argv)
{
struct timespec time;
u64 ns0;
int i;
fprintf(stderr, "main: Starting program on OrangePi R1V2\n");
FILE *fp = popen("uname -r", "r");
if (fp) {
char kernel_version[256];
if (fgets(kernel_version, sizeof(kernel_version), fp) != NULL) {
fprintf(stderr, "main: Kernel version: %s", kernel_version);
}
pclose(fp);
} else {
fprintf(stderr, "main: Failed to get kernel version: %s\n", strerror(errno));
}
fprintf(stderr, "main: libbpf version: %s\n", libbpf_version_string());
fprintf(stderr, "main: libxdp present (xdp_multiprog__get_from_ifindex available)\n");
for (i = 0; i < argc; i++) {
if (strncmp(argv[i], "-i", 2) == 0 && i + 1 < argc) {
char cmd[256];
snprintf(cmd, sizeof(cmd), "ethtool -i %s", argv[i + 1]);
fp = popen(cmd, "r");
if (fp) {
char line[256];
fprintf(stderr, "main: Interface %s driver info:\n", argv[i + 1]);
while (fgets(line, sizeof(line), fp) != NULL) {
fprintf(stderr, "main: %s", line);
}
pclose(fp);
} else {
fprintf(stderr, "main: Failed to get driver info for %s: %s\n", argv[i + 1], strerror(errno));
}
}
}
for (i = 0; i < MAX_PORTS; i++)
ports[i] = NULL;
bp = NULL;
memcpy(&bpool_params, &bpool_params_default, sizeof(struct bpool_params));
memcpy(&umem_cfg, &umem_cfg_default, sizeof(struct xsk_umem_config));
for (i = 0; i < MAX_PORTS; i++)
memcpy(&port_params[i], &port_params_default, sizeof(struct port_params));
if (parse_args(argc, argv)) {
fprintf(stderr, "main: Argument parsing failed\n");
print_usage(argv[0]);
return 1;
}
fprintf(stderr, "main: Initializing buffer pool\n");
bp = bpool_init(&bpool_params, &umem_cfg);
if (!bp) {
fprintf(stderr, "main: Buffer pool initialization failed\n");
return 1;
}
fprintf(stderr, "main: Buffer pool created successfully\n");
for (i = 0; i < MAX_PORTS; i++)
port_params[i].bp = bp;
for (i = 0; i < n_ports; i++) {
fprintf(stderr, "main: Initializing port %d\n", i);
ports[i] = port_init(&port_params[i]);
if (!ports[i]) {
fprintf(stderr, "main: Port %d initialization failed\n", i);
for (int j = 0; j < i; j++) {
if (ports[j]) {
fprintf(stderr, "main: Cleaning up port %d\n", j);
port_free(ports[j]);
ports[j] = NULL;
}
}
if (bp) {
fprintf(stderr, "main: Freeing buffer pool\n");
bpool_free(bp);
bp = NULL;
}
return 1;
}
print_port(i);
}
fprintf(stderr, "main: All ports created successfully\n");
for (i = 0; i < n_threads; i++) {
struct thread_data *t = &thread_data[i];
u32 n_ports_per_thread = n_ports / n_threads, j;
for (j = 0; j < n_ports_per_thread; j++) {
t->ports_rx[j] = ports[i * n_ports_per_thread + j];
t->ports_tx[j] = ports[i * n_ports_per_thread + (j + 1) % n_ports_per_thread];
}
t->n_ports_rx = n_ports_per_thread;
print_thread(i);
}
for (i = 0; i < n_threads; i++) {
int status;
fprintf(stderr, "main: Creating thread %d\n", i);
status = pthread_create(&threads[i], NULL, thread_func, &thread_data[i]);
if (status) {
fprintf(stderr, "main: Thread %d creation failed: %s\n", i, strerror(status));
for (int j = 0; j < n_ports; j++) {
if (ports[j]) {
fprintf(stderr, "main: Cleaning up port %d\n", j);
port_free(ports[j]);
ports[j] = NULL;
}
}
if (bp) {
fprintf(stderr, "main: Freeing buffer pool\n");
bpool_free(bp);
bp = NULL;
}
return 1;
}
}
fprintf(stderr, "main: All threads created successfully\n");
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
signal(SIGABRT, signal_handler);
clock_gettime(CLOCK_MONOTONIC, &time);
ns0 = time.tv_sec * 1000000000ULL + time.tv_nsec;
for (; !quit;) {
u64 ns1, ns_diff;
sleep(1);
clock_gettime(CLOCK_MONOTONIC, &time);
ns1 = time.tv_sec * 1000000000ULL + time.tv_nsec;
ns_diff = ns1 - ns0;
ns0 = ns1;
print_port_stats_all(ns_diff);
}
fprintf(stderr, "main: Quitting, stopping threads\n");
for (i = 0; i < n_threads; i++)
thread_data[i].quit = 1;
for (i = 0; i < n_threads; i++) {
fprintf(stderr, "main: Joining thread %d\n", i);
pthread_join(threads[i], NULL);
}
for (i = 0; i < n_ports; i++) {
if (ports[i]) {
fprintf(stderr, "main: Cleaning up port %d\n", i);
port_free(ports[i]);
ports[i] = NULL;
}
}
if (bp) {
fprintf(stderr, "main: Freeing buffer pool\n");
bpool_free(bp);
bp = NULL;
}
remove_xdp_program();
fprintf(stderr, "main: Program exiting\n");
return 0;
}
Result---------------------------------------------------------------------------------------------------------->
main: Starting program on OrangePi R1V2
main: Kernel version: 6.6.63-ky
main: libbpf version: v1.3
main: libxdp present (xdp_multiprog__get_from_ifindex available)
main: Interface end0 driver info:
main: driver: x1_emac
main: version: 6.6.63-ky
main: firmware-version:
main: expansion-rom-version:
main: bus-info: cac80000.ethernet
main: supports-statistics: yes
main: supports-test: no
main: supports-eeprom-access: no
main: supports-register-dump: yes
main: supports-priv-flags: no
main: Interface end1 driver info:
main: driver: x1_emac
main: version: 6.6.63-ky
main: firmware-version:
main: expansion-rom-version:
main: bus-info: cac81000.ethernet
main: supports-statistics: yes
main: supports-test: no
main: supports-eeprom-access: no
main: supports-register-dump: yes
main: supports-priv-flags: no
parse_args: Parsing command-line arguments
parse_args: Processing option i, arg=end0
parse_args: Added port for interface end0
parse_args: Processing option q, arg=0
parse_args: Set queue 0 for port 0
parse_args: Processing option i, arg=end1
parse_args: Added port for interface end1
parse_args: Processing option q, arg=0
parse_args: Set queue 0 for port 1
parse_args: Processing option c, arg=0
parse_args: Added thread on CPU 0
parse_args: Parsed 2 ports and 1 threads
main: Initializing buffer pool
bpool_init: Checking memory lock limits
bpool_init: n_slabs=1, n_slabs_reserved=32, n_buffers=4096, n_buffers_reserved=131072
bpool_init: Total allocation size=1081880 bytes
bpool_init: Allocated memory at 0x3fb22ec010
bpool_init: Initialized bpool: slabs=1, slabs_reserved=32, buffers=4096
bpool_init: Initialized slabs and buffers
bpool_init: Mutex initialized
bpool_init: Attempting mmap of 16777216 bytes
bpool_init: mmap successful at 0x3fb12ec000
bpool_init: Creating UMEM with size=16777216, frame_size=4096, headroom=0
bpool_init: UMEM created successfully
main: Buffer pool created successfully
main: Initializing port 0
port_init: Initializing port for interface end0, queue 0
port_init: Allocated port structure at 0x2ade6c2520
port_init: Initializing bcache
bcache_init: Initializing buffer cache
bcache_init: Allocated bcache at 0x2ade6c2650
bcache_init: Buffer cache initialized, cons_slab=0x3fb23ec228, prod_slab=0x3fb23e4228
port_init: bcache initialized
port_init: Creating AF_XDP socket
port_init: AF_XDP socket created, fd=4
port_init: Registering UMEM: addr=273555570688, len=16777216, chunk_size=4096, headroom=0
port_init: UMEM registered
port_init: Setting RX ring size=256
port_init: RX ring set
port_init: Setting TX ring size=256
port_init: TX ring set
port_init: Binding to interface end0 (ifindex=2), queue 0, flags=10
port_init: Failed to bind AF_XDP socket: Invalid argument
port_init: Driver may not support AF_XDP or interface end0 is misconfigured
port_free: Freeing port for end0 queue 0
port_free: Closed socket fd 4
bcache_free: Freeing buffer cache
bcache_free: Buffer cache freed
port_free: Port freed
main: Port 0 initialization failed
main: Freeing buffer pool
bpool_free: Freeing buffer pool
bpool_free: Buffer pool freed
Could you please clarify:
- Does the current driver x1_emac (ethernet) support AF_XDP?
- If not, is there an alternative or patched driver that provides AF_XDP support on this hardware?
- Are there known limitations or steps to enable AF_XDP zero-copy mode for this platform?