Commit 59f96f0

Merge pull request #3 from rapidclock/#9nkd5-design-crawling
#9nkd5 finalized and implemented initial working design
2 parents d787233 + a768e58 commit 59f96f0

15 files changed: +508 −41 lines

adapter/basicadapters.go

Lines changed: 27 additions & 12 deletions
@@ -12,33 +12,48 @@ import (
 // StdOpAdapter is an output adapter that just prints the output onto the screen.
 type StdOpAdapter struct{}
 
-func (s *StdOpAdapter) Consume(quitCh <-chan bool) chan<- oct.CrawlOutput {
-    listenCh := make(chan oct.CrawlOutput)
+func (s *StdOpAdapter) Consume() *oct.NodeChSet {
+    listenCh := make(chan *oct.Node)
+    quitCh := make(chan int, 1)
+    listenChSet := &oct.NodeChSet{
+        NodeCh: listenCh,
+        StdChannels: &oct.StdChannels{
+            QuitCh: quitCh,
+        },
+    }
     go func() {
         for {
             select {
             case output := <-listenCh:
-                fmt.Printf("%d - %s\n", output.Depth, output.URLString)
+                fmt.Printf("%d - %s\n", output.Depth, output.UrlString)
             case <-quitCh:
                 return
             }
         }
     }()
-    return listenCh
+    return listenChSet
 }
 
 // FileWriterAdapter is an output adapter that writes the output to a specified file.
 type FileWriterAdapter struct {
     FilePath string
 }
 
-func (fw *FileWriterAdapter) Consume(quitCh <-chan bool) chan<- oct.CrawlOutput {
-    listenCh := make(chan oct.CrawlOutput)
-    fw.writeToFile(quitCh, listenCh)
-    return listenCh
+func (fw *FileWriterAdapter) Consume() *oct.NodeChSet {
+    listenCh := make(chan *oct.Node)
+    quitCh := make(chan int, 1)
+    listenChSet := &oct.NodeChSet{
+        NodeCh: listenCh,
+        StdChannels: &oct.StdChannels{
+            QuitCh: quitCh,
+        },
+    }
+    fw.writeToFile(listenCh, quitCh)
+    return listenChSet
 }
 
-func (fw *FileWriterAdapter) writeToFile(quitCh <-chan bool, ch <-chan oct.CrawlOutput) {
+func (fw *FileWriterAdapter) writeToFile(listenCh chan *oct.Node,
+    quitCh chan int) {
     fp, err := fw.getFilePointer()
     if err != nil {
         fp.Close()
@@ -48,8 +63,8 @@ func (fw *FileWriterAdapter) writeToFile(quitCh <-chan bool, ch <-chan oct.Crawl
     defer fp.Close()
     for {
         select {
-        case output := <-ch:
-            fmt.Fprintf(fp, "%d - %s\n", output.Depth, output.URLString)
+        case output := <-listenCh:
+            fmt.Fprintf(fp, "%d - %s\n", output.Depth, output.UrlString)
         case <-quitCh:
             return
         }
@@ -58,6 +73,6 @@ func (fw *FileWriterAdapter) writeToFile(quitCh <-chan bool, ch <-chan oct.Crawl
 }
 
 func (fw *FileWriterAdapter) getFilePointer() (w io.WriteCloser, err error) {
-    w, err = os.OpenFile(fw.FilePath, os.O_RDWR|os.O_CREATE, 0755)
+    w, err = os.OpenFile(fw.FilePath, os.O_RDWR|os.O_CREATE, 0644)
     return
 }
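
Under the new contract, Consume no longer receives a quit channel from the crawler: each adapter allocates its own node and quit channels and returns their write ends bundled in a *oct.NodeChSet. A minimal sketch of a third-party adapter against the new interface; the CountingAdapter type and the import path are assumptions for illustration, not part of this commit:

package adapter

import (
    "fmt"

    oct "github.com/rapidclock/web-octopus/octopus" // import path assumed
)

// CountingAdapter is a hypothetical adapter that tallies and prints crawled links.
type CountingAdapter struct {
    count int
}

func (c *CountingAdapter) Consume() *oct.NodeChSet {
    listenCh := make(chan *oct.Node)
    quitCh := make(chan int, 1)
    go func() {
        for {
            select {
            case node := <-listenCh:
                c.count++
                fmt.Printf("#%d depth=%d %s\n", c.count, node.Depth, node.UrlString)
            case <-quitCh:
                return
            }
        }
    }()
    // MakeNodeChSet (added in octopus/modelfactory.go) bundles both channels.
    return oct.MakeNodeChSet(listenCh, quitCh)
}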

octopus/core.go

Lines changed: 88 additions & 1 deletion
@@ -1,3 +1,90 @@
 package octopus
 
-func (o *webOctopus) setup() {}
+import (
+    "fmt"
+    "log"
+    "time"
+)
+
+func (o *octopus) setupOctopus() {
+    o.setupValidProtocolMap()
+    o.setupTimeToQuit()
+}
+
+func (o *octopus) setupValidProtocolMap() {
+    o.isValidProtocol = make(map[string]bool)
+    for _, protocol := range o.ValidProtocols {
+        o.isValidProtocol[protocol] = true
+    }
+}
+
+func (o *octopus) setupTimeToQuit() {
+    if o.TimeToQuit > 0 {
+        o.timeToQuit = time.Duration(o.TimeToQuit) * time.Second
+    } else {
+        log.Fatalln("TimeToQuit is not greater than 0")
+    }
+}
+
+func (o *octopus) SetupSystem() {
+    o.isReady = false
+    o.setupOctopus()
+
+    ingestNodeCh := make(chan *Node)
+    ingestQuitCh := make(chan int, 1)
+    ingestStrCh := make(chan string)
+
+    ingestChSet := MakeNodeChSet(ingestNodeCh, ingestQuitCh)
+    inPipeChSet := &ingestPipeChSet{
+        ingestNodeCh,
+        ingestStrCh,
+        ingestQuitCh,
+    }
+
+    o.inputUrlStrChan = ingestStrCh
+    o.masterQuitCh = make(chan int, 1)
+
+    outAdapterChSet := o.OpAdapter.Consume()
+
+    pageParseChSet := o.makeParseNodeFromHtmlPipe(ingestChSet)
+    depthLimitChSet := o.makeCrawlDepthFilterPipe(pageParseChSet)
+    maxDelayChSet := o.makeMaxDelayPipe(depthLimitChSet)
+    distributorChSet := o.makeDistributorPipe(maxDelayChSet, outAdapterChSet)
+    pageReqChSet := o.makePageRequisitionPipe(distributorChSet)
+    invUrlFilterChSet := o.makeInvalidUrlFilterPipe(pageReqChSet)
+    dupFilterChSet := o.makeDuplicateUrlFilterPipe(invUrlFilterChSet)
+    protoFilterChSet := o.makeUrlProtocolFilterPipe(dupFilterChSet)
+    linkAbsChSet := o.makeLinkAbsolutionPipe(protoFilterChSet)
+
+    o.makeIngestPipe(inPipeChSet, linkAbsChSet)
+
+    <-time.After(500 * time.Millisecond)
+    o.isReady = true
+}
+
+func (o *octopus) BeginCrawling(baseUrlStr string) {
+    if !o.isReady {
+        log.Fatal("Call BuildSystem first to setup Octopus")
+    }
+    go func() {
+        o.inputUrlStrChan <- baseUrlStr
+    }()
+    <-o.masterQuitCh
+    fmt.Println("Master Kill Switch Activated")
+}
+
+func (o *octopus) GetInputUrlStrChan() chan<- string {
+    if o.isReady {
+        return o.inputUrlStrChan
+    } else {
+        return nil
+    }
+}
+
+func (o *octopus) GetMasterQuitChan() chan<- int {
+    if o.isReady {
+        return o.masterQuitCh
+    } else {
+        return nil
+    }
+}
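
SetupSystem assembles the pipeline back to front: each make*Pipe call returns the channel set that feeds the stage before it, and the ingest pipe closes the loop by pushing absolutized links back into page parsing. A rough end-to-end driver under assumed import paths (note the guard message in BeginCrawling still says "Call BuildSystem" although the method is now named SetupSystem):

package main

import (
    "github.com/rapidclock/web-octopus/adapter" // import paths assumed
    "github.com/rapidclock/web-octopus/octopus"
)

func main() {
    opts := octopus.GetDefaultCrawlOptions()
    opts.OpAdapter = &adapter.StdOpAdapter{} // the defaults leave OpAdapter nil, so one must be set
    crawler := octopus.New(opts)

    crawler.SetupSystem()                        // builds the pipe chain; must precede BeginCrawling
    crawler.BeginCrawling("https://example.com") // blocks until the master quit channel fires
}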

octopus/modelfactory.go

Lines changed: 55 additions & 8 deletions
@@ -1,17 +1,64 @@
 package octopus
 
+import "sync"
+
 const (
-    defaultMaxDepth int16 = 2
-    anchorTag             = "a"
-    anchorAttrb           = "href"
+    defaultMaxDepth int64 = 2
+    anchorTag             = "a"
+    anchorAttrb           = "href"
+    defaultTimeToQuit     = 5
 )
 
-// New - Creates an Instance of the Octopus Crawler with the given options.
-func New(opt CrawlOptions) *webOctopus {
-    oct := &webOctopus{
+// NewWithDefaultOptions - Create an Instance of the Octopus with the default CrawlOptions.
+func NewWithDefaultOptions() *octopus {
+    oct := &octopus{
+        CrawlOptions: GetDefaultCrawlOptions(),
+        visited:      new(sync.Map),
+        isReady:      false,
+    }
+    return oct
+}
+
+// New - Create an Instance of the Octopus with the given CrawlOptions.
+func New(opt *CrawlOptions) *octopus {
+    oct := &octopus{
         CrawlOptions: opt,
-        visited:      make(map[Node]bool),
+        visited:      new(sync.Map),
+        isReady:      false,
     }
-    oct.setup()
     return oct
 }
+
+func createNode(parentUrlStr, urlStr string, depth int64) *Node {
+    return &Node{
+        NodeInfo: &NodeInfo{
+            ParentUrlString: parentUrlStr,
+            UrlString:       urlStr,
+            Depth:           depth,
+        },
+        Body: nil,
+    }
+}
+
+func GetDefaultCrawlOptions() *CrawlOptions {
+    return &CrawlOptions{
+        MaxCrawlDepth:      defaultMaxDepth,
+        MaxCrawlLinks:      -1,
+        StayWithinBaseHost: false,
+        CrawlRatePerSec:    -1,
+        RespectRobots:      false,
+        IncludeBody:        true,
+        OpAdapter:          nil,
+        ValidProtocols:     []string{"http", "https"},
+        TimeToQuit:         defaultTimeToQuit,
+    }
+}
+
+func MakeNodeChSet(nodeCh chan<- *Node, quitCh chan<- int) *NodeChSet {
+    return &NodeChSet{
+        NodeCh: nodeCh,
+        StdChannels: &StdChannels{
+            QuitCh: quitCh,
+        },
+    }
+}
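
One caveat: GetDefaultCrawlOptions deliberately leaves OpAdapter nil, so a crawler built with NewWithDefaultOptions still needs an adapter assigned before SetupSystem reaches o.OpAdapter.Consume(). Because octopus embeds *CrawlOptions, the option fields are promoted and can be set directly; a short sketch under the same assumed import paths:

package main

import (
    "github.com/rapidclock/web-octopus/adapter" // import paths assumed
    "github.com/rapidclock/web-octopus/octopus"
)

func main() {
    crawler := octopus.NewWithDefaultOptions()
    crawler.OpAdapter = &adapter.StdOpAdapter{} // promoted field of the embedded *CrawlOptions
    crawler.MaxCrawlDepth = 4                   // override the default depth of 2
    crawler.SetupSystem()
    crawler.BeginCrawling("https://example.com")
}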

octopus/models.go

Lines changed: 53 additions & 20 deletions
@@ -2,51 +2,84 @@ package octopus
 
 import (
     "io"
+    "sync"
     "time"
 )
 
-// Node is used to represent each crawled link and its associated depth of crawl.
-type Node struct {
-    URLString string
-    Depth     int
-}
-
-// webOctopus is a concurrent version of webSpider.
-// It has an inbuilt parser based of htmlparser.Parser to collect all links in a web-page.
+// octopus is a concurrent web crawler.
+// It has an inbuilt parser based of html.NewTokenizer to collect all links in a web-page.
 // It also has a CrawlOptions structure to initialize setting specific
 // to an instance of the crawler.
-type webOctopus struct {
-    CrawlOptions
-    visited map[Node]bool
+type octopus struct {
+    *CrawlOptions
+    visited         *sync.Map
+    isReady         bool
+    adapterChSet    *NodeChSet
+    isValidProtocol map[string]bool
+    timeToQuit      time.Duration
+    inputUrlStrChan chan string
+    masterQuitCh    chan int
 }
 
 // CrawlOptions is used to house options for crawling.
 // You can specify depth of exploration for each link,
 // if crawler should ignore other hostnames (except from base host).
-// MaxLinksCrawled - Specifies the Maximum Number of Unique Links that will be crawled.
+// MaxCrawlDepth - Indicates the maximum depth that will be crawled,
+// for each new link.
+// MaxCrawlLinks - Specifies the Maximum Number of Unique Links that will be crawled.
 // Note : When combined with DepthPerLink, it will combine both.
 // Use -1 to indicate infinite links to be crawled (only bounded by depth of traversal).
-// IncludeBody - Include the response Body in the crawled Node (for further processing).
+// IncludeBody - Include the response Body in the crawled NodeInfo (for further processing).
 // OpAdapter is a user specified concrete implementation of an Output Adapter. The crawler
 // will pump output onto the implementation's channel returned by its Consume method.
 // CrawlRate is the rate at which requests will be made.
 // RespectRobots (unimplemented) choose whether to respect robots.txt or not.
+// ValidProtocols - This is an array containing the list of url protocols that
+// should be crawled.
+// TimeToQuit - represents the total time to wait between two new nodes to be
+// generated before the crawler quits. This is in seconds.
 type CrawlOptions struct {
-    DepthPerLink       int16
-    MaxLinksCrawled    int64
+    MaxCrawlDepth      int64
+    MaxCrawlLinks      int64
     StayWithinBaseHost bool
-    BaseURLString      string
-    CrawlRate          time.Duration
+    CrawlRatePerSec    int64
     RespectRobots      bool
     IncludeBody        bool
     OpAdapter          OutputAdapter
+    ValidProtocols     []string
+    TimeToQuit         int64
 }
 
-type CrawlOutput struct {
-    Node
+// NodeInfo is used to represent each crawled link and its associated crawl depth.
+type NodeInfo struct {
+    ParentUrlString string
+    UrlString       string
+    Depth           int64
+}
+
+// Node encloses a NodeInfo and its Body (HTML) Content.
+type Node struct {
+    *NodeInfo
     Body io.ReadCloser
 }
 
+type StdChannels struct {
+    QuitCh chan<- int
+    // logCh chan<- string
+    // errorCh chan<- string
+}
+
+type NodeChSet struct {
+    NodeCh chan<- *Node
+    *StdChannels
+}
+
+type ingestPipeChSet struct {
+    NodeCh chan *Node
+    StrCh  chan string
+    QuitCh chan int
+}
+
 // OutputAdapter is the interface for the Adapter that is used to handle
 // output from the Octopus Crawler.
 // The contract stipulates that the crawler provides the channel
@@ -55,5 +88,5 @@ type CrawlOutput struct {
 // Implementers of the interface should listen on this channel for output from
 // the crawler.
 type OutputAdapter interface {
-    Consume(quitCh <-chan bool) chan<- CrawlOutput
+    Consume() *NodeChSet
 }
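
Node now embeds *NodeInfo, so the crawl metadata fields are promoted onto Node itself and call sites such as output.Depth keep working after the rename from CrawlOutput. A small self-contained illustration (URL and depth values are made up; import path assumed):

package main

import (
    "fmt"

    "github.com/rapidclock/web-octopus/octopus" // import path assumed
)

func main() {
    // Node embeds *NodeInfo, so Depth and UrlString are promoted to the Node.
    n := &octopus.Node{
        NodeInfo: &octopus.NodeInfo{
            ParentUrlString: "https://example.com",
            UrlString:       "https://example.com/about",
            Depth:           1,
        },
        Body: nil, // populated only when CrawlOptions.IncludeBody is true
    }
    fmt.Println(n.Depth, n.UrlString) // 1 https://example.com/about
}
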
octopus/pipe_link_absolution.go

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+package octopus
+
+import (
+    "log"
+    "net/url"
+)
+
+func (o *octopus) makeLinkAbsolutionPipe(outChSet *NodeChSet) *NodeChSet {
+    return stdLinearNodeFunc(makeLinkAbsolute, outChSet)
+}
+
+func makeLinkAbsolute(node *Node, outChSet *NodeChSet) {
+    if node == nil || outChSet == nil {
+        log.Fatal("NIL ERROR")
+        return
+    }
+    if node.ParentUrlString != "" {
+        linkUrl, err := url.Parse(node.UrlString)
+        if err != nil {
+            return
+        }
+        if !linkUrl.IsAbs() {
+            baseUrl, err := url.Parse(node.ParentUrlString)
+            if err != nil {
+                return
+            }
+            absLinkUrl := baseUrl.ResolveReference(linkUrl)
+            node.UrlString = absLinkUrl.String()
+        }
+    }
+    outChSet.NodeCh <- node
+}
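
makeLinkAbsolute leans on net/url reference resolution (RFC 3986) to rewrite relative hrefs against the parent page. A standalone illustration of the two calls it makes:

package main

import (
    "fmt"
    "net/url"
)

func main() {
    base, _ := url.Parse("https://example.com/blog/post.html")
    rel, _ := url.Parse("../about#team")

    fmt.Println(rel.IsAbs())                // false: no scheme, so it needs a base
    fmt.Println(base.ResolveReference(rel)) // https://example.com/about#team
}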

octopus/pipe_filter_crawldepth.go

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+package octopus
+
+func (o *octopus) makeCrawlDepthFilterPipe(outChSet *NodeChSet) *NodeChSet {
+    return stdLinearNodeFunc(o.filterByUrlDepth, outChSet)
+}
+
+func (o *octopus) filterByUrlDepth(node *Node, outChSet *NodeChSet) {
+    if node.Depth < o.MaxCrawlDepth {
+        outChSet.NodeCh <- node
+    }
+}
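
Both pipe constructors delegate to stdLinearNodeFunc, which does not appear in this commit's visible diff. Its call sites pin the signature to func(func(*Node, *NodeChSet), *NodeChSet) *NodeChSet; the sketch below is a hypothetical reconstruction of that shape, not the committed implementation:

// Hypothetical reconstruction of the shared helper (not part of this diff):
// it allocates the stage's inbound channels, runs fn on every node that
// arrives, and hands the write ends upstream as a *NodeChSet.
func stdLinearNodeFunc(fn func(*Node, *NodeChSet), outChSet *NodeChSet) *NodeChSet {
    inCh := make(chan *Node)
    quitCh := make(chan int, 1)
    go func() {
        for {
            select {
            case node := <-inCh:
                fn(node, outChSet) // e.g. filterByUrlDepth forwards or drops the node
            case <-quitCh:
                return
            }
        }
    }()
    return MakeNodeChSet(inCh, quitCh)
}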
