Skip to content

Commit 21db401

Browse files
committed
Use unix sockets for control plane ipc
1 parent 7276c87 commit 21db401

File tree

5 files changed

+204
-58
lines changed

5 files changed

+204
-58
lines changed

src/tx.cpp

Lines changed: 100 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,11 +1424,16 @@ void local_loop_unix(int argc, char* const* argv, int optind, int rcv_buf, int l
14241424
string tmp = i > 0 ? string_format("%s-%d", unix_socket, i) : string(unix_socket);
14251425
int fd = open_unix_socket_for_rx(tmp.c_str(), rcv_buf);
14261426

1427+
IPC_MSG("%" PRIu64 "\tLISTEN_UNIX\t%s:%x\n", get_time_ms(), tmp.c_str(), i);
14271428
WFB_INFO("Listen on @%s for %s\n", tmp.c_str(), wlan);
1429+
14281430
rx_fd.push_back(fd);
14291431
wlans.push_back(string(wlan));
14301432
}
14311433

1434+
IPC_MSG("%" PRIu64 "\tLISTEN_UNIX_END\n", get_time_ms());
1435+
IPC_MSG_SEND();
1436+
14321437
if (debug_port)
14331438
{
14341439
WFB_INFO("Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port);
@@ -1521,6 +1526,69 @@ void distributor_loop(int argc, char* const* argv, int optind, int rcv_buf, int
15211526
data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval);
15221527
}
15231528

1529+
1530+
void distributor_loop_unix(int argc, char* const* argv, int optind, int rcv_buf, int log_interval,
1531+
const char* unix_socket, int k, int n, const string &keypair, int fec_timeout,
1532+
uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, bool use_qdisc, uint32_t fwmark,
1533+
radiotap_header_t &radiotap_header, uint8_t frame_type, int control_port, bool mirror,
1534+
int snd_buf_size)
1535+
{
1536+
vector<int> rx_fd;
1537+
vector<pair<string, vector<uint16_t>>> remote_hosts;
1538+
int port_idx = 0;
1539+
1540+
set<string> hosts;
1541+
1542+
for(int i = optind; i < argc; i++)
1543+
{
1544+
vector<uint16_t> remote_ports;
1545+
char *p = argv[i];
1546+
char *t = NULL;
1547+
1548+
t = strsep(&p, ":");
1549+
if (t == NULL) continue;
1550+
1551+
string remote_host = string(t);
1552+
1553+
if(hosts.count(remote_host))
1554+
{
1555+
throw runtime_error(string_format("Duplicate host %s", remote_host.c_str()));
1556+
}
1557+
1558+
hosts.insert(remote_host);
1559+
1560+
for(int j=0; (t=strsep(&p, ",")) != NULL; j++, port_idx++)
1561+
{
1562+
uint16_t remote_port = atoi(t);
1563+
1564+
string tmp = port_idx > 0 ? string_format("%s-%d", unix_socket, port_idx) : string(unix_socket);
1565+
int fd = open_unix_socket_for_rx(tmp.c_str(), rcv_buf);
1566+
1567+
uint64_t wlan_id = (uint64_t)ntohl(inet_addr(remote_host.c_str())) << 24 | j;
1568+
1569+
IPC_MSG("%" PRIu64 "\tLISTEN_UNIX\t%s:%" PRIx64 "\n", get_time_ms(), tmp.c_str(), wlan_id);
1570+
WFB_INFO("Listen on @%s for %s:%d\n", tmp.c_str(), remote_host.c_str(), remote_port);
1571+
1572+
rx_fd.push_back(fd);
1573+
remote_ports.push_back(remote_port);
1574+
}
1575+
1576+
remote_hosts.push_back(pair<string, vector<uint16_t>>(remote_host, remote_ports));
1577+
}
1578+
1579+
IPC_MSG("%" PRIu64 "\tLISTEN_UNIX_END\n", get_time_ms());
1580+
IPC_MSG_SEND();
1581+
1582+
vector<tags_item_t> tags;
1583+
unique_ptr<Transmitter> t = unique_ptr<RemoteTransmitter>(new RemoteTransmitter(k, n, keypair, epoch, channel_id, fec_delay, tags,
1584+
remote_hosts, radiotap_header, frame_type, use_qdisc,
1585+
fwmark, snd_buf_size));
1586+
1587+
int control_fd = open_control_fd(control_port);
1588+
data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval);
1589+
}
1590+
1591+
15241592
int main(int argc, char * const *argv)
15251593
{
15261594
int opt;
@@ -1574,7 +1642,6 @@ int main(int argc, char * const *argv)
15741642
udp_port = atoi(optarg);
15751643
break;
15761644
case 'U':
1577-
tx_mode = LOCAL_UNIX;
15781645
unix_socket = optarg;
15791646
break;
15801647
case 'p':
@@ -1718,27 +1785,41 @@ int main(int argc, char * const *argv)
17181785
break;
17191786

17201787
case LOCAL:
1721-
local_loop_udp(argc, argv, optind, rcv_buf, log_interval,
1722-
udp_port, debug_port, k, n, keypair, fec_timeout,
1723-
epoch, channel_id, fec_delay, use_qdisc, fwmark,
1724-
radiotap_header, frame_type, control_port, mirror,
1725-
snd_buf);
1726-
break;
1727-
1728-
case LOCAL_UNIX:
1729-
local_loop_unix(argc, argv, optind, rcv_buf, log_interval,
1730-
unix_socket, debug_port, k, n, keypair, fec_timeout,
1731-
epoch, channel_id, fec_delay, use_qdisc, fwmark,
1732-
radiotap_header, frame_type, control_port, mirror,
1733-
snd_buf);
1788+
if (unix_socket != NULL)
1789+
{
1790+
local_loop_unix(argc, argv, optind, rcv_buf, log_interval,
1791+
unix_socket, debug_port, k, n, keypair, fec_timeout,
1792+
epoch, channel_id, fec_delay, use_qdisc, fwmark,
1793+
radiotap_header, frame_type, control_port, mirror,
1794+
snd_buf);
1795+
}
1796+
else
1797+
{
1798+
local_loop_udp(argc, argv, optind, rcv_buf, log_interval,
1799+
udp_port, debug_port, k, n, keypair, fec_timeout,
1800+
epoch, channel_id, fec_delay, use_qdisc, fwmark,
1801+
radiotap_header, frame_type, control_port, mirror,
1802+
snd_buf);
1803+
}
17341804
break;
17351805

17361806
case DISTRIBUTOR:
1737-
distributor_loop(argc, argv, optind, rcv_buf, log_interval,
1738-
udp_port, k, n, keypair, fec_timeout,
1739-
epoch, channel_id, fec_delay, use_qdisc, fwmark,
1740-
radiotap_header, frame_type, control_port, mirror,
1741-
snd_buf);
1807+
if (unix_socket != NULL)
1808+
{
1809+
distributor_loop_unix(argc, argv, optind, rcv_buf, log_interval,
1810+
unix_socket, k, n, keypair, fec_timeout,
1811+
epoch, channel_id, fec_delay, use_qdisc, fwmark,
1812+
radiotap_header, frame_type, control_port, mirror,
1813+
snd_buf);
1814+
}
1815+
else
1816+
{
1817+
distributor_loop(argc, argv, optind, rcv_buf, log_interval,
1818+
udp_port, k, n, keypair, fec_timeout,
1819+
epoch, channel_id, fec_delay, use_qdisc, fwmark,
1820+
radiotap_header, frame_type, control_port, mirror,
1821+
snd_buf);
1822+
}
17421823
break;
17431824

17441825
default:

src/tx.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ radiotap_header_t init_radiotap_header(uint8_t stbc,
6464

6565
typedef enum {
6666
LOCAL,
67-
LOCAL_UNIX,
6867
INJECTOR,
6968
DISTRIBUTOR
7069
} tx_mode_t;

wfb_ng/protocols.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ def __init__(self, ant_stat_cb, tx_id, ports_df, control_port_df):
466466
self.ports_df = ports_df
467467
self.control_port_df = control_port_df
468468
self.ports = {}
469+
self.sockets = {}
469470
self.control_port = None
470471
self.ant = {}
471472
self.count_all = None
@@ -485,6 +486,13 @@ def lineReceived(self, line):
485486
elif cmd == 'LISTEN_UDP_END' and self.ports_df is not None:
486487
self.ports_df.callback(self.ports)
487488

489+
elif cmd == 'LISTEN_UNIX' and len(cols) == 3:
490+
unix_socket, wlan_id = cols[2].split(':', 1)
491+
self.sockets[int(wlan_id, 16)] = unix_socket
492+
493+
elif cmd == 'LISTEN_UNIX_END' and self.ports_df is not None:
494+
self.ports_df.callback(self.sockets)
495+
488496
elif cmd == 'LISTEN_UDP_CONTROL' and len(cols) == 3 and self.control_port_df is not None:
489497
port = cols[2]
490498
self.control_port = int(port)

0 commit comments

Comments
 (0)