Skip to content

Commit 33ce1b9

Browse files
Rewrite to support parallel processing.
1 parent 4a2e206 commit 33ce1b9

File tree

7 files changed

+132
-146
lines changed

7 files changed

+132
-146
lines changed

config.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,18 @@ type Regex struct {
2323
}
2424

2525
type Config struct {
26-
keys map[string]time.Time
27-
db *bolt.DB
28-
Keywords []*Keyword // A list of keywords to search for in the data.
29-
Regexes []*Regex // A list of regular expressions to test against data.
30-
DbFile string `json:"database_file"` // File to use for the Store database.
31-
MaxSize int `json:"max_size"` // Do not save files larger than this many bytes.
32-
MaxTime int `json:"max_time"` // Max time, in seconds, to store previously downloaded keys.
33-
Sleep int // Time, in seconds, to wait between each run.
34-
GhToken string `json:"github_token"` // Github API token
35-
Save bool
36-
LocalPath string `json:"local_path"`
26+
keys map[string]time.Time
27+
db *bolt.DB
28+
Keywords []*Keyword // A list of keywords to search for in the data.
29+
Regexes []*Regex // A list of regular expressions to test against data.
30+
DbFile string `json:"database_file"` // File to use for the Store database.
31+
MaxSize int `json:"max_size"` // Do not save files larger than this many bytes.
32+
MaxTime int `json:"max_time"` // Max time, in seconds, to store previously downloaded keys.
33+
Sleep int // Time, in seconds, to wait between each run.
34+
GhToken string `json:"github_token"` // Github API token
35+
Save bool
36+
LocalPath string `json:"local_path"`
37+
FileBatchSize int `json:"file_batch_size"`
3738
}
3839

3940
func newConfig() Config {
@@ -57,5 +58,9 @@ func newConfig() Config {
5758
r.compiled = regexp.MustCompile(r.Regex)
5859
}
5960

61+
if c.FileBatchSize == 0 {
62+
c.FileBatchSize = 100
63+
}
64+
6065
return c
6166
}

files.go

+21-48
Original file line numberDiff line numberDiff line change
@@ -7,63 +7,36 @@ import (
77
"path/filepath"
88
)
99

10-
type File struct {
11-
Path string
12-
Key string
13-
Content string
14-
}
15-
16-
func (f *File) Read() {
17-
content, err := ioutil.ReadFile(f.Path)
18-
if err != nil {
19-
log.Printf("[-] Could not read file %s\n", f.Path)
20-
f.Content = ""
21-
} else {
22-
log.Printf("[+] Reading file: %s\n", f.Key)
23-
f.Content = string(content)
24-
}
25-
}
26-
27-
func (f *File) Process() {
28-
processContent(f.Key, f.Content)
29-
}
30-
31-
func (f *File) Delete() {
32-
os.Remove(f.Path)
33-
}
34-
35-
func newFile(path string) *File {
36-
f := new(File)
37-
38-
f.Path = path
39-
f.Key = filepath.Base(path)
40-
41-
return f
42-
}
43-
44-
func scrapeFiles() {
10+
func scrapeFiles(c chan<- *ProcessItem) {
4511
if conf.LocalPath == "" {
4612
return
4713
}
4814

4915
log.Println("[+] Checking for local pastes.")
5016

51-
filepath.Walk(conf.LocalPath, func(path string, info os.FileInfo, err error) error {
52-
if err != nil {
53-
log.Printf("[-] Error reading %s: %s\n", conf.LocalPath, err.Error())
54-
return nil
17+
files, err := ioutil.ReadDir(conf.LocalPath)
18+
if err != nil {
19+
log.Printf("[-] Error reading %s: %s\n", conf.LocalPath, err)
20+
return
21+
}
22+
23+
// Process files in batches
24+
for _, file := range files[:conf.FileBatchSize] {
25+
if file.IsDir() {
26+
log.Printf("[+] Skipping directory %s\n", conf.LocalPath)
27+
continue
5528
}
5629

57-
if info.IsDir() {
58-
log.Printf("[+] Skipping directory %s\n", path)
59-
return nil
30+
path := filepath.Join(conf.LocalPath, file.Name())
31+
content, err := ioutil.ReadFile(path)
32+
if err != nil {
33+
log.Printf("[-] Could not read file %s\n", path)
34+
continue
6035
}
6136

62-
f := newFile(path)
63-
f.Read()
64-
f.Process()
65-
f.Delete()
37+
item := &ProcessItem{Source: "Local", Key: file.Name(), Content: string(content)}
38+
c <- item
6639

67-
return nil
68-
})
40+
os.Remove(path)
41+
}
6942
}

gist.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,7 @@ func (g *Gist) Download() {
5353
conf.keys[g.Key] = time.Now()
5454
}
5555

56-
func (g *Gist) Process() {
57-
for _, gist := range g.files {
58-
processContent(g.Key, gist.Content)
59-
}
60-
}
61-
62-
func scrapeGists() {
56+
func scrapeGists(c chan<- *ProcessItem) {
6357
var gists []*Gist
6458

6559
log.Println("[+] Checking for new gists.")
@@ -76,6 +70,10 @@ func scrapeGists() {
7670
for i, _ := range gists {
7771
g := gists[i]
7872
g.Download()
79-
g.Process()
73+
74+
for _, gist := range g.files {
75+
item := &ProcessItem{Source: "Gist", Key: g.Key, Content: gist.Content}
76+
c <- item
77+
}
8078
}
8179
}

paste.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,7 @@ import (
99
// Paste is a single paste entry as decoded from JSON.
type Paste struct {
	// ScrapeUrl is decoded from "scrape_url"; presumably the address the
	// raw content is fetched from — confirm against Download.
	ScrapeUrl string `json:"scrape_url"`
	// Url is decoded from "full_url".
	Url string `json:"full_url"`
	// Key identifies the paste; it is used as the key of the resulting
	// ProcessItem and in conf.keys.
	Key string
	// Content is the paste text that gets handed to the processor.
	Content string
}
2215

@@ -33,11 +26,7 @@ func (p *Paste) Download() {
3326
conf.keys[p.Key] = time.Now()
3427
}
3528

36-
func (p *Paste) Process() {
37-
processContent(p.Key, p.Content)
38-
}
39-
40-
func scrapePastes() {
29+
func scrapePastes(c chan<- *ProcessItem) {
4130
var pastes []*Paste
4231

4332
log.Println("[+] Checking for new pastes.")
@@ -54,6 +43,8 @@ func scrapePastes() {
5443
for i, _ := range pastes {
5544
p := pastes[i]
5645
p.Download()
57-
p.Process()
46+
47+
item := &ProcessItem{Source: "Pastebin", Key: p.Key, Content: p.Content}
48+
c <- item
5849
}
5950
}

process.go

+32-31
Original file line numberDiff line numberDiff line change
@@ -2,74 +2,75 @@ package main
22

33
import (
44
"fmt"
5+
"log"
56
"strings"
67
)
78

8-
func savePaste(key, content string) {
9-
if conf.Save == false {
9+
// ProcessItem is one unit of scraped content travelling from a scraper to
// the processing stage.
type ProcessItem struct {
	// Source names the scraper that produced the item ("Pastebin", "Gist"
	// or "Local").
	Source string
	// Key identifies the item and is used as its database key.
	Key string
	// Content is the raw text to match against keywords and regexes.
	Content string
	// Save is set once a keyword or regex matched; Write only stores items
	// that have it set.
	Save bool
}
15+
16+
func (p *ProcessItem) Write() {
17+
if (conf.Save == false) || (p.Save == false) {
1018
return
1119
}
1220

13-
if len(content) > conf.MaxSize {
21+
if len(p.Content) > conf.MaxSize {
1422
return
1523
}
1624

17-
writeDB(conf.db, "pastes", key, []byte(content))
25+
db.Write("pastes", p.Key, []byte(p.Content))
1826
}
1927

20-
func processRegexes(key, content string) {
21-
save := false
28+
func (p *ProcessItem) Regexes() {
2229
for i, _ := range conf.Regexes {
2330
r := conf.Regexes[i]
2431

2532
switch r.Match {
2633
case "all":
27-
items := r.compiled.FindAllString(content, -1)
34+
items := r.compiled.FindAllString(p.Content, -1)
2835

2936
if items != nil {
30-
save = true
37+
p.Save = true
3138
}
3239

3340
for k := range items {
34-
rKey := fmt.Sprintf("%s-%d", key, k)
35-
writeDB(conf.db, r.Bucket, rKey, []byte(items[k]))
41+
rKey := fmt.Sprintf("%s-%d", p.Key, k)
42+
db.Write(r.Bucket, rKey, []byte(items[k]))
3643
}
3744
case "one":
38-
match := r.compiled.FindString(content)
45+
match := r.compiled.FindString(p.Content)
3946

4047
if match != "" {
41-
save = true
42-
writeDB(conf.db, r.Bucket, key, []byte(match))
48+
p.Save = true
49+
db.Write(r.Bucket, p.Key, []byte(match))
4350
}
4451
default:
4552
}
4653
}
47-
48-
if save {
49-
savePaste(key, content)
50-
}
5154
}
5255

53-
func processKeywords(key, content string) {
54-
save := false
56+
func (p *ProcessItem) Keywords() {
5557
for i, _ := range conf.Keywords {
5658
kwd := conf.Keywords[i]
5759

58-
if strings.Contains(strings.ToLower(content), strings.ToLower(kwd.Keyword)) {
59-
save = true
60-
writeDB(conf.db, kwd.Bucket, key, nil)
60+
if strings.Contains(strings.ToLower(p.Content), strings.ToLower(kwd.Keyword)) {
61+
p.Save = true
62+
db.Write(kwd.Bucket, p.Key, nil)
6163
}
6264
}
63-
64-
if save {
65-
savePaste(key, content)
66-
}
6765
}
6866

69-
func processContent(key, content string) {
70-
conf.db = getDBConn()
71-
defer conf.db.Close()
67+
// process runs the regex, keyword and write stages for one item.
//
// sem is a counting semaphore: a slot is taken by sending on it and given
// back by receiving from it, so the channel's capacity bounds how many items
// are processed at the same time.
func process(sem chan struct{}, pi *ProcessItem) {
	sem <- struct{}{}
	// Release through defer so the slot is returned on every exit path.
	defer func() { <-sem }()

	// Log only once a slot is held, so the message reflects work that has
	// actually started rather than work that is still queued.
	log.Printf("[+] Processing %s:%s.\n", pi.Source, pi.Key)

	pi.Regexes()
	pi.Keywords()
	pi.Write()
}

scrape.go

+34-9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
)
66

77
var conf Config
8+
var db *Database
89

910
func cleanKeys() {
1011
now := time.Now()
@@ -17,19 +18,43 @@ func cleanKeys() {
1718
}
1819
}
1920

20-
func scrape() {
21-
scrapePastes()
22-
scrapeGists()
23-
scrapeFiles()
24-
}
21+
func initDatabase() {
22+
db.CreateBucket("pastes")
2523

26-
func main() {
27-
conf = newConfig()
28-
initDB()
24+
for _, kw := range conf.Keywords {
25+
db.CreateBucket(kw.Bucket)
26+
}
2927

28+
for _, re := range conf.Regexes {
29+
db.CreateBucket(re.Bucket)
30+
}
31+
}
32+
33+
func scrape(piChan chan<- *ProcessItem) {
3034
for {
31-
scrape()
35+
go scrapePastes(piChan)
36+
go scrapeGists(piChan)
37+
go scrapeFiles(piChan)
38+
3239
time.Sleep(time.Duration(conf.Sleep) * time.Second)
3340
cleanKeys()
3441
}
3542
}
43+
44+
func main() {
45+
conf = newConfig()
46+
db = newDatabase(conf.DbFile)
47+
48+
if db != nil {
49+
initDatabase()
50+
51+
processItemChan := make(chan *ProcessItem, 100)
52+
processSemaphore := make(chan struct{}, 10)
53+
54+
go scrape(processItemChan)
55+
56+
for item := range processItemChan {
57+
go process(processSemaphore, item)
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)