Skip to content

Commit 209bef7

Browse files
authored
Merge pull request #27 from devilcove/feature/virt-subnet
Feature/virt subnet
2 parents 04d81df + 31c72e7 commit 209bef7

16 files changed

+464
-102
lines changed

internal/agent/broker.go

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func subcribe(ec *nats.EncodedConn) {
107107
}
108108
deleteAllNetworks()
109109
deleteAllInterfaces()
110-
if err := saveServerNetworks(resp.Networks); err != nil {
110+
if err := saveServerNetworks(self, resp.Networks); err != nil {
111111
slog.Error("save networks", "error", err)
112112
}
113113
startAllInterfaces(self)
@@ -137,13 +137,19 @@ func subcribe(ec *nats.EncodedConn) {
137137
}
138138
return
139139
}
140-
if err := resetPeersOnNetworkInterface(self, network); err != nil {
141-
slog.Error(err.Error())
142-
if err := ec.Publish(reply, plexus.MessageResponse{Message: err.Error()}); err != nil {
143-
slog.Error(err.Error())
144-
}
145-
return
140+
if err := deleteInterface(network.Interface); err != nil {
141+
slog.Error("delete interface", "iface", network.Interface, "error", err)
142+
}
143+
if err := startInterface(self, network); err != nil {
144+
slog.Error("start interface", "iface", network.Interface, "error", err)
146145
}
146+
//if err := resetPeersOnNetworkInterface(self, network); err != nil {
147+
// slog.Error(err.Error())
148+
// if err := ec.Publish(reply, plexus.MessageResponse{Message: err.Error()}); err != nil {
149+
// slog.Error(err.Error())
150+
// }
151+
// return
152+
//}
147153
if err := ec.Publish(reply, plexus.MessageResponse{Message: "interface reset"}); err != nil {
148154
slog.Error(err.Error())
149155
}
@@ -242,7 +248,6 @@ func subcribeToServerTopics(self Device) {
242248
slog.Error("join network subscription", "error", err)
243249
}
244250
subscriptions = append(subscriptions, joinNet)
245-
246251
sendListenPorts, err := serverEC.Subscribe(plexus.Update+id+plexus.SendListenPorts,
247252
func(subj, reply string, data plexus.ListenPortRequest) {
248253
slog.Info("new listen ports", "network", data.Network)
@@ -260,6 +265,48 @@ func subcribeToServerTopics(self Device) {
260265
slog.Error("send listen port subscription", "error", err)
261266
}
262267
subscriptions = append(subscriptions, sendListenPorts)
268+
addRouter, err := serverEC.Subscribe(plexus.Update+id+plexus.AddRouter,
269+
func(subj, reply string, data plexus.NetworkPeer) {
270+
if data.WGPublicKey != id {
271+
slog.Error("add router wrong id", "me", id, "router", data.WGPublicKey)
272+
return
273+
}
274+
if !data.IsSubnetRouter {
275+
return
276+
}
277+
slog.Debug("adding subnet router")
278+
if data.UseNat {
279+
if err := addNat(); err != nil {
280+
slog.Error("add nat", "error", err)
281+
}
282+
}
283+
if data.UseVirtSubnet {
284+
if err := addVirtualSubnet(data.VirtSubnet, data.Subnet); err != nil {
285+
slog.Error("add virtual subnet", "error", err)
286+
}
287+
}
288+
})
289+
if err != nil {
290+
slog.Error("add router subscription", "error", err)
291+
}
292+
subscriptions = append(subscriptions, addRouter)
293+
delRouter, err := serverEC.Subscribe(plexus.Update+id+plexus.DeleteRouter,
294+
func(subj, reply string, data plexus.NetworkPeer) {
295+
if data.WGPublicKey != id {
296+
slog.Error("add router wrong id", "me", id, "router", data.WGPublicKey)
297+
return
298+
}
299+
if err := delNat(); err != nil {
300+
slog.Error("delete nat", "error", err)
301+
}
302+
if err := delVirtualSubnet(); err != nil {
303+
slog.Error("delete virtual subnet", "error", err)
304+
}
305+
})
306+
if err != nil {
307+
slog.Error("delete router subscription", "error", err)
308+
}
309+
subscriptions = append(subscriptions, delRouter)
263310
}
264311

265312
func createRegistationConnection(key plexus.KeyValue) (*nats.EncodedConn, error) {

internal/agent/interface.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ func deleteAllInterfaces() {
4545
if err = delNat(); err != nil {
4646
slog.Error("delete NAT", "error", err)
4747
}
48+
if err = delVirtualSubnet(); err != nil {
49+
slog.Error("delete virtual subnet", "error", err)
50+
}
4851
}
4952

5053
func startAllInterfaces(self Device) {
@@ -106,6 +109,7 @@ func startInterface(self Device, network Network) error {
106109
}
107110
if port != network.ListenPort {
108111
portChanged = true
112+
network.ListenPort = port
109113
}
110114
if addressChanged {
111115
if err := boltdb.Save(self, "self", deviceTable); err != nil {
@@ -117,7 +121,7 @@ func startInterface(self Device, network Network) error {
117121
if err := boltdb.Save(network, network.Name, networkTable); err != nil {
118122
return err
119123
}
120-
go publishPeerUpdate(&self, &network)
124+
go publishListenPortUpdate(&self, &network)
121125
}
122126
config := wgtypes.Config{
123127
PrivateKey: &privKey,
@@ -216,8 +220,13 @@ func getAllowedIPs(node plexus.NetworkPeer, peers []plexus.NetworkPeer) []net.IP
216220
IP: node.Address.IP,
217221
Mask: net.CIDRMask(32, 32),
218222
})
219-
if node.IsSubNetRouter {
220-
allowed = append(allowed, node.SubNet)
223+
if node.IsSubnetRouter {
224+
if node.UseVirtSubnet {
225+
allowed = append(allowed, node.VirtSubnet)
226+
} else {
227+
allowed = append(allowed, node.Subnet)
228+
}
229+
slog.Debug("new allowed ips", "allowed", allowed, "virt", node.VirtSubnet, "subnet", node.Subnet)
221230
}
222231
if node.IsRelay {
223232
for _, peer := range peers {

internal/agent/networks.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func toAgentNetwork(in plexus.Network) Network {
3131
return out
3232
}
3333

34-
func saveServerNetworks(networks []plexus.Network) error {
34+
func saveServerNetworks(self Device, networks []plexus.Network) error {
3535
takenInterfaces := []int{}
3636
var err error
3737
for _, serverNet := range networks {
@@ -40,6 +40,9 @@ func saveServerNetworks(networks []plexus.Network) error {
4040
if err != nil {
4141
return fmt.Errorf("unable to get freeport %w", err)
4242
}
43+
if _, _, err := stunCheck(&self, &network, network.ListenPort); err != nil {
44+
return fmt.Errorf("stun check %w", err)
45+
}
4346
interfaceFound := false
4447
for i := range maxNetworks {
4548
if !slices.Contains(takenInterfaces, i) {

internal/agent/nftables.go

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package agent
22

33
import (
44
"log/slog"
5+
"net"
56

7+
"github.com/c-robinson/iplib"
68
"github.com/google/nftables"
79
"github.com/google/nftables/expr"
810
)
911

1012
func addNat() error {
13+
slog.Debug("adding NAT rule")
1114
c := &nftables.Conn{}
1215
table := c.AddTable(&nftables.Table{
1316
Name: "plexus",
@@ -58,8 +61,8 @@ func checkForNat(self Device, network Network) error {
5861
slog.Debug("checking if NAT required")
5962
for _, peer := range network.Peers {
6063
if peer.WGPublicKey == self.WGPublicKey {
61-
slog.Debug("Nat check", "subnet-router", peer.IsSubNetRouter)
62-
if !peer.IsSubNetRouter {
64+
slog.Debug("Nat check", "subnet-router", peer.IsSubnetRouter)
65+
if !peer.IsSubnetRouter {
6366
slog.Debug("nat check -- not subnetrouter")
6467
return nil
6568
}
@@ -68,6 +71,107 @@ func checkForNat(self Device, network Network) error {
6871
slog.Debug("adding NAT", "network", network.Name)
6972
return addNat()
7073
}
74+
if peer.UseVirtSubnet {
75+
slog.Debug("adding virtual subnet", "peer", peer.HostName, "virtual subnet", peer.VirtSubnet, "subnet", peer.Subnet)
76+
return addVirtualSubnet(peer.VirtSubnet, peer.Subnet)
77+
}
78+
}
79+
}
80+
return nil
81+
}
82+
83+
func addVirtualSubnet(virtual, subnet net.IPNet) error {
84+
slog.Debug("add virtual subnet", "virtual", virtual, "subnet", subnet)
85+
c := &nftables.Conn{}
86+
table := c.AddTable(&nftables.Table{
87+
Name: "plexus",
88+
Family: nftables.TableFamilyIPv4,
89+
})
90+
if err := delVirtualSubnet(); err != nil {
91+
slog.Debug("delete virtual subnet", "error", err)
92+
return err
93+
}
94+
chain := c.AddChain(&nftables.Chain{
95+
Name: "plexus-subnet",
96+
Table: table,
97+
Type: nftables.ChainTypeNAT,
98+
Hooknum: nftables.ChainHookPrerouting,
99+
Priority: nftables.ChainPriorityFilter,
100+
})
101+
ones, _ := virtual.Mask.Size()
102+
virtNet := iplib.NewNet4(virtual.IP, ones)
103+
virt := virtNet.FirstAddress()
104+
subNet := iplib.NewNet4(subnet.IP, ones)
105+
sub := subNet.FirstAddress()
106+
rule := &nftables.Rule{
107+
Table: table,
108+
Chain: chain,
109+
Exprs: []expr.Any{
110+
&expr.Payload{
111+
OperationType: expr.PayloadLoad,
112+
SourceRegister: 0,
113+
DestRegister: 1,
114+
Base: expr.PayloadBaseNetworkHeader,
115+
Offset: 0x10,
116+
Len: 0x4,
117+
},
118+
&expr.Cmp{
119+
Op: expr.CmpOpEq,
120+
Register: 1,
121+
Data: virt,
122+
},
123+
&expr.Immediate{
124+
Register: 1,
125+
Data: sub,
126+
},
127+
&expr.NAT{
128+
Type: expr.NATTypeDestNAT,
129+
Family: uint32(nftables.TableFamilyIPv4),
130+
RegAddrMin: 1,
131+
RegAddrMax: 1,
132+
RegProtoMin: 0,
133+
RegProtoMax: 0,
134+
},
135+
},
136+
}
137+
c.AddRule(rule)
138+
if err := c.Flush(); err != nil {
139+
slog.Debug("flush rules", "errror", err)
140+
return err
141+
}
142+
for {
143+
var err error
144+
virt, err = virtNet.NextIP(virt)
145+
if err != nil {
146+
break
147+
}
148+
sub, err = subNet.NextIP(sub)
149+
if err != nil {
150+
break
151+
}
152+
rule.Exprs[1].(*expr.Cmp).Data = virt
153+
rule.Exprs[2].(*expr.Immediate).Data = sub
154+
c.AddRule(rule)
155+
if err := c.Flush(); err != nil {
156+
slog.Debug("flush rules", "errror", err)
157+
return err
158+
}
159+
}
160+
return nil
161+
}
162+
163+
func delVirtualSubnet() error {
164+
slog.Debug("deleting virtual subnet")
165+
c := &nftables.Conn{}
166+
chains, err := c.ListChains()
167+
if err != nil {
168+
return nil
169+
}
170+
for _, chain := range chains {
171+
if chain.Name == "plexus-subnet" {
172+
slog.Debug("deleting plexus-subnet chain")
173+
c.DelChain(chain)
174+
return c.Flush()
71175
}
72176
}
73177
return nil

internal/agent/nftables_test.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,11 @@ func TestCheckForNat(t *testing.T) {
146146
}
147147
})
148148
t.Run("subnetWithoutNat", func(t *testing.T) {
149-
peer.IsSubNetRouter = true
149+
peer.IsSubnetRouter = true
150+
peer.Subnet = net.IPNet{
151+
IP: net.ParseIP("192.168.0.0"),
152+
Mask: net.CIDRMask(24, 32),
153+
}
150154
network.Peers = []plexus.NetworkPeer{peer}
151155
err = checkForNat(self, network)
152156
assert.Nil(t, err)
@@ -197,7 +201,45 @@ func TestCheckForNat(t *testing.T) {
197201
RegProtoMax: 0,
198202
}, rules[0].Exprs[0])
199203
})
204+
t.Run("virtual subnet", func(t *testing.T) {
205+
table := &nftables.Table{}
206+
chain := &nftables.Chain{}
207+
peer.UseNat = false
208+
peer.UseVirtSubnet = true
209+
peer.VirtSubnet = net.IPNet{
210+
IP: net.ParseIP("10.100.0.0").To4(),
211+
Mask: net.CIDRMask(24, 32),
212+
}
213+
network.Peers = []plexus.NetworkPeer{peer}
214+
t.Log(self, network)
215+
err = checkForNat(self, network)
216+
assert.Nil(t, err)
217+
tables, err := c.ListTables()
218+
assert.Nil(t, err)
219+
tableFound := false
220+
for _, t := range tables {
221+
if t.Name == "plexus" {
222+
tableFound = true
223+
table = t
224+
}
225+
}
226+
assert.True(t, tableFound)
227+
chains, err := c.ListChains()
228+
assert.Nil(t, err)
229+
chainFound := false
230+
for _, c := range chains {
231+
if c.Name == "plexus-subnet" {
232+
chainFound = true
233+
chain = c
234+
}
235+
}
236+
assert.True(t, chainFound)
237+
rules, err := c.GetRules(table, chain)
238+
assert.Nil(t, err)
239+
assert.Equal(t, 254, len(rules))
240+
})
200241
cleanNat(t, c)
242+
201243
}
202244

203245
func cleanNat(t *testing.T, c *nftables.Conn) {

internal/agent/publish.go

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,37 +27,28 @@ func publishDeviceUpdate(self *Device) {
2727
}
2828
}
2929

30-
func publishPeerUpdate(self *Device, network *Network) {
31-
slog.Info("publishing network peer update")
32-
me := getSelfFromPeers(self, network.Peers)
30+
// publish new listening ports to server
31+
func publishListenPortUpdate(self *Device, network *Network) {
32+
slog.Info("publishing listen port update")
3333
serverEC := serverConn.Load()
3434
if serverEC == nil {
3535
slog.Error("not connected to server")
3636
return
3737
}
38-
if err := serverEC.Publish(self.WGPublicKey+plexus.UpdateNetworkPeer, plexus.NetworkPeer{
39-
WGPublicKey: self.WGPublicKey,
40-
HostName: self.Name,
41-
Address: me.Address,
38+
if err := serverEC.Publish(self.WGPublicKey+plexus.UpdateListenPorts, plexus.ListenPortResponse{
4239
ListenPort: network.ListenPort,
4340
PublicListenPort: network.PublicListenPort,
44-
Endpoint: self.Endpoint,
45-
NatsConnected: true,
46-
Connectivity: me.Connectivity,
47-
IsRelay: me.IsRelay,
48-
IsRelayed: me.IsRelayed,
49-
RelayedPeers: me.RelayedPeers,
5041
},
5142
); err != nil {
5243
slog.Error("publish network peer update", "error", err)
5344
}
5445
}
5546

56-
func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
57-
for _, peer := range peers {
58-
if peer.WGPublicKey == self.WGPublicKey {
59-
return &peer
60-
}
61-
}
62-
return nil
63-
}
47+
//func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
48+
// for _, peer := range peers {
49+
// if peer.WGPublicKey == self.WGPublicKey {
50+
// return &peer
51+
// }
52+
// }
53+
// return nil
54+
//}

0 commit comments

Comments
 (0)