Skip to content

Commit 55bb4d9

Browse files
authored
Merge pull request #30 from devilcove/feature/publicEndpoints
Feature/public endpoints
2 parents 6231693 + d91104b commit 55bb4d9

File tree

14 files changed

+357
-54
lines changed

14 files changed

+357
-54
lines changed

app/plexus-agent/cmd/set.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright © 2024 Matthew R Kasun <mkasun@nusak.ca>
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package cmd
17+
18+
import (
19+
"fmt"
20+
"net"
21+
22+
"github.com/devilcove/plexus"
23+
"github.com/devilcove/plexus/internal/agent"
24+
"github.com/spf13/cobra"
25+
"github.com/vishvananda/netlink"
26+
)
27+
28+
// setCmd represents the set command
29+
var setCmd = &cobra.Command{
30+
Use: "set ip [network]",
31+
Args: cobra.RangeArgs(1, 2),
32+
Short: "set private endpoint for network",
33+
Long: `set private endpoint ip for a or all networks.`,
34+
Run: func(cmd *cobra.Command, args []string) {
35+
network := ""
36+
if len(args) > 1 {
37+
network = args[1]
38+
}
39+
fmt.Println("set called")
40+
ip := net.ParseIP(args[0])
41+
if ip == nil {
42+
fmt.Println("invalid ip")
43+
return
44+
}
45+
addr, err := netlink.AddrList(nil, netlink.FAMILY_V4)
46+
if err != nil {
47+
fmt.Println("error getting addresses", err)
48+
return
49+
}
50+
found := false
51+
for _, add := range addr {
52+
if ip.Equal(add.IP) {
53+
found = true
54+
}
55+
}
56+
if !found {
57+
fmt.Println("invalid ip")
58+
return
59+
}
60+
request := plexus.PrivateEndpoint{
61+
IP: args[0],
62+
Network: network,
63+
}
64+
resp := plexus.MessageResponse{}
65+
ec, err := agent.ConnectToAgentBroker()
66+
cobra.CheckErr(err)
67+
cobra.CheckErr(ec.Request(agent.Agent+plexus.SetPrivateEndpoint, request, &resp, agent.NatsTimeout))
68+
fmt.Println(resp.Message)
69+
},
70+
}
71+
72+
func init() {
73+
rootCmd.AddCommand(setCmd)
74+
75+
// Here you will define your flags and configuration settings.
76+
77+
// Cobra supports Persistent Flags which will work for this command
78+
// and all subcommands, e.g.:
79+
// setCmd.PersistentFlags().String("foo", "", "A help for foo")
80+
81+
// Cobra supports local flags which will only run when this command
82+
// is called directly, e.g.:
83+
// setCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
84+
}

internal/agent/broker.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"log"
77
"log/slog"
8+
"net"
89
"runtime/debug"
910
"strings"
1011

@@ -186,6 +187,53 @@ func subcribe(ec *nats.EncodedConn) {
186187
slog.Error("publish reply to version request", "error", err)
187188
}
188189
})
190+
_, _ = ec.Subscribe(Agent+plexus.SetPrivateEndpoint, func(sub, reply string, request plexus.PrivateEndpoint) {
191+
slog.Debug("set private endpoint", "endpoint", request.IP, "network", request.Network)
192+
var err error
193+
var networks []Network
194+
self, err := boltdb.Get[Device]("self", deviceTable)
195+
if err != nil {
196+
_ = ec.Publish(reply, plexus.MessageResponse{
197+
Message: "error getting device" + err.Error(),
198+
})
199+
}
200+
if request.Network == "" {
201+
networks, err = boltdb.GetAll[Network](networkTable)
202+
if err != nil {
203+
_ = ec.Publish(reply, plexus.MessageResponse{
204+
Message: "error reading networks" + err.Error(),
205+
})
206+
}
207+
} else {
208+
network, err := boltdb.Get[Network](request.Network, networkTable)
209+
if err != nil {
210+
_ = ec.Publish(reply, plexus.MessageResponse{
211+
Message: "error reading network " + err.Error(),
212+
})
213+
}
214+
networks = append(networks, network)
215+
}
216+
for _, network := range networks {
217+
for i, peer := range network.Peers {
218+
if peer.WGPublicKey == self.WGPublicKey {
219+
network.Peers[i].PrivateEndpoint = net.ParseIP(request.IP)
220+
if err := publishNetworkPeerUpdate(self, &network.Peers[i]); err != nil {
221+
_ = ec.Publish(reply, plexus.MessageResponse{
222+
Message: "error publishing update to server " + err.Error(),
223+
})
224+
}
225+
}
226+
}
227+
if err := boltdb.Save(network, network.Name, networkTable); err != nil {
228+
_ = ec.Publish(reply, plexus.MessageResponse{
229+
Message: "error saving network " + network.Name + err.Error(),
230+
})
231+
}
232+
}
233+
_ = ec.Publish(reply, plexus.MessageResponse{
234+
Message: "private endpoint added",
235+
})
236+
})
189237
}
190238

191239
func ConnectToAgentBroker() (*nats.EncodedConn, error) {

internal/agent/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const (
1818
serverCheckTime = time.Hour * 1
1919
connectivityTimeout = time.Minute * 3
2020
networkNotMapped = "network not mapped to server"
21-
version = "v0.1.0"
21+
version = "v0.2.1"
2222
networkTable = "networks"
2323
deviceTable = "devices"
2424
path = "/var/lib/plexus/"
@@ -30,7 +30,7 @@ var (
3030
subscriptions []*nats.Subscription
3131
//errors
3232
ErrNetNotMapped = errors.New("network not mapped to server")
33-
ErrConnected = errors.New("network connected")
33+
ErrNotConnected = errors.New("not connected to server")
3434
)
3535

3636
type Configuration struct {

internal/agent/daemon.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package agent
22

33
import (
4+
"bufio"
5+
"context"
6+
"fmt"
47
"log"
58
"log/slog"
9+
"net"
610
"os"
711
"os/signal"
12+
"sync"
813
"syscall"
914
"time"
1015

@@ -33,10 +38,15 @@ func Run() {
3338
startAllInterfaces(self)
3439
checkinTicker := time.NewTicker(checkinTime)
3540
serverTicker := time.NewTicker(serverCheckTime)
41+
wg := &sync.WaitGroup{}
42+
wg.Add(1)
43+
ctx, cancel := context.WithCancel(context.Background())
44+
go privateEndpointServer(ctx, wg)
3645
for {
3746
select {
3847
case <-quit:
3948
slog.Info("quit")
49+
cancel()
4050
slog.Info("deleting wg interfaces")
4151
deleteAllInterfaces()
4252
slog.Info("stopping tickers")
@@ -49,6 +59,7 @@ func Run() {
4959
slog.Info("wait for nat server shutdown to complete")
5060
ns.WaitForShutdown()
5161
slog.Info("nats server has shutdown")
62+
wg.Wait()
5263
slog.Info("exiting ...")
5364
return
5465
case <-checkinTicker.C:
@@ -151,3 +162,53 @@ func closeServerConnections() {
151162
ec.Close()
152163
}
153164
}
165+
166+
func privateEndpointServer(ctx context.Context, wg *sync.WaitGroup) {
167+
defer wg.Done()
168+
networks, err := boltdb.GetAll[Network](networkTable)
169+
if err != nil {
170+
return
171+
}
172+
self, err := boltdb.Get[Device]("self", deviceTable)
173+
if err != nil {
174+
return
175+
}
176+
for _, network := range networks {
177+
me := getSelfFromPeers(&self, network.Peers)
178+
if me.PrivateEndpoint == nil {
179+
continue
180+
}
181+
slog.Info("tcp listener staating on prviate endpoint")
182+
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", me.PrivateEndpoint, me.ListenPort))
183+
if err != nil {
184+
slog.Error("public endpoint server", "error", err)
185+
return
186+
}
187+
go func() {
188+
for {
189+
select {
190+
case <-ctx.Done():
191+
listener.Close()
192+
return
193+
default:
194+
c, err := listener.Accept()
195+
if err != nil {
196+
slog.Warn("connect error", "error", err)
197+
continue
198+
}
199+
go handleConn(c, self.WGPublicKey)
200+
}
201+
}
202+
}()
203+
}
204+
}
205+
206+
func handleConn(c net.Conn, reply string) {
207+
defer c.Close()
208+
reader := bufio.NewReader(c)
209+
_, err := reader.ReadBytes(byte('.'))
210+
if err != nil {
211+
return
212+
}
213+
_, _ = c.Write([]byte(reply))
214+
}

internal/agent/handlers.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ func networkUpdates(subject string, update plexus.NetworkUpdate) {
4141
return
4242
}
4343
}
44+
if update.Peer.PrivateEndpoint != nil {
45+
if connectToPublicEndpoint(update.Peer) {
46+
update.Peer.UsePrivateEndpoint = true
47+
}
48+
}
4449
network.Peers = append(network.Peers, update.Peer)
4550
if err := boltdb.Save(network, network.Name, networkTable); err != nil {
4651
slog.Error("update network -- add peer", "error", err)
@@ -93,6 +98,11 @@ func networkUpdates(subject string, update plexus.NetworkUpdate) {
9398
found := false
9499
for i, oldpeer := range network.Peers {
95100
if oldpeer.WGPublicKey == update.Peer.WGPublicKey {
101+
if update.Peer.PrivateEndpoint != nil {
102+
if connectToPublicEndpoint(update.Peer) {
103+
update.Peer.UsePrivateEndpoint = true
104+
}
105+
}
96106
network.Peers = slices.Replace(network.Peers, i, i+1, update.Peer)
97107
found = true
98108
break

internal/agent/interface.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package agent
22

33
import (
4+
"bufio"
45
"errors"
56
"fmt"
67
"log"
@@ -287,6 +288,15 @@ func getWGPeers(self Device, network Network) []wgtypes.PeerConfig {
287288
},
288289
PersistentKeepaliveInterval: &keepalive,
289290
}
291+
if peer.PrivateEndpoint != nil {
292+
if connectToPublicEndpoint(peer) {
293+
peer.UsePrivateEndpoint = true
294+
wgPeer.Endpoint = &net.UDPAddr{
295+
IP: peer.PrivateEndpoint,
296+
Port: peer.ListenPort,
297+
}
298+
}
299+
}
290300
peers = append(peers, wgPeer)
291301
}
292302
return peers
@@ -385,3 +395,29 @@ func convertPeerToWG(netPeer plexus.NetworkPeer, peers []plexus.NetworkPeer) (wg
385395
AllowedIPs: getAllowedIPs(netPeer, peers),
386396
}, nil
387397
}
398+
399+
func connectToPublicEndpoint(peer plexus.NetworkPeer) bool {
400+
slog.Debug("checking private endpoint", "peer", peer.HostName)
401+
endpoint := fmt.Sprintf("%s:%d", peer.PrivateEndpoint, peer.ListenPort)
402+
c, err := net.Dial("tcp", endpoint)
403+
if err != nil {
404+
slog.Debug("err dialing endpoint", "error", err)
405+
return false
406+
}
407+
defer c.Close()
408+
p := make([]byte, 1024)
409+
if _, err := c.Write([]byte("olleh.")); err != nil {
410+
slog.Debug("error writing", "error", err)
411+
return false
412+
}
413+
if _, err := bufio.NewReader(c).Read(p); err != nil {
414+
slog.Debug("error reading", "error", err)
415+
return false
416+
}
417+
if string(p[:44]) != peer.WGPublicKey {
418+
slog.Debug("bad response", "response", string(p))
419+
return false
420+
}
421+
slog.Debug("use private endpoint for", "peer", peer.HostName)
422+
return true
423+
}

internal/agent/publish.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,24 @@ func publishListenPortUpdate(self *Device, network *Network) {
4444
}
4545
}
4646

47-
//func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
48-
// for _, peer := range peers {
49-
// if peer.WGPublicKey == self.WGPublicKey {
50-
// return &peer
51-
// }
52-
// }
53-
// return nil
54-
//}
47+
// publish network peer update to server
48+
func publishNetworkPeerUpdate(self Device, peer *plexus.NetworkPeer) error {
49+
slog.Info("publishing network peer update")
50+
serverEC := serverConn.Load()
51+
if serverEC == nil {
52+
return ErrNotConnected
53+
}
54+
if err := serverEC.Publish(self.WGPublicKey+plexus.UpdateNetworkPeer, peer); err != nil {
55+
return err
56+
}
57+
return nil
58+
}
59+
60+
func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
61+
for _, peer := range peers {
62+
if peer.WGPublicKey == self.WGPublicKey {
63+
return &peer
64+
}
65+
}
66+
return nil
67+
}

internal/agent/register.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func createPeer() (*plexus.Peer, *wgtypes.Key, string, error) {
118118
WGPublicKey: pubKey.String(),
119119
PubNkey: nkey,
120120
Name: name,
121-
Version: "v0.1.0",
121+
Version: version,
122122
Endpoint: stunAddr.IP,
123123
OS: runtime.GOOS,
124124
Updated: time.Now(),

internal/server/broker.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,5 +363,18 @@ func serverSubcriptions() []*nats.Subscription {
363363
}
364364
subcriptions = append(subcriptions, deviceUpdate)
365365

366+
// network peer updates
367+
peerUpdate, err := eConn.Subscribe("*"+plexus.UpdateNetworkPeer, func(subj string, request *plexus.NetworkPeer) {
368+
if len(subj) != 44+len(plexus.UpdateNetworkPeer) {
369+
slog.Error("invalid sub", "subj", subj)
370+
return
371+
}
372+
processNetworkPeerUpdate(subj[:44], request)
373+
})
374+
if err != nil {
375+
slog.Error("subscribe peer update", "error", err)
376+
}
377+
subcriptions = append(subcriptions, peerUpdate)
378+
366379
return subcriptions
367380
}

0 commit comments

Comments
 (0)