AF_XDP not supported on x1_emac driver in Orange Pi RV2 #254

@Dulend

Description

I am currently testing AF_XDP (Address Family eXpress Data Path) functionality on an Orange Pi RV2 board running a custom Linux kernel (version 6.6.63-ky, RISC-V architecture). I have verified that the kernel has both eBPF and XDP support enabled, as shown below:

--- Kernel Configuration Highlights ---
• CONFIG_XDP_SOCKETS=y
• CONFIG_XDP_SOCKETS_DIAG=m
• CONFIG_BPF=y
• CONFIG_BPF_SYSCALL=y
• CONFIG_BPF_JIT=y
• CONFIG_BPF_JIT_ALWAYS_ON=y
• CONFIG_CGROUP_BPF=y
• CONFIG_NETFILTER_BPF_LINK=y
• CONFIG_BPF_EVENTS=y
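
As a quick sanity check that the AF_XDP address family itself is usable (independent of any driver), the following minimal probe can be used. Note that socket creation only exercises CONFIG_XDP_SOCKETS in the kernel; driver support is first exercised at bind() time. This is a standalone sketch, not part of the repository:

#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

#ifndef AF_XDP
#define AF_XDP 44    /* fallback for older libc headers */
#endif

int main(void)
{
    /* Success here only proves the kernel was built with
     * CONFIG_XDP_SOCKETS; it says nothing about driver support. */
    int fd = socket(AF_XDP, SOCK_RAW, 0);
    if (fd < 0) {
        perror("socket(AF_XDP)");
        return 1;
    }
    printf("AF_XDP sockets are available in this kernel\n");
    close(fd);
    return 0;
}

(Socket creation likewise succeeds in the full program's log below.)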

To test AF_XDP, I used a forwarding application from this GitHub repository:
https://github.com/xdp-project/bpf-examples/tree/main/AF_XDP-forwarding
However, when I attempt to bind an AF_XDP socket to one of the interfaces (end0 or end1), the bind operation fails: AF_XDP socket creation succeeds, but the subsequent bind() does not. The code and the output of xsk_fwd.c are shown below.

Note: to make the error easier to trace, I have modified the original xsk_fwd.c to add detailed debug output and to perform the socket setup manually with setsockopt() calls instead of xsk_socket__create().

Code------------------------------------------------------------>

#define _GNU_SOURCE
#include <linux/if_ether.h>
#include <linux/if_xdp.h>
#include <sys/poll.h>
#include <err.h>
#include <pthread.h>
#include <signal.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <getopt.h>
#include <netinet/ether.h>
#include <net/if.h>
#include <sys/socket.h>
#include <linux/if_xdp.h>
#include <errno.h>

#undef timer_t
#undef fd_set
#undef loff_t
#undef dev_t
#undef int64_t
#undef u_int64_t
#undef blkcnt_t

#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
#endif

#include <bpf/libbpf.h>
#include <xdp/libxdp.h>
#include <xdp/xsk.h>

#ifndef XDP_FLAGS_SKB_MODE
#define XDP_FLAGS_SKB_MODE (1 << 2)
#endif

typedef __u64 u64;
typedef __u32 u32;
typedef __u16 u16;
typedef __u8 u8;

struct bpool_params {
    u32 n_buffers;
    u32 buffer_size;
    int mmap_flags;
    u32 n_users_max;
    u32 n_buffers_per_slab;
};

struct bpool {
    struct bpool_params params;
    pthread_mutex_t lock;
    void *addr;
    u64 **slabs;
    u64 **slabs_reserved;
    u64 *buffers;
    u64 *buffers_reserved;
    u64 n_slabs;
    u64 n_slabs_reserved;
    u64 n_buffers;
    u64 n_slabs_available;
    u64 n_slabs_reserved_available;
    struct xsk_umem_config umem_cfg;
    struct xsk_ring_prod umem_fq;
    struct xsk_ring_cons umem_cq;
    struct xsk_umem *umem;
};

static struct bpool *
bpool_init(struct bpool_params *params, struct xsk_umem_config *umem_cfg)
{
struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
u64 slabs_size, slabs_reserved_size, buffers_size, buffers_reserved_size;
u64 total_size, i;
struct bpool *bp;
u8 *p;
int status;

fprintf(stderr, "bpool_init: Checking memory lock limits\n");
if (setrlimit(RLIMIT_MEMLOCK, &r)) {
    fprintf(stderr, "bpool_init: Failed to set RLIMIT_MEMLOCK: %s\n", strerror(errno));
    return NULL;
}

n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) / params->n_buffers_per_slab;
n_slabs_reserved = params->n_users_max * 2;
n_buffers = n_slabs * params->n_buffers_per_slab;
n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
fprintf(stderr, "bpool_init: n_slabs=%llu, n_slabs_reserved=%llu, n_buffers=%llu, n_buffers_reserved=%llu\n",
        n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved);

slabs_size = n_slabs * sizeof(u64 *);
slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
buffers_size = n_buffers * sizeof(u64);
buffers_reserved_size = n_buffers_reserved * sizeof(u64);
total_size = sizeof(struct bpool) + slabs_size + slabs_reserved_size + buffers_size + buffers_reserved_size;
fprintf(stderr, "bpool_init: Total allocation size=%llu bytes\n", total_size);

p = calloc(total_size, sizeof(u8));
if (!p) {
    fprintf(stderr, "bpool_init: Failed to allocate %llu bytes: %s\n", total_size, strerror(errno));
    return NULL;
}
fprintf(stderr, "bpool_init: Allocated memory at %p\n", p);

bp = (struct bpool *)p;
memcpy(&bp->params, params, sizeof(*params));
bp->params.n_buffers = n_buffers;

bp->slabs = (u64 **)&p[sizeof(struct bpool)];
bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) + slabs_size];
bp->buffers = (u64 *)&p[sizeof(struct bpool) + slabs_size + slabs_reserved_size];
bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) + slabs_size + slabs_reserved_size + buffers_size];

bp->n_slabs = n_slabs;
bp->n_slabs_reserved = n_slabs_reserved;
bp->n_buffers = n_buffers;
fprintf(stderr, "bpool_init: Initialized bpool: slabs=%llu, slabs_reserved=%llu, buffers=%llu\n",
        bp->n_slabs, bp->n_slabs_reserved, bp->n_buffers);

for (i = 0; i < n_slabs; i++)
    bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
bp->n_slabs_available = n_slabs;

for (i = 0; i < n_slabs_reserved; i++)
    bp->slabs_reserved[i] = &bp->buffers_reserved[i * params->n_buffers_per_slab];
bp->n_slabs_reserved_available = n_slabs_reserved;

for (i = 0; i < n_buffers; i++)
    bp->buffers[i] = i * params->buffer_size;
fprintf(stderr, "bpool_init: Initialized slabs and buffers\n");

status = pthread_mutex_init(&bp->lock, NULL);
if (status) {
    fprintf(stderr, "bpool_init: Failed to initialize mutex: %s\n", strerror(status));
    free(p);
    return NULL;
}
fprintf(stderr, "bpool_init: Mutex initialized\n");

fprintf(stderr, "bpool_init: Attempting mmap of %llu bytes\n", n_buffers * params->buffer_size);
bp->addr = mmap(NULL, n_buffers * params->buffer_size, PROT_READ | PROT_WRITE,
                MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags, -1, 0);
if (bp->addr == MAP_FAILED) {
    fprintf(stderr, "bpool_init: mmap failed: %s\n", strerror(errno));
    pthread_mutex_destroy(&bp->lock);
    free(p);
    return NULL;
}
fprintf(stderr, "bpool_init: mmap successful at %p\n", bp->addr);

fprintf(stderr, "bpool_init: Creating UMEM with size=%u, frame_size=%u, headroom=%u\n",
        bp->params.n_buffers * bp->params.buffer_size, umem_cfg->frame_size, umem_cfg->frame_headroom);
status = xsk_umem__create(&bp->umem, bp->addr, bp->params.n_buffers * bp->params.buffer_size,
                          &bp->umem_fq, &bp->umem_cq, umem_cfg);
if (status) {
    fprintf(stderr, "bpool_init: Failed to create UMEM: %s\n", strerror(-status));
    munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
    pthread_mutex_destroy(&bp->lock);
    free(p);
    return NULL;
}
fprintf(stderr, "bpool_init: UMEM created successfully\n");
memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));

return bp;

}

static void
bpool_free(struct bpool *bp)
{
    if (!bp)
        return;

    fprintf(stderr, "bpool_free: Freeing buffer pool\n");
    xsk_umem__delete(bp->umem);
    munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
    pthread_mutex_destroy(&bp->lock);
    free(bp);
    fprintf(stderr, "bpool_free: Buffer pool freed\n");
}

struct bcache {
    struct bpool *bp;
    u64 *slab_cons;
    u64 *slab_prod;
    u64 n_buffers_cons;
    u64 n_buffers_prod;
};

static struct bcache *
bcache_init(struct bpool *bp)
{
struct bcache *bc;

fprintf(stderr, "bcache_init: Initializing buffer cache\n");
bc = calloc(1, sizeof(struct bcache));
if (!bc) {
    fprintf(stderr, "bcache_init: Failed to allocate bcache: %s\n", strerror(errno));
    return NULL;
}
fprintf(stderr, "bcache_init: Allocated bcache at %p\n", bc);

bc->bp = bp;
bc->n_buffers_cons = 0;
bc->n_buffers_prod = 0;

pthread_mutex_lock(&bp->lock);
if (bp->n_slabs_reserved_available == 0) {
    fprintf(stderr, "bcache_init: No reserved slabs available\n");
    pthread_mutex_unlock(&bp->lock);
    free(bc);
    return NULL;
}

bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
bp->n_slabs_reserved_available -= 2;
pthread_mutex_unlock(&bp->lock);
fprintf(stderr, "bcache_init: Buffer cache initialized, cons_slab=%p, prod_slab=%p\n",
        bc->slab_cons, bc->slab_prod);

return bc;

}

static void
bcache_free(struct bcache *bc)
{
struct bpool *bp;

if (!bc)
    return;

fprintf(stderr, "bcache_free: Freeing buffer cache\n");
bp = bc->bp;
pthread_mutex_lock(&bp->lock);
bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
bp->n_slabs_reserved_available += 2;
pthread_mutex_unlock(&bp->lock);
free(bc);
fprintf(stderr, "bcache_free: Buffer cache freed\n");

}

static inline u32
bcache_cons_check(struct bcache *bc, u32 n_buffers)
{
struct bpool *bp = bc->bp;
u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
u64 n_buffers_cons = bc->n_buffers_cons;
u64 n_slabs_available;
u64 *slab_full;

if (n_buffers_cons) {
    u32 available = (n_buffers_cons < n_buffers) ? n_buffers_cons : n_buffers;
    fprintf(stderr, "bcache_cons_check: Using %u buffers from consumer slab\n", available);
    return available;
}

fprintf(stderr, "bcache_cons_check: Consumer slab empty, checking pool\n");
pthread_mutex_lock(&bp->lock);
n_slabs_available = bp->n_slabs_available;
if (!n_slabs_available) {
    fprintf(stderr, "bcache_cons_check: No slabs available in pool\n");
    pthread_mutex_unlock(&bp->lock);
    return 0;
}

n_slabs_available--;
slab_full = bp->slabs[n_slabs_available];
bp->slabs[n_slabs_available] = bc->slab_cons;
bp->n_slabs_available = n_slabs_available;
pthread_mutex_unlock(&bp->lock);

bc->slab_cons = slab_full;
bc->n_buffers_cons = n_buffers_per_slab;
fprintf(stderr, "bcache_cons_check: Swapped consumer slab, now %llu buffers available\n", bc->n_buffers_cons);
return n_buffers;

}

static inline u64
bcache_cons(struct bcache *bc)
{
u64 n_buffers_cons = bc->n_buffers_cons - 1;
u64 buffer;

buffer = bc->slab_cons[n_buffers_cons];
bc->n_buffers_cons = n_buffers_cons;
fprintf(stderr, "bcache_cons: Consumed buffer %llu, remaining %llu\n", buffer, n_buffers_cons);
return buffer;

}

static inline void
bcache_prod(struct bcache *bc, u64 buffer)
{
struct bpool *bp = bc->bp;
u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
u64 n_buffers_prod = bc->n_buffers_prod;
u64 n_slabs_available;
u64 *slab_empty;

if (n_buffers_prod < n_buffers_per_slab) {
    bc->slab_prod[n_buffers_prod] = buffer;
    bc->n_buffers_prod = n_buffers_prod + 1;
    fprintf(stderr, "bcache_prod: Stored buffer %llu, prod count=%llu\n", buffer, bc->n_buffers_prod);
    return;
}

fprintf(stderr, "bcache_prod: Producer slab full, swapping with pool\n");
pthread_mutex_lock(&bp->lock);
n_slabs_available = bp->n_slabs_available;
slab_empty = bp->slabs[n_slabs_available];
bp->slabs[n_slabs_available] = bc->slab_prod;
bp->n_slabs_available = n_slabs_available + 1;
pthread_mutex_unlock(&bp->lock);

slab_empty[0] = buffer;
bc->slab_prod = slab_empty;
bc->n_buffers_prod = 1;
fprintf(stderr, "bcache_prod: Swapped producer slab, stored buffer %llu\n", buffer);

}

#ifndef MAX_BURST_RX
#define MAX_BURST_RX 64
#endif

#ifndef MAX_BURST_TX
#define MAX_BURST_TX 64
#endif

struct burst_rx {
    u64 addr[MAX_BURST_RX];
    u32 len[MAX_BURST_RX];
};

struct burst_tx {
    u64 addr[MAX_BURST_TX];
    u32 len[MAX_BURST_TX];
    u32 n_pkts;
};

struct port_params {
    struct xsk_socket_config xsk_cfg;
    struct bpool *bp;
    const char *iface;
    u32 iface_queue;
};

struct port {
    struct port_params params;
    struct bcache *bc;
    int xsk_fd;
    struct xsk_ring_cons rxq;
    struct xsk_ring_prod txq;
    struct xsk_ring_prod umem_fq;
    struct xsk_ring_cons umem_cq;
    struct xsk_socket *xsk;
    int umem_fq_initialized;
    u64 n_pkts_rx;
    u64 n_pkts_tx;
};

static void
port_free(struct port *p)
{
if (!p) return;

fprintf(stderr, "port_free: Freeing port for %s queue %u\n", p->params.iface, p->params.iface_queue);
if (p->xsk_fd > 0) {
    close(p->xsk_fd);
    fprintf(stderr, "port_free: Closed socket fd %d\n", p->xsk_fd);
}
if (p->bc) {
    bcache_free(p->bc);
    p->bc = NULL;
}
free(p);
fprintf(stderr, "port_free: Port freed\n");

}

static struct port *
port_init(struct port_params *params)
{
struct port *p;
u32 umem_fq_size, pos = 0;
int i;

fprintf(stderr, "port_init: Initializing port for interface %s, queue %u\n",
        params->iface, params->iface_queue);

p = calloc(1, sizeof(struct port));
if (!p) {
    fprintf(stderr, "port_init: Failed to allocate port structure: %s\n", strerror(errno));
    return NULL;
}
fprintf(stderr, "port_init: Allocated port structure at %p\n", p);
memcpy(&p->params, params, sizeof(*params));
umem_fq_size = params->bp->umem_cfg.fill_size;

fprintf(stderr, "port_init: Initializing bcache\n");
p->bc = bcache_init(params->bp);
if (!p->bc) {
    fprintf(stderr, "port_init: Failed to initialize bcache\n");
    port_free(p);
    return NULL;
}
fprintf(stderr, "port_init: bcache initialized\n");

fprintf(stderr, "port_init: Creating AF_XDP socket\n");
p->xsk_fd = socket(AF_XDP, SOCK_RAW, 0);
if (p->xsk_fd < 0) {
    fprintf(stderr, "port_init: Failed to create AF_XDP socket: %s\n", strerror(errno));
    port_free(p);
    return NULL;
}
fprintf(stderr, "port_init: AF_XDP socket created, fd=%d\n", p->xsk_fd);

struct xdp_umem_reg umem_cfg = {
    .addr = (__u64)params->bp->addr,
    .len = params->bp->params.n_buffers * params->bp->params.buffer_size,
    .chunk_size = params->bp->umem_cfg.frame_size,
    .headroom = params->bp->umem_cfg.frame_headroom,
    .flags = 0
};
fprintf(stderr, "port_init: Registering UMEM: addr=%llu, len=%llu, chunk_size=%u, headroom=%u\n",
        umem_cfg.addr, umem_cfg.len, umem_cfg.chunk_size, umem_cfg.headroom);
if (setsockopt(p->xsk_fd, SOL_XDP, XDP_UMEM_REG, &umem_cfg, sizeof(umem_cfg)) < 0) {
    fprintf(stderr, "port_init: Failed to register UMEM: %s\n", strerror(errno));
    port_free(p);
    return NULL;
}
fprintf(stderr, "port_init: UMEM registered\n");

u32 rx_ring_size = 256;
u32 tx_ring_size = 256;
fprintf(stderr, "port_init: Setting RX ring size=%u\n", rx_ring_size);
if (setsockopt(p->xsk_fd, SOL_XDP, XDP_RX_RING, &rx_ring_size, sizeof(rx_ring_size)) < 0) {
    fprintf(stderr, "port_init: Failed to set RX ring: %s\n", strerror(errno));
    port_free(p);
    return NULL;
}
fprintf(stderr, "port_init: RX ring set\n");

fprintf(stderr, "port_init: Setting TX ring size=%u\n", tx_ring_size);
if (setsockopt(p->xsk_fd, SOL_XDP, XDP_TX_RING, &tx_ring_size, sizeof(tx_ring_size)) < 0) {
    fprintf(stderr, "port_init: Failed to set TX ring: %s\n", strerror(errno));
    port_free(p);
    return NULL;
}
fprintf(stderr, "port_init: TX ring set\n");

struct sockaddr_xdp sxdp = {
    .sxdp_family = AF_XDP,
    .sxdp_ifindex = if_nametoindex(params->iface),
    .sxdp_queue_id = params->iface_queue,
    .sxdp_flags = XDP_COPY | XDP_USE_NEED_WAKEUP
};
fprintf(stderr, "port_init: Binding to interface %s (ifindex=%u), queue %u, flags=%u\n",
        params->iface, sxdp.sxdp_ifindex, sxdp.sxdp_queue_id, sxdp.sxdp_flags);
if (sxdp.sxdp_ifindex == 0) {
    fprintf(stderr, "port_init: Invalid interface %s: %s\n", params->iface, strerror(errno));
    port_free(p);
    return NULL;
}
if (bind(p->xsk_fd, (struct sockaddr *)&sxdp, sizeof(sxdp)) < 0) {
    fprintf(stderr, "port_init: Failed to bind AF_XDP socket: %s\n", strerror(errno));
    fprintf(stderr, "port_init: Driver may not support AF_XDP or interface %s is misconfigured\n", params->iface);
    port_free(p);
    return NULL;
}
fprintf(stderr, "port_init: Socket bound successfully\n");

fprintf(stderr, "port_init: Initializing UMEM fill queue, size=%u\n", umem_fq_size);
xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
for (i = 0; i < umem_fq_size; i++) {
    *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) = bcache_cons(p->bc);
}
xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
p->umem_fq_initialized = 1;
fprintf(stderr, "port_init: UMEM fill queue initialized\n");

fprintf(stderr, "port_init: Successfully initialized port for %s queue %d\n",
        params->iface, params->iface_queue);
return p;

}

static inline u32
port_rx_burst(struct port *p, struct burst_rx *b)
{
u32 n_pkts, pos, i;

n_pkts = ARRAY_SIZE(b->addr);
fprintf(stderr, "port_rx_burst: Attempting to receive up to %u packets\n", n_pkts);

n_pkts = bcache_cons_check(p->bc, n_pkts);
if (!n_pkts) {
    fprintf(stderr, "port_rx_burst: No buffers available\n");
    return 0;
}

n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
if (!n_pkts) {
    if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
        struct pollfd pollfd = {
            .fd = xsk_socket__fd(p->xsk),
            .events = POLLIN,
        };
        fprintf(stderr, "port_rx_burst: No packets, polling fd %d\n", pollfd.fd);
        poll(&pollfd, 1, 0);
    }
    return 0;
}
fprintf(stderr, "port_rx_burst: Received %u packets\n", n_pkts);

for (i = 0; i < n_pkts; i++) {
    b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
    b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
}

xsk_ring_cons__release(&p->rxq, n_pkts);
p->n_pkts_rx += n_pkts;

for (;;) {
    int status;
    status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
    if (status == n_pkts)
        break;
    if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
        struct pollfd pollfd = {
            .fd = xsk_socket__fd(p->xsk),
            .events = POLLIN,
        };
        fprintf(stderr, "port_rx_burst: Reserving UMEM FQ, polling fd %d\n", pollfd.fd);
        poll(&pollfd, 1, 0);
    }
}

for (i = 0; i < n_pkts; i++)
    *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) = bcache_cons(p->bc);

xsk_ring_prod__submit(&p->umem_fq, n_pkts);
fprintf(stderr, "port_rx_burst: Submitted %u buffers to UMEM FQ\n", n_pkts);

return n_pkts;

}

static inline void
port_tx_burst(struct port *p, struct burst_tx *b)
{
u32 n_pkts, pos, i;
int status;

n_pkts = p->params.bp->umem_cfg.comp_size;
fprintf(stderr, "port_tx_burst: Checking UMEM CQ for %u completions\n", n_pkts);

n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
for (i = 0; i < n_pkts; i++) {
    u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
    bcache_prod(p->bc, addr);
}
xsk_ring_cons__release(&p->umem_cq, n_pkts);
fprintf(stderr, "port_tx_burst: Processed %u completions\n", n_pkts);

n_pkts = b->n_pkts;
fprintf(stderr, "port_tx_burst: Transmitting %u packets\n", n_pkts);

for (;;) {
    status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
    if (status == n_pkts)
        break;
    if (xsk_ring_prod__needs_wakeup(&p->txq)) {
        fprintf(stderr, "port_tx_burst: TXQ needs wakeup, sending on fd %d\n", xsk_socket__fd(p->xsk));
        sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
    }
}

for (i = 0; i < n_pkts; i++) {
    xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
    xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
}

xsk_ring_prod__submit(&p->txq, n_pkts);
if (xsk_ring_prod__needs_wakeup(&p->txq)) {
    fprintf(stderr, "port_tx_burst: TXQ submit needs wakeup, sending on fd %d\n", xsk_socket__fd(p->xsk));
    sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
}
p->n_pkts_tx += n_pkts;
fprintf(stderr, "port_tx_burst: Transmitted %u packets\n", n_pkts);

}

#ifndef MAX_PORTS_PER_THREAD
#define MAX_PORTS_PER_THREAD 16
#endif

struct thread_data {
    struct port *ports_rx[MAX_PORTS_PER_THREAD];
    struct port *ports_tx[MAX_PORTS_PER_THREAD];
    u32 n_ports_rx;
    struct burst_rx burst_rx;
    struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
    u32 cpu_core_id;
    int quit;
};

static void
swap_mac_addresses(void *data)
{
    struct ether_header *eth = (struct ether_header *)data;
    struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
    struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
    struct ether_addr tmp;

    tmp = *src_addr;
    *src_addr = *dst_addr;
    *dst_addr = tmp;
}

static void *
thread_func(void *arg)
{
struct thread_data *t = arg;
cpu_set_t cpu_cores;
u32 i;

fprintf(stderr, "thread_func: Starting thread on CPU core %u\n", t->cpu_core_id);
CPU_ZERO(&cpu_cores);
CPU_SET(t->cpu_core_id, &cpu_cores);
if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores)) {
    fprintf(stderr, "thread_func: Failed to set thread affinity to CPU %u: %s\n",
            t->cpu_core_id, strerror(errno));
} else {
    fprintf(stderr, "thread_func: Thread affinity set to CPU %u\n", t->cpu_core_id);
}

for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
    struct port *port_rx = t->ports_rx[i];
    struct port *port_tx = t->ports_tx[i];
    struct burst_rx *brx = &t->burst_rx;
    struct burst_tx *btx = &t->burst_tx[i];
    u32 n_pkts, j;

    n_pkts = port_rx_burst(port_rx, brx);
    if (!n_pkts)
        continue;

    fprintf(stderr, "thread_func: Processing %u packets from %s queue %u\n",
            n_pkts, port_rx->params.iface, port_rx->params.iface_queue);

    for (j = 0; j < n_pkts; j++) {
        u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
        u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr, addr);
        swap_mac_addresses(pkt);
        btx->addr[btx->n_pkts] = brx->addr[j];
        btx->len[btx->n_pkts] = brx->len[j];
        btx->n_pkts++;
        if (btx->n_pkts == MAX_BURST_TX) {
            port_tx_burst(port_tx, btx);
            btx->n_pkts = 0;
        }
    }
}

fprintf(stderr, "thread_func: Thread on CPU %u exiting\n", t->cpu_core_id);
return NULL;

}

static const struct bpool_params bpool_params_default = {
    .n_buffers = 4 * 1024,
    .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
    .mmap_flags = 0,
    .n_users_max = 16,
    .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
};

static const struct xsk_umem_config umem_cfg_default = {
    .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
    .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
    .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
    .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
    .flags = 0,
};

static const struct port_params port_params_default = {
    .xsk_cfg = {
        .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
        .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
        .libxdp_flags = 0,
        .xdp_flags = XDP_FLAGS_SKB_MODE,
        .bind_flags = XDP_USE_NEED_WAKEUP,
    },
    .bp = NULL,
    .iface = NULL,
    .iface_queue = 0,
};
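
/*
 * Note: with the manual setsockopt() path in port_init() above, the
 * xsk_cfg defaults here (bind_flags, xdp_flags, ring sizes) are kept
 * from the original xsk_fwd.c but are no longer consulted; the bind
 * flags are hard-coded as XDP_COPY | XDP_USE_NEED_WAKEUP in
 * port_init().
 */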

#ifndef MAX_PORTS
#define MAX_PORTS 64
#endif

#ifndef MAX_THREADS
#define MAX_THREADS 64
#endif

static struct bpool_params bpool_params;
static struct xsk_umem_config umem_cfg;
static struct bpool *bp;

static struct port_params port_params[MAX_PORTS];
static struct port *ports[MAX_PORTS];
static u64 n_pkts_rx[MAX_PORTS];
static u64 n_pkts_tx[MAX_PORTS];
static int n_ports;

static pthread_t threads[MAX_THREADS];
static struct thread_data thread_data[MAX_THREADS];
static int n_threads;

static void
print_usage(char *prog_name)
{
    const char *usage =
        "Usage:\n"
        "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
        "\n"
        "-c CORE        CPU core to run a packet forwarding thread\n"
        "               on. May be invoked multiple times.\n"
        "\n"
        "-b SIZE        Number of buffers in the buffer pool shared\n"
        "               by all the forwarding threads. Default: %u.\n"
        "\n"
        "-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
        "               pair specifies one forwarding port. May be\n"
        "               invoked multiple times.\n"
        "\n"
        "-q QUEUE       Network interface queue for RX and TX. Each\n"
        "               (INTERFACE, QUEUE) pair specifies one\n"
        "               forwarding port. Default: %u. May be invoked\n"
        "               multiple times.\n"
        "\n";
    printf(usage, prog_name, bpool_params_default.n_buffers, port_params_default.iface_queue);
}

static int
parse_args(int argc, char **argv)
{
struct option lgopts[] = {{NULL, 0, 0, 0}};
int opt, option_index;

fprintf(stderr, "parse_args: Parsing command-line arguments\n");
for (;;) {
    opt = getopt_long(argc, argv, "b:c:i:q:", lgopts, &option_index);
    if (opt == EOF)
        break;
    fprintf(stderr, "parse_args: Processing option %c, arg=%s\n", opt, optarg);
    switch (opt) {
    case 'b':
        bpool_params.n_buffers = atoi(optarg);
        fprintf(stderr, "parse_args: Set buffer pool size to %u\n", bpool_params.n_buffers);
        break;
    case 'c':
        if (n_threads == MAX_THREADS) {
            fprintf(stderr, "parse_args: Max threads (%d) reached\n", MAX_THREADS);
            return -1;
        }
        thread_data[n_threads].cpu_core_id = atoi(optarg);
        fprintf(stderr, "parse_args: Added thread on CPU %u\n", thread_data[n_threads].cpu_core_id);
        n_threads++;
        break;
    case 'i':
        if (n_ports == MAX_PORTS) {
            fprintf(stderr, "parse_args: Max ports (%d) reached\n", MAX_PORTS);
            return -1;
        }
        port_params[n_ports].iface = optarg;
        port_params[n_ports].iface_queue = 0;
        fprintf(stderr, "parse_args: Added port for interface %s\n", optarg);
        n_ports++;
        break;
    case 'q':
        if (n_ports == 0) {
            fprintf(stderr, "parse_args: No port specified for queue\n");
            return -1;
        }
        port_params[n_ports - 1].iface_queue = atoi(optarg);
        fprintf(stderr, "parse_args: Set queue %u for port %d\n", port_params[n_ports - 1].iface_queue, n_ports - 1);
        break;
    default:
        fprintf(stderr, "parse_args: Illegal argument %c\n", opt);
        return -1;
    }
}

optind = 1;
if (!n_ports) {
    fprintf(stderr, "parse_args: No ports specified\n");
    return -1;
}
if (!n_threads) {
    fprintf(stderr, "parse_args: No threads specified\n");
    return -1;
}
if (n_ports % n_threads) {
    fprintf(stderr, "parse_args: Ports (%d) cannot be evenly distributed to threads (%d)\n", n_ports, n_threads);
    return -1;
}
fprintf(stderr, "parse_args: Parsed %d ports and %d threads\n", n_ports, n_threads);
return 0;

}

static void
print_port(u32 port_id)
{
    struct port *port = ports[port_id];

    printf("Port %u: interface = %s, queue = %u\n",
           port_id, port->params.iface, port->params.iface_queue);
}

static void
print_thread(u32 thread_id)
{
    struct thread_data *t = &thread_data[thread_id];
    u32 i;

    printf("Thread %u (CPU core %u): ", thread_id, t->cpu_core_id);
    for (i = 0; i < t->n_ports_rx; i++) {
        struct port *port_rx = t->ports_rx[i];
        struct port *port_tx = t->ports_tx[i];

        printf("(%s, %u) -> (%s, %u), ",
               port_rx->params.iface, port_rx->params.iface_queue,
               port_tx->params.iface, port_tx->params.iface_queue);
    }
    printf("\n");
}

static void
print_port_stats_separator(void)
{
    printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
           "----", "------------", "-------------", "------------", "-------------");
}

static void
print_port_stats_header(void)
{
    print_port_stats_separator();
    printf("| %4s | %12s | %13s | %12s | %13s |\n",
           "Port", "RX packets", "RX rate (pps)", "TX packets", "TX rate (pps)");
    print_port_stats_separator();
}

static void
print_port_stats_trailer(void)
{
    print_port_stats_separator();
    printf("\n");
}

static void
print_port_stats(int port_id, u64 ns_diff)
{
    struct port *p = ports[port_id];
    double rx_pps, tx_pps;

    rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
    tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;

    printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
           port_id, p->n_pkts_rx, rx_pps, p->n_pkts_tx, tx_pps);

    n_pkts_rx[port_id] = p->n_pkts_rx;
    n_pkts_tx[port_id] = p->n_pkts_tx;
}

static void
print_port_stats_all(u64 ns_diff)
{
    int i;

    print_port_stats_header();
    for (i = 0; i < n_ports; i++)
        print_port_stats(i, ns_diff);
    print_port_stats_trailer();
}

static int quit;

static void
signal_handler(int sig)
{
    fprintf(stderr, "signal_handler: Received signal %d, quitting\n", sig);
    quit = 1;
}

static void
remove_xdp_program(void)
{
struct xdp_multiprog *mp;
int i, err;

fprintf(stderr, "remove_xdp_program: Detaching XDP programs\n");
for (i = 0; i < n_ports; i++) {
    fprintf(stderr, "remove_xdp_program: Checking interface %s\n", port_params[i].iface);
    mp = xdp_multiprog__get_from_ifindex(if_nametoindex(port_params[i].iface));
    if (libxdp_get_error(mp)) {
        fprintf(stderr, "remove_xdp_program: No XDP program on interface %s\n", port_params[i].iface);
        continue;
    }
    err = xdp_multiprog__detach(mp);
    if (err)
        fprintf(stderr, "remove_xdp_program: Failed to detach XDP program on %s: %s\n",
                port_params[i].iface, strerror(-err));
    else
        fprintf(stderr, "remove_xdp_program: Detached XDP program on %s\n", port_params[i].iface);
}

}

int main(int argc, char **argv)
{
struct timespec time;
u64 ns0;
int i;

fprintf(stderr, "main: Starting program on OrangePi R1V2\n");
FILE *fp = popen("uname -r", "r");
if (fp) {
    char kernel_version[256];
    if (fgets(kernel_version, sizeof(kernel_version), fp) != NULL) {
        fprintf(stderr, "main: Kernel version: %s", kernel_version);
    }
    pclose(fp);
} else {
    fprintf(stderr, "main: Failed to get kernel version: %s\n", strerror(errno));
}

fprintf(stderr, "main: libbpf version: %s\n", libbpf_version_string());
fprintf(stderr, "main: libxdp present (xdp_multiprog__get_from_ifindex available)\n");

for (i = 0; i < argc; i++) {
    if (strncmp(argv[i], "-i", 2) == 0 && i + 1 < argc) {
        char cmd[256];
        snprintf(cmd, sizeof(cmd), "ethtool -i %s", argv[i + 1]);
        fp = popen(cmd, "r");
        if (fp) {
            char line[256];
            fprintf(stderr, "main: Interface %s driver info:\n", argv[i + 1]);
            while (fgets(line, sizeof(line), fp) != NULL) {
                fprintf(stderr, "main: %s", line);
            }
            pclose(fp);
        } else {
            fprintf(stderr, "main: Failed to get driver info for %s: %s\n", argv[i + 1], strerror(errno));
        }
    }
}

for (i = 0; i < MAX_PORTS; i++)
    ports[i] = NULL;
bp = NULL;

memcpy(&bpool_params, &bpool_params_default, sizeof(struct bpool_params));
memcpy(&umem_cfg, &umem_cfg_default, sizeof(struct xsk_umem_config));
for (i = 0; i < MAX_PORTS; i++)
    memcpy(&port_params[i], &port_params_default, sizeof(struct port_params));

if (parse_args(argc, argv)) {
    fprintf(stderr, "main: Argument parsing failed\n");
    print_usage(argv[0]);
    return 1;
}

fprintf(stderr, "main: Initializing buffer pool\n");
bp = bpool_init(&bpool_params, &umem_cfg);
if (!bp) {
    fprintf(stderr, "main: Buffer pool initialization failed\n");
    return 1;
}
fprintf(stderr, "main: Buffer pool created successfully\n");

for (i = 0; i < MAX_PORTS; i++)
    port_params[i].bp = bp;

for (i = 0; i < n_ports; i++) {
    fprintf(stderr, "main: Initializing port %d\n", i);
    ports[i] = port_init(&port_params[i]);
    if (!ports[i]) {
        fprintf(stderr, "main: Port %d initialization failed\n", i);
        for (int j = 0; j < i; j++) {
            if (ports[j]) {
                fprintf(stderr, "main: Cleaning up port %d\n", j);
                port_free(ports[j]);
                ports[j] = NULL;
            }
        }
        if (bp) {
            fprintf(stderr, "main: Freeing buffer pool\n");
            bpool_free(bp);
            bp = NULL;
        }
        return 1;
    }
    print_port(i);
}
fprintf(stderr, "main: All ports created successfully\n");

for (i = 0; i < n_threads; i++) {
    struct thread_data *t = &thread_data[i];
    u32 n_ports_per_thread = n_ports / n_threads, j;

    for (j = 0; j < n_ports_per_thread; j++) {
        t->ports_rx[j] = ports[i * n_ports_per_thread + j];
        t->ports_tx[j] = ports[i * n_ports_per_thread + (j + 1) % n_ports_per_thread];
    }
    t->n_ports_rx = n_ports_per_thread;
    print_thread(i);
}

for (i = 0; i < n_threads; i++) {
    int status;
    fprintf(stderr, "main: Creating thread %d\n", i);
    status = pthread_create(&threads[i], NULL, thread_func, &thread_data[i]);
    if (status) {
        fprintf(stderr, "main: Thread %d creation failed: %s\n", i, strerror(status));
        for (int j = 0; j < n_ports; j++) {
            if (ports[j]) {
                fprintf(stderr, "main: Cleaning up port %d\n", j);
                port_free(ports[j]);
                ports[j] = NULL;
            }
        }
        if (bp) {
            fprintf(stderr, "main: Freeing buffer pool\n");
            bpool_free(bp);
            bp = NULL;
        }
        return 1;
    }
}
fprintf(stderr, "main: All threads created successfully\n");

signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
signal(SIGABRT, signal_handler);

clock_gettime(CLOCK_MONOTONIC, &time);
ns0 = time.tv_sec * 1000000000ULL + time.tv_nsec;
for (; !quit;) {
    u64 ns1, ns_diff;
    sleep(1);
    clock_gettime(CLOCK_MONOTONIC, &time);
    ns1 = time.tv_sec * 1000000000ULL + time.tv_nsec;
    ns_diff = ns1 - ns0;
    ns0 = ns1;
    print_port_stats_all(ns_diff);
}

fprintf(stderr, "main: Quitting, stopping threads\n");
for (i = 0; i < n_threads; i++)
    thread_data[i].quit = 1;

for (i = 0; i < n_threads; i++) {
    fprintf(stderr, "main: Joining thread %d\n", i);
    pthread_join(threads[i], NULL);
}

for (i = 0; i < n_ports; i++) {
    if (ports[i]) {
        fprintf(stderr, "main: Cleaning up port %d\n", i);
        port_free(ports[i]);
        ports[i] = NULL;
    }
}

if (bp) {
    fprintf(stderr, "main: Freeing buffer pool\n");
    bpool_free(bp);
    bp = NULL;
}

remove_xdp_program();
fprintf(stderr, "main: Program exiting\n");
return 0;

}

Result---------------------------------------------------------------------------------------------------------->

main: Starting program on Orange Pi RV2
main: Kernel version: 6.6.63-ky
main: libbpf version: v1.3
main: libxdp present (xdp_multiprog__get_from_ifindex available)
main: Interface end0 driver info:
main: driver: x1_emac
main: version: 6.6.63-ky
main: firmware-version:
main: expansion-rom-version:
main: bus-info: cac80000.ethernet
main: supports-statistics: yes
main: supports-test: no
main: supports-eeprom-access: no
main: supports-register-dump: yes
main: supports-priv-flags: no
main: Interface end1 driver info:
main: driver: x1_emac
main: version: 6.6.63-ky
main: firmware-version:
main: expansion-rom-version:
main: bus-info: cac81000.ethernet
main: supports-statistics: yes
main: supports-test: no
main: supports-eeprom-access: no
main: supports-register-dump: yes
main: supports-priv-flags: no
parse_args: Parsing command-line arguments
parse_args: Processing option i, arg=end0
parse_args: Added port for interface end0
parse_args: Processing option q, arg=0
parse_args: Set queue 0 for port 0
parse_args: Processing option i, arg=end1
parse_args: Added port for interface end1
parse_args: Processing option q, arg=0
parse_args: Set queue 0 for port 1
parse_args: Processing option c, arg=0
parse_args: Added thread on CPU 0
parse_args: Parsed 2 ports and 1 threads
main: Initializing buffer pool
bpool_init: Checking memory lock limits
bpool_init: n_slabs=1, n_slabs_reserved=32, n_buffers=4096, n_buffers_reserved=131072
bpool_init: Total allocation size=1081880 bytes
bpool_init: Allocated memory at 0x3fb22ec010
bpool_init: Initialized bpool: slabs=1, slabs_reserved=32, buffers=4096
bpool_init: Initialized slabs and buffers
bpool_init: Mutex initialized
bpool_init: Attempting mmap of 16777216 bytes
bpool_init: mmap successful at 0x3fb12ec000
bpool_init: Creating UMEM with size=16777216, frame_size=4096, headroom=0
bpool_init: UMEM created successfully
main: Buffer pool created successfully
main: Initializing port 0
port_init: Initializing port for interface end0, queue 0
port_init: Allocated port structure at 0x2ade6c2520
port_init: Initializing bcache
bcache_init: Initializing buffer cache
bcache_init: Allocated bcache at 0x2ade6c2650
bcache_init: Buffer cache initialized, cons_slab=0x3fb23ec228, prod_slab=0x3fb23e4228
port_init: bcache initialized
port_init: Creating AF_XDP socket
port_init: AF_XDP socket created, fd=4
port_init: Registering UMEM: addr=273555570688, len=16777216, chunk_size=4096, headroom=0
port_init: UMEM registered
port_init: Setting RX ring size=256
port_init: RX ring set
port_init: Setting TX ring size=256
port_init: TX ring set
port_init: Binding to interface end0 (ifindex=2), queue 0, flags=10
port_init: Failed to bind AF_XDP socket: Invalid argument
port_init: Driver may not support AF_XDP or interface end0 is misconfigured
port_free: Freeing port for end0 queue 0
port_free: Closed socket fd 4
bcache_free: Freeing buffer cache
bcache_free: Buffer cache freed
port_free: Port freed
main: Port 0 initialization failed
main: Freeing buffer pool
bpool_free: Freeing buffer pool
bpool_free: Buffer pool freed
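
For reference, flags=10 in the bind trace decodes to XDP_COPY (0x2) | XDP_USE_NEED_WAKEUP (0x8), i.e. copy mode was requested. Copy mode normally works through generic XDP even on drivers without native XDP support, so the EINVAL is not by itself proof of a driver limitation.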

Could you please clarify:

  1. Does the current driver x1_emac (ethernet) support AF_XDP?
  2. If not, is there an alternative or patched driver that provides AF_XDP support on this hardware?
  3. Are there known limitations or steps to enable AF_XDP zero-copy mode for this platform?
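
For reference, here is a small feature probe I intend to run as well. This is a sketch that assumes libbpf >= 1.2 (which fills feature_flags in bpf_xdp_query()) and a userspace with the linux/netdev.h UAPI header; both should hold for 6.6.63-ky with libbpf v1.3. It should show whether x1_emac advertises native XDP or AF_XDP zero-copy support:

/* build: gcc -O2 -o xdp_features xdp_features.c -lbpf */
#include <stdio.h>
#include <net/if.h>
#include <bpf/libbpf.h>
#include <linux/netdev.h>

int main(int argc, char **argv)
{
    if (argc < 2) {
        fprintf(stderr, "usage: %s IFACE\n", argv[0]);
        return 1;
    }

    int ifindex = if_nametoindex(argv[1]);
    if (!ifindex) {
        perror("if_nametoindex");
        return 1;
    }

    /* feature_flags is reported by the netdev netlink family and
     * reflects what the driver declares via xdp_set_features_flag(). */
    LIBBPF_OPTS(bpf_xdp_query_opts, opts);
    int err = bpf_xdp_query(ifindex, 0, &opts);
    if (err) {
        fprintf(stderr, "bpf_xdp_query failed: %d\n", err);
        return 1;
    }

    printf("%s: xdp feature_flags=0x%llx\n", argv[1],
           (unsigned long long)opts.feature_flags);
    printf("  basic XDP:        %s\n",
           opts.feature_flags & NETDEV_XDP_ACT_BASIC ? "yes" : "no");
    printf("  redirect:         %s\n",
           opts.feature_flags & NETDEV_XDP_ACT_REDIRECT ? "yes" : "no");
    printf("  AF_XDP zero-copy: %s\n",
           opts.feature_flags & NETDEV_XDP_ACT_XSK_ZEROCOPY ? "yes" : "no");
    return 0;
}

A zero feature mask would indicate the driver does not implement native XDP at all, in which case only copy mode (generic XDP) would be available.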
