
Commit cbc3c1b

Merge pull request #28 from deepfence/sensor-plugin-output
sensor: Add the possibility to output directly to plugins
2 parents 83cb8f2 + e9dd214 commit cbc3c1b

16 files changed: +233 -89 lines changed

cmd/sensor.go

Lines changed: 11 additions & 4 deletions
@@ -1,7 +1,11 @@
 package cmd
 
 import (
+    "context"
     "log"
+    "os"
+    "os/signal"
+    "syscall"
 
     "github.com/spf13/cobra"
 
@@ -19,17 +23,20 @@ var sensorCmd = &cobra.Command{
            log.Fatalf("Invalid configuration: %v", err)
        }
 
-       mainSignalChannel := make(chan bool)
-
        proto := "tcp"
        if err := streamer.InitOutput(cfg, proto); err != nil {
            log.Fatalf("Failed to connect: %v", err)
        }
 
+       sigs := make(chan os.Signal, 1)
+       signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+       ctx, cancel := context.WithCancel(context.Background())
+
        log.Println("Start sending")
-       streamer.StartSensor(cfg, mainSignalChannel)
+       streamer.StartSensor(ctx, cfg)
        log.Println("Now waiting in main")
-       <-mainSignalChannel
+       <-sigs
+       cancel()
    },
 }

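For reference, the diff above swaps the old `mainSignalChannel` for OS signal handling plus a cancellable context. A minimal standalone sketch of that shutdown pattern, with `runSensor` as a hypothetical stand-in for `streamer.StartSensor`:

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

// runSensor is a hypothetical stand-in for streamer.StartSensor: it kicks off
// background work and returns a channel that is closed once that work has
// observed the context cancellation and shut down.
func runSensor(ctx context.Context) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-ctx.Done()
		log.Println("sensor stopped")
	}()
	return done
}

func main() {
	// Buffered channel so a signal delivered before the receive is not lost.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	done := runSensor(ctx)

	log.Println("Now waiting in main")
	<-sigs   // block until SIGINT/SIGTERM
	cancel() // propagate shutdown to background goroutines
	<-done   // wait for them to finish
}
```

Buffering the signal channel with capacity 1 matters here: `signal.Notify` never blocks, so an unbuffered channel could drop a signal that arrives before the receive.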
contrib/config/receiver-s3.yaml

Lines changed: 0 additions & 2 deletions
@@ -2,8 +2,6 @@ input:
   address: 0.0.0.0
   port: 8081
 output:
-  file:
-    path: /dev/null
   plugins:
     s3:
       region: eu-west-1

contrib/config/sensor-s3.yaml

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+output:
+  plugins:
+    s3:
+      region: eu-west-1
+      bucket: foo-pcap
+      totalFileSize: 10MB
+      uploadChunkSize: 5MB
+      uploadTimeout: 1m
+      cannedACL: bucket-owner-full-control
+pcapMode: all

docs/src/SUMMARY.md

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,8 @@
   - [Using with Docker](./quickstart/docker.md)
   - [Using on Kubernetes](./quickstart/kubernetes.md)
   - [Using on Vagrant](./quickstart/vagrant.md)
+- [Plugins](./plugins/README.md)
+  - [S3](./plugins/s3.md)
 - [Using with other tools](./tools/README.md)
   - [Suricata](./tools/suricata.md)
 - [Configuration](./configuration.md)

docs/src/configuration.md

Lines changed: 20 additions & 12 deletions
@@ -3,29 +3,37 @@
 `packetstreamer` is configured using a yaml-formatted configuration file.
 
 ```yaml
-input:                              # required in 'receiver' mode
+input:                                     # required in 'receiver' mode
   address: _ip-address_
   port: _listen-port_
 output:
-  server:                           # required in 'sensor' mode
+  server:                                  # required in 'sensor' mode
     address: _ip-address_
     port: _listen-port_
-  file:                             # required in 'receiver' mode
-    path: _filename_|stdout         # 'stdout' is a reserved name. Receiver will write to stdout
-tls:                                # optional
+  file:                                    # required in 'receiver' mode
+    path: _filename_|stdout                # 'stdout' is a reserved name. Receiver will write to stdout
+  plugins:                                 # optional
+    s3:
+      bucket: _string_
+      region: _string_
+      totalFileSize: _file_size_           # optional; default: 10 MB
+      uploadChunkSize: _file_size_         # optional; default: 5 MB
+      uploadTimeout: _timeout_             # optional; default: 1m
+      cannedACL: _acl_                     # optional; default: Bucket owner enforced
+tls:                                       # optional
   enable: _true_|_false_
   certfile: _filename_
   keyfile: _filename_
-auth:                               # optional; receiver and sensor must use same shared key
+auth:                                      # optional; receiver and sensor must use same shared key
   enable: _true_|_false_
   key: _string_
-compressBlockSize: _integer_        # optional; default: 65
-inputPacketLen: _integer_           # optional; default: 65535
-logFilename: _filename_             # optional
-pcapMode: _Allow_|_Deny_|_All_      # optional
-capturePorts: _list-of-ports_       # optional
+compressBlockSize: _integer_               # optional; default: 65
+inputPacketLen: _integer_                  # optional; default: 65535
+logFilename: _filename_                    # optional
+pcapMode: _Allow_|_Deny_|_All_             # optional
+capturePorts: _list-of-ports_              # optional
 captureInterfacesPorts: _map: interface-name:port_ # optional
-ignorePorts: _list-of-ports_        # optional
+ignorePorts: _list-of-ports_               # optional
 ```
 
 You can find example configuration files in the [`/contrib/config/`](https://github.com/deepfence/PacketStreamer/tree/main/contrib/config)

docs/src/plugins/README.md

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+# Plugins
+
+This documentation section covers plugins, which allow streaming packets to
+various external storage services.
+
+Plugins can be used from both:
+
+- **sensor** - in that case, locally captured packets are streamed through the
+  plugin
+- **receiver** - all packets retrieved from (potentially multiple) sensors are
+  streamed through the plugin
+
+Currently, the available plugins are:
+
+- [S3](./s3.md)

docs/src/plugins/s3.md

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+# S3
+
+The S3 plugin allows streaming packets to a given S3 bucket.
+
+## Configuration
+
+### AWS credentials
+
+Before running PacketStreamer, AWS credentials need to be configured in one of
+the following ways:
+
+- `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables
+- `~/.aws/config` file - it can be created by `aws configure`
+
+The first way might be more convenient when running as root (required when
+running a sensor).
+
+### Configuration scheme
+
+The S3 plugin configuration has the following syntax:
+
+```yaml
+output:
+  plugins:                         # optional
+    s3:
+      bucket: _string_
+      region: _string_
+      totalFileSize: _file_size_   # optional; default: 10 MB
+      uploadChunkSize: _file_size_ # optional; default: 5 MB
+      uploadTimeout: _timeout_     # optional; default: 1m
+      cannedACL: _acl_             # optional; default: Bucket owner enforced
+```
+
+### Sensor configuration
+
+If you want to stream locally captured packets from the sensor to S3, you can
+use the following example configuration from
+[contrib/config/sensor-s3.yaml](https://raw.githubusercontent.com/deepfence/PacketStreamer/main/contrib/config/sensor-s3.yaml):
+
+```yaml
+{{#rustdoc_include ../../../contrib/config/sensor-s3.yaml}}
+```
+
+And run PacketStreamer with it:
+
+```bash
+sudo packetstreamer sensor --config ./contrib/config/sensor-s3.yaml
+```
+
+### Receiver configuration
+
+If you want to stream packets from the receiver to S3, you can use the
+following example configuration from
+[contrib/config/receiver-s3.yaml](https://raw.githubusercontent.com/deepfence/PacketStreamer/main/contrib/config/receiver-s3.yaml):
+
+```yaml
+{{#rustdoc_include ../../../contrib/config/receiver-s3.yaml}}
+```
+
+And run PacketStreamer with it:
+
+```bash
+packetstreamer receiver --config ./contrib/config/receiver-s3.yaml
+```

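The credential sources listed in the new S3 documentation are the ones resolved by the AWS SDK's default credential chain, which is how the plugin picks them up via `awsConfig.LoadDefaultConfig`. A minimal sketch of that lookup; the hard-coded `eu-west-1` region is only for illustration:

```go
package main

import (
	"context"
	"log"

	awsConfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.Background()

	// LoadDefaultConfig walks the default credential chain: environment
	// variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY), the shared
	// config/credentials files under ~/.aws, and so on.
	cfg, err := awsConfig.LoadDefaultConfig(ctx, awsConfig.WithRegion("eu-west-1"))
	if err != nil {
		log.Fatalf("error loading AWS config: %v", err)
	}

	// The resulting aws.Config is what an S3 client is built from.
	client := s3.NewFromConfig(cfg)
	_ = client
}
```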
go.mod

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ module github.com/deepfence/PacketStreamer
 go 1.17
 
 require (
-    github.com/aws/aws-sdk-go-v2 v1.16.2
+    github.com/aws/aws-sdk-go-v2 v1.16.4
     github.com/aws/aws-sdk-go-v2/config v1.15.3
     github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3
     github.com/confluentinc/confluent-kafka-go v1.8.2

go.sum

Lines changed: 2 additions & 0 deletions
@@ -1,5 +1,7 @@
 github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
 github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
+github.com/aws/aws-sdk-go-v2 v1.16.4 h1:swQTEQUyJF/UkEA94/Ga55miiKFoXmm/Zd67XHgmjSg=
+github.com/aws/aws-sdk-go-v2 v1.16.4/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 h1:SdK4Ppk5IzLs64ZMvr6MrSficMtjY2oS0WOORXTlxwU=
 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM=
 github.com/aws/aws-sdk-go-v2/config v1.15.3 h1:5AlQD0jhVXlGzwo+VORKiUuogkG7pQcLJNzIzK7eodw=

pkg/config/sensor.go

Lines changed: 3 additions & 1 deletion
@@ -10,7 +10,9 @@ var (
 )
 
 func ValidateSensorConfig(config *Config) error {
-    if config.Output.File == nil && config.Output.Server == nil {
+    if config.Output.File == nil && config.Output.Server == nil &&
+        (config.Output.Plugins == nil ||
+            (config.Output.Plugins.S3 == nil && config.Output.Plugins.Kafka == nil)) {
        return ErrNoOutputConfigured
    }
    if config.Output.Server != nil && config.Output.Server.Port == nil {

pkg/plugins/plugins.go

Lines changed: 5 additions & 1 deletion
@@ -3,6 +3,8 @@ package plugins
 import (
     "context"
     "fmt"
+    "log"
+
     "github.com/deepfence/PacketStreamer/pkg/config"
     "github.com/deepfence/PacketStreamer/pkg/plugins/kafka"
     "github.com/deepfence/PacketStreamer/pkg/plugins/s3"
@@ -18,7 +20,8 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
     var plugins []chan<- string
 
     if config.Output.Plugins.S3 != nil {
-        s3plugin, err := s3.NewPlugin(ctx, config.Output.Plugins.S3)
+        log.Println("Starting S3 plugin")
+        s3plugin, err := s3.NewPlugin(ctx, config)
 
         if err != nil {
             return nil, fmt.Errorf("error starting S3 plugin, %v", err)
@@ -29,6 +32,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
     }
 
     if config.Output.Plugins.Kafka != nil {
+        log.Println("Starting Kafka plugin")
         kafkaPlugin, err := kafka.NewPlugin(config.Output.Plugins.Kafka)
 
         if err != nil {

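For context, `Start` returns a single write-only channel and, judging by its signature and the `plugins` slice it builds, relays whatever is sent on that channel to every configured plugin. A hedged sketch of how a caller might drive it; `streamChunks` and its chunk source are illustrative, not the streamer's actual wiring, and the sketch assumes at least one plugin is configured:

```go
// Package example is a hypothetical caller of the plugin fan-out.
package example

import (
	"context"

	"github.com/deepfence/PacketStreamer/pkg/config"
	"github.com/deepfence/PacketStreamer/pkg/plugins"
)

// streamChunks forwards raw capture chunks into the plugin pipeline until the
// context is cancelled.
func streamChunks(ctx context.Context, cfg *config.Config, chunks <-chan string) error {
	// Start wires up every plugin enabled under cfg.Output.Plugins
	// (S3, Kafka, ...) and hands back one write-only channel for all of them.
	pluginChan, err := plugins.Start(ctx, cfg)
	if err != nil {
		return err
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case chunk := <-chunks:
			pluginChan <- chunk
		}
	}
}
```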
pkg/plugins/s3/s3.go

Lines changed: 25 additions & 24 deletions
@@ -3,16 +3,18 @@ package s3
 import (
     "bytes"
     "context"
-    "encoding/binary"
     "fmt"
+    "log"
+    "time"
+
     "github.com/aws/aws-sdk-go-v2/aws"
     awsConfig "github.com/aws/aws-sdk-go-v2/config"
     "github.com/aws/aws-sdk-go-v2/service/s3"
     "github.com/aws/aws-sdk-go-v2/service/s3/types"
+    "github.com/google/gopacket/layers"
+    "github.com/google/gopacket/pcapgo"
+
     "github.com/deepfence/PacketStreamer/pkg/config"
-    "github.com/deepfence/PacketStreamer/pkg/file"
-    "log"
-    "time"
 )
 
 const (
@@ -23,6 +25,7 @@ type Plugin struct {
     S3Client        *s3.Client
     Region          string
     Bucket          string
+    InputPacketLen  int
     TotalFileSize   uint64
     UploadChunkSize uint64
     UploadTimeout   time.Duration
@@ -36,8 +39,9 @@ type MultipartUpload struct {
     TotalDataSent int
 }
 
-func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, error) {
-    awsCfg, err := awsConfig.LoadDefaultConfig(ctx, awsConfig.WithRegion(config.Region))
+func NewPlugin(ctx context.Context, config *config.Config) (*Plugin, error) {
+    awsCfg, err := awsConfig.LoadDefaultConfig(ctx,
+        awsConfig.WithRegion(config.Output.Plugins.S3.Region))
 
     if err != nil {
         return nil, fmt.Errorf("error loading AWS config when creating S3 client, %v", err)
@@ -51,12 +55,12 @@ func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, err
 
     return &Plugin{
         S3Client:        s3Client,
-        Region:          config.Region,
-        Bucket:          config.Bucket,
-        TotalFileSize:   uint64(*config.TotalFileSize),
-        UploadChunkSize: uint64(*config.UploadChunkSize),
-        UploadTimeout:   config.UploadTimeout,
-        CannedACL:       config.CannedACL,
+        Region:          config.Output.Plugins.S3.Region,
+        Bucket:          config.Output.Plugins.S3.Bucket,
+        TotalFileSize:   uint64(*config.Output.Plugins.S3.TotalFileSize),
+        UploadChunkSize: uint64(*config.Output.Plugins.S3.UploadChunkSize),
+        UploadTimeout:   config.Output.Plugins.S3.UploadTimeout,
+        CannedACL:       config.Output.Plugins.S3.CannedACL,
     }, nil
 }
 
@@ -78,7 +82,6 @@ func (mpu *MultipartUpload) appendToBuffer(data []byte) {
 func (p *Plugin) Start(ctx context.Context) chan<- string {
     inputChan := make(chan string)
     go func() {
-        payloadMarker := []byte{0x0, 0x0, 0x0, 0x0}
         var mpu *MultipartUpload
 
         for {
@@ -92,18 +95,8 @@ func (p *Plugin) Start(ctx context.Context) chan<- string {
                     log.Printf("error creating multipart upload, stopping... - %v\n", err)
                     return
                 }
-
-                mpu.appendToBuffer(file.Header)
-
-                if err != nil {
-                    log.Printf("error adding header to buffer, stopping... - %v\n", err)
-                    return
-                }
             }
             data := []byte(chunk)
-            dataLen := len(data)
-            binary.LittleEndian.PutUint32(payloadMarker[:], uint32(dataLen))
-            mpu.appendToBuffer(payloadMarker)
             mpu.appendToBuffer(data)
 
             if uint64(len(mpu.Buffer)) >= p.UploadChunkSize {
@@ -230,5 +223,13 @@ func (p *Plugin) createMultipartUpload(ctx context.Context) (*MultipartUpload, e
         return nil, fmt.Errorf("error creating multipart upload, %v", err)
     }
 
-    return newMultipartUpload(output), nil
+    mpu := newMultipartUpload(output)
+
+    var pcapBuffer bytes.Buffer
+    pcapWriter := pcapgo.NewWriter(&pcapBuffer)
+    pcapWriter.WriteFileHeader(uint32(p.InputPacketLen), layers.LinkTypeEthernet)
+
+    mpu.appendToBuffer(pcapBuffer.Bytes())
+
+    return mpu, nil
 }

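The last hunk above replaces the old custom `file.Header` and per-chunk length markers with a standard pcap global header generated by `gopacket/pcapgo`. A minimal standalone sketch of that header generation, using a fixed 65535 snaplen where the plugin uses `p.InputPacketLen`:

```go
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcapgo"
)

func main() {
	// Write only the pcap global file header into an in-memory buffer, as
	// createMultipartUpload now does before any packet data is appended.
	var pcapBuffer bytes.Buffer
	pcapWriter := pcapgo.NewWriter(&pcapBuffer)
	if err := pcapWriter.WriteFileHeader(65535, layers.LinkTypeEthernet); err != nil {
		log.Fatalf("error writing pcap header: %v", err)
	}

	// The resulting 24-byte header is what gets prepended to the upload, so
	// the object stored in S3 is a readable .pcap file.
	fmt.Printf("pcap header: % x (%d bytes)\n", pcapBuffer.Bytes(), pcapBuffer.Len())
}
```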
pkg/streamer/common.go

Lines changed: 7 additions & 6 deletions
@@ -26,7 +26,10 @@ var (
     hdrData = [...]byte{0xde, 0xef, 0xec, 0xe0}
 )
 
-func writeOutput(config *config.Config, tmpData []byte) int {
+func writeOutput(config *config.Config, tmpData []byte) error {
+    if outputFd == nil {
+        return nil
+    }
 
     var numAttempts = 0
     reconnectAttempt := false
@@ -38,15 +41,13 @@ func writeOutput(config *config.Config, tmpData []byte) int {
             reconnectAttempt = true
             err := InitOutput(config, "tcp")
             if err != nil {
-                log.Printf("Tried to reconnect but got: %v\n", err)
-                return 1
+                return fmt.Errorf("tried to reconnect but got: %w", err)
             }
             log.Printf("Tried to write for %d times. Reconnecting once. \n", numAttempts)
             numAttempts = 0
             continue
         }
-        log.Printf("Tried to write for %d times. Bailing out. \n", numAttempts)
-        return 1
+        return fmt.Errorf("tried to write for %d times", numAttempts)
     }
 
     bytesWritten, err := outputFd.Write(tmpData[totalBytesWritten:])
@@ -63,7 +64,7 @@ func writeOutput(config *config.Config, tmpData []byte) int {
         continue
     }
 
-    return 0
+    return nil
 }
 }

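With this change `writeOutput` reports failures through an `error` instead of a 0/1 status code, and silently succeeds when `outputFd` is nil (for example when only plugin outputs are configured). A hedged sketch of how a caller in the same package might handle the new return value; `sendChunk` is a hypothetical call site, not the streamer's actual one:

```go
package streamer

import (
	"log"

	"github.com/deepfence/PacketStreamer/pkg/config"
)

// sendChunk is an illustrative caller of writeOutput.
func sendChunk(cfg *config.Config, chunk []byte) {
	// writeOutput now returns an error instead of an int status, and is a
	// no-op (nil error) when no file/server output descriptor is open.
	if err := writeOutput(cfg, chunk); err != nil {
		log.Printf("failed to write output: %v", err)
	}
}
```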