Skip to content

Commit da96f78

Browse files
committed
Changes:
- #71 EdrData section in events - #70 API endpoint /endpoint/artifacts - #69 Implement API endpoint used to stream events
1 parent 9cdc2f7 commit da96f78

File tree

11 files changed

+433
-187
lines changed

11 files changed

+433
-187
lines changed

api/api_client.go

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type ClientConfig struct {
3232
ServerFingerprint string `toml:"server-fingerprint" comment:"Configure manager certificate pinning\n Put here the manager's certificate fingerprint"`
3333
Unsafe bool `toml:"unsafe" comment:"Allow unsafe HTTPS connection"`
3434
MaxUploadSize int64 `toml:"max-upload-size" comment:"Maximum allowed upload size"`
35+
36+
localAddr string
3537
}
3638

3739
// ManagerIP returns the IP address of the manager if any, returns nil otherwise
@@ -47,40 +49,70 @@ func (cc *ClientConfig) ManagerIP() net.IP {
4749
return nil
4850
}
4951

52+
func (cc *ClientConfig) DialContext(ctx context.Context, network, addr string) (con net.Conn, err error) {
53+
log.Infof("Dial")
54+
dialer := net.Dialer{
55+
Timeout: 30 * time.Second,
56+
KeepAlive: 30 * time.Second,
57+
DualStack: true,
58+
}
59+
con, err = dialer.DialContext(ctx, network, addr)
60+
61+
if err == nil && con != nil {
62+
if addr, ok := con.LocalAddr().(*net.TCPAddr); ok {
63+
cc.localAddr = addr.IP.String()
64+
}
65+
}
66+
67+
return
68+
}
69+
70+
func (cc *ClientConfig) DialTLSContext(ctx context.Context, network, addr string) (net.Conn, error) {
71+
72+
log.Infof("Dial TLS")
73+
c, err := tls.Dial(network, addr, &tls.Config{InsecureSkipVerify: cc.Unsafe})
74+
75+
if err != nil {
76+
return c, err
77+
}
78+
79+
if c != nil {
80+
if addr, ok := c.LocalAddr().(*net.TCPAddr); ok {
81+
cc.localAddr = addr.IP.String()
82+
}
83+
}
84+
85+
if cc.ServerFingerprint == "" {
86+
return c, err
87+
}
88+
89+
connstate := c.ConnectionState()
90+
for _, peercert := range connstate.PeerCertificates {
91+
der, err := x509.MarshalPKIXPublicKey(peercert.PublicKey)
92+
hash := data.Sha256(der)
93+
if err != nil {
94+
return c, err
95+
}
96+
97+
if hash == cc.ServerFingerprint {
98+
return c, err
99+
}
100+
}
101+
return c, fmt.Errorf("server fingerprint not verified")
102+
}
103+
50104
// Transport creates an approriate HTTP transport from a configuration
51105
// Cert pinning inspired by: https://medium.com/@zmanian/server-public-key-pinning-in-go-7a57bbe39438
52106
func (cc *ClientConfig) Transport() http.RoundTripper {
53107
return &http.Transport{
54108
Proxy: nil,
55-
DialContext: (&net.Dialer{
109+
/*DialContext: (&net.Dialer{
56110
Timeout: 30 * time.Second,
57111
KeepAlive: 30 * time.Second,
58112
DualStack: true,
59-
}).DialContext,
60-
DialTLSContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
61-
c, err := tls.Dial(network, addr, &tls.Config{InsecureSkipVerify: cc.Unsafe})
62-
63-
if err != nil {
64-
return c, err
65-
}
66-
67-
if cc.ServerFingerprint == "" {
68-
return c, err
69-
}
70-
connstate := c.ConnectionState()
71-
for _, peercert := range connstate.PeerCertificates {
72-
der, err := x509.MarshalPKIXPublicKey(peercert.PublicKey)
73-
hash := data.Sha256(der)
74-
if err != nil {
75-
return c, err
76-
}
77-
78-
if hash == cc.ServerFingerprint {
79-
return c, err
80-
}
81-
}
82-
return c, fmt.Errorf("server fingerprint not verified")
83-
},
113+
}).DialContext,*/
114+
DialContext: cc.DialContext,
115+
DialTLSContext: cc.DialTLSContext,
84116
MaxIdleConns: 100,
85117
IdleConnTimeout: 90 * time.Second,
86118
TLSHandshakeTimeout: 10 * time.Second,
@@ -90,7 +122,7 @@ func (cc *ClientConfig) Transport() http.RoundTripper {
90122

91123
// ManagerClient structure definition
92124
type ManagerClient struct {
93-
config ClientConfig
125+
config *ClientConfig
94126
ManagerIP net.IP
95127

96128
HTTPClient http.Client
@@ -124,7 +156,7 @@ func NewManagerClient(c *ClientConfig) (*ManagerClient, error) {
124156

125157
mc := &ManagerClient{
126158
HTTPClient: http.Client{Transport: tpt},
127-
config: *c,
159+
config: c,
128160
ManagerIP: c.ManagerIP(),
129161
}
130162

@@ -158,6 +190,8 @@ func (m *ManagerClient) Prepare(method, url string, body io.Reader) (*http.Reque
158190
if err == nil {
159191
r.Header.Add("User-Agent", UserAgent)
160192
r.Header.Add("Hostname", Hostname)
193+
// the address used by the client to connect to the manager
194+
r.Header.Add("IP", m.config.localAddr)
161195
r.Header.Add("UUID", m.config.UUID)
162196
r.Header.Add("Api-Key", m.config.Key)
163197
}

api/log_streamer.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package api
2+
3+
import (
4+
"math/rand"
5+
"sync"
6+
"time"
7+
8+
"github.com/0xrawsec/golang-evtx/evtx"
9+
"github.com/0xrawsec/golang-utils/datastructs"
10+
)
11+
12+
type LogStream struct {
13+
closed bool
14+
S chan evtx.GoEvtxMap
15+
}
16+
17+
func (s *LogStream) Stream(e evtx.GoEvtxMap) bool {
18+
for {
19+
if s.closed {
20+
close(s.S)
21+
return false
22+
}
23+
select {
24+
case s.S <- e:
25+
return true
26+
default:
27+
time.Sleep(time.Millisecond * 10)
28+
}
29+
}
30+
}
31+
32+
func (s *LogStream) Close() {
33+
s.closed = true
34+
}
35+
36+
type EventStreamer struct {
37+
sync.RWMutex
38+
queue datastructs.Fifo
39+
streams map[int]*LogStream
40+
}
41+
42+
func NewEventStreamer() *EventStreamer {
43+
return &EventStreamer{
44+
queue: datastructs.Fifo{},
45+
streams: map[int]*LogStream{},
46+
}
47+
}
48+
49+
func (s *EventStreamer) NewStream() *LogStream {
50+
s.Lock()
51+
defer s.Unlock()
52+
ls := &LogStream{S: make(chan evtx.GoEvtxMap)}
53+
s.streams[s.newId()] = ls
54+
return ls
55+
}
56+
57+
func (s *EventStreamer) newId() int {
58+
var id int
59+
for {
60+
id = rand.Int()
61+
if _, ok := s.streams[id]; !ok {
62+
return id
63+
}
64+
}
65+
}
66+
67+
func (s *EventStreamer) Queue(e evtx.GoEvtxMap) {
68+
s.Lock()
69+
defer s.Unlock()
70+
// we queue only if there is at least a stream open
71+
if len(s.streams) > 0 {
72+
s.queue.Push(e)
73+
}
74+
}
75+
76+
func (s *EventStreamer) Stream() {
77+
go func() {
78+
for {
79+
if i := s.queue.Pop(); i != nil {
80+
e := i.Value.(evtx.GoEvtxMap)
81+
for id, stream := range s.streams {
82+
if ok := stream.Stream(e); !ok {
83+
s.delStream(id)
84+
}
85+
}
86+
} else {
87+
// we sleep only if there is nothing to stream
88+
// to minimize delay
89+
time.Sleep(time.Millisecond * 50)
90+
}
91+
}
92+
}()
93+
}
94+
95+
func (s *EventStreamer) delStream(id int) {
96+
s.Lock()
97+
defer s.Unlock()
98+
delete(s.streams, id)
99+
}
100+
101+
func (s *EventStreamer) Close() {
102+
103+
}

api/manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,7 @@ func (es *Endpoints) MutEndpoints() []*Endpoint {
440440
type Manager struct {
441441
sync.RWMutex
442442
Config *ManagerConfig
443+
eventStreamer *EventStreamer
443444
eventLogger *logger.EventLogger
444445
eventSearcher *logger.EventSearcher
445446
detectionLogger *logger.EventLogger
@@ -472,6 +473,10 @@ func NewManager(c *ManagerConfig) (*Manager, error) {
472473
m.detectionLogger = logger.NewEventLogger(detectionDir, "logs.gz", utils.Giga)
473474
m.detectionSearcher = logger.NewEventSearcher(detectionDir)
474475

476+
// Create a new streamer
477+
m.eventStreamer = NewEventStreamer()
478+
m.eventStreamer.Stream()
479+
475480
if c.EndpointAPI.Port <= 0 || c.EndpointAPI.Port > 65535 {
476481
return nil, fmt.Errorf("Manager Endpoint API Error: invalid port to listen to %d", c.EndpointAPI.Port)
477482
}

0 commit comments

Comments
 (0)