Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit 5c732d5

Browse files
authored
Merge pull request #1456 from ansinlee/master
add preheat implements
2 parents 4906cd1 + 9aafd32 commit 5c732d5

16 files changed

+1010
-14
lines changed

.circleci/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ jobs:
6666

6767
unit-test-golang:
6868
docker:
69-
- image: circleci/golang:1.12.10
69+
- image: circleci/golang:1.13.15
7070
working_directory: /go/src/github.com/dragonflyoss/Dragonfly
7171
steps:
7272
- checkout
@@ -88,7 +88,7 @@ jobs:
8888

8989
api-integration-test:
9090
docker:
91-
- image: circleci/golang:1.12.10
91+
- image: circleci/golang:1.13.15
9292
working_directory: /go/src/github.com/dragonflyoss/Dragonfly
9393
steps:
9494
- checkout
@@ -104,7 +104,7 @@ jobs:
104104

105105
release:
106106
docker:
107-
- image: circleci/golang:1.12.10
107+
- image: circleci/golang:1.13.15
108108
working_directory: /go/src/github.com/dragonflyoss/Dragonfly
109109
steps:
110110
- checkout

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.12.10-alpine as builder
1+
FROM golang:1.13.15-alpine as builder
22

33
WORKDIR /go/src/github.com/dragonflyoss/Dragonfly
44
RUN apk --no-cache add bash make gcc libc-dev git

Dockerfile.supernode

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.12.10-alpine as builder
1+
FROM golang:1.13.15-alpine as builder
22

33
WORKDIR /go/src/github.com/dragonflyoss/Dragonfly
44
RUN apk --no-cache add bash make gcc libc-dev git
@@ -9,6 +9,7 @@ COPY . /go/src/github.com/dragonflyoss/Dragonfly
99
# write the resulting executable to the dir /opt/dragonfly/df-supernode.
1010
ARG GOPROXY
1111
RUN make build-supernode && make install-supernode
12+
RUN make build-client && make install-client
1213

1314
FROM dragonflyoss/nginx:apline
1415

@@ -17,6 +18,9 @@ RUN apk --no-cache add ca-certificates bash
1718
COPY --from=builder /go/src/github.com/dragonflyoss/Dragonfly/hack/start-supernode.sh /root/start.sh
1819
COPY --from=builder /go/src/github.com/dragonflyoss/Dragonfly/hack/supernode-nginx.conf /etc/nginx/nginx.conf
1920
COPY --from=builder /opt/dragonfly/df-supernode/supernode /opt/dragonfly/df-supernode/supernode
21+
COPY --from=builder /opt/dragonfly/df-client /opt/dragonfly/df-client
22+
RUN ln -s /opt/dragonfly/df-client/dfget /usr/local/bin/dfget
23+
2024

2125
# supernode will listen 8001,8002 in default.
2226
EXPOSE 8001 8002

apis/types/preheat_info.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hack/build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ DFDAEMON_BINARY_NAME=dfdaemon
88
DFGET_BINARY_NAME=dfget
99
SUPERNODE_BINARY_NAME=supernode
1010
PKG=github.com/dragonflyoss/Dragonfly
11-
BUILD_IMAGE=golang:1.12.10
11+
BUILD_IMAGE=golang:1.13.15
1212
VERSION=$(git describe --tags "$(git rev-list --tags --max-count=1)")
1313
REVISION=$(git rev-parse --short HEAD)
1414
DATE=$(date "+%Y%m%d-%H:%M:%S")
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package preheat
17+
18+
import (
19+
"sync"
20+
21+
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
22+
)
23+
24+
var _ Preheater = &BasePreheater{}
25+
26+
type BasePreheater struct {}
27+
28+
/**
29+
* The type of this preheater
30+
*/
31+
func (p *BasePreheater) Type() string {
32+
panic("not implement")
33+
}
34+
35+
/**
36+
* Create a worker to preheat the task.
37+
*/
38+
func (p *BasePreheater) NewWorker(task *mgr.PreheatTask , service *PreheatService) IWorker {
39+
panic("not implement")
40+
}
41+
42+
/**
43+
* cancel the running task
44+
*/
45+
func (p *BasePreheater) Cancel(id string) {
46+
woker, ok := workerMap.Load(id)
47+
if !ok {
48+
return
49+
}
50+
woker.(IWorker).Stop()
51+
}
52+
53+
/**
54+
* remove a running preheat task
55+
*/
56+
func (p *BasePreheater) Remove(id string) {
57+
p.Cancel(id)
58+
workerMap.Delete(id)
59+
}
60+
61+
/**
62+
* add a worker to workerMap.
63+
*/
64+
func (p *BasePreheater) addWorker(id string, worker IWorker) {
65+
workerMap.Store(id, worker)
66+
}
67+
68+
var workerMap = new(sync.Map)
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package preheat
17+
18+
import (
19+
"runtime/debug"
20+
"sync/atomic"
21+
"time"
22+
23+
"github.com/dragonflyoss/Dragonfly/apis/types"
24+
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
25+
)
26+
27+
const TIMEOUT = 30 * 60
28+
29+
var _ IWorker = &BaseWorker{}
30+
31+
type IWorker interface{
32+
Run()
33+
Stop()
34+
query() chan error
35+
preRun() bool
36+
failed(errMsg string)
37+
afterRun()
38+
}
39+
40+
type BaseWorker struct {
41+
Task *mgr.PreheatTask
42+
Preheater Preheater
43+
PreheatService *PreheatService
44+
stop *atomic.Value
45+
worker IWorker
46+
}
47+
48+
func newBaseWorker(task *mgr.PreheatTask, preheater Preheater, preheatService *PreheatService) *BaseWorker {
49+
worker := &BaseWorker{
50+
Task: task,
51+
Preheater: preheater,
52+
PreheatService: preheatService,
53+
stop: new(atomic.Value),
54+
}
55+
worker.worker = worker
56+
return worker
57+
}
58+
59+
func (w *BaseWorker) Run() {
60+
go func() {
61+
defer func(){
62+
e := recover()
63+
if e != nil {
64+
debug.PrintStack()
65+
}
66+
}()
67+
68+
if w.worker.preRun() {
69+
timer := time.NewTimer(time.Second*TIMEOUT)
70+
ch := w.worker.query()
71+
select {
72+
case <-timer.C:
73+
w.worker.failed("timeout")
74+
case err := <-ch:
75+
if err != nil {
76+
w.worker.failed(err.Error())
77+
}
78+
}
79+
}
80+
w.worker.afterRun()
81+
}()
82+
}
83+
84+
func (w *BaseWorker) Stop() {
85+
w.stop.Store(true)
86+
}
87+
88+
func (w *BaseWorker) isRunning() bool {
89+
return w.stop.Load() == nil
90+
}
91+
92+
func (w *BaseWorker) preRun() bool {
93+
panic("not implement")
94+
}
95+
96+
func (w *BaseWorker) afterRun() {
97+
w.Preheater.Remove(w.Task.ID)
98+
}
99+
100+
func (w *BaseWorker) query() chan error {
101+
panic("not implement")
102+
}
103+
104+
func (w *BaseWorker) succeed() {
105+
w.Task.FinishTime = time.Now().UnixNano()/int64(time.Millisecond)
106+
w.Task.Status = types.PreheatStatusSUCCESS
107+
w.PreheatService.Update(w.Task.ID, w.Task)
108+
}
109+
110+
func (w *BaseWorker) failed(errMsg string) {
111+
w.Task.FinishTime = time.Now().UnixNano()/int64(time.Millisecond)
112+
w.Task.Status = types.PreheatStatusFAILED
113+
w.Task.ErrorMsg = errMsg
114+
w.PreheatService.Update(w.Task.ID, w.Task)
115+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright The Dragonfly Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package preheat
17+
18+
import (
19+
"errors"
20+
"fmt"
21+
"github.com/sirupsen/logrus"
22+
"time"
23+
24+
"github.com/dragonflyoss/Dragonfly/apis/types"
25+
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
26+
)
27+
28+
func init() {
29+
RegisterPreheater("file", &FilePreheat{BasePreheater:new(BasePreheater)})
30+
logrus.StandardLogger().SetLevel(logrus.DebugLevel)
31+
}
32+
33+
type FilePreheat struct {
34+
*BasePreheater
35+
}
36+
37+
func (p *FilePreheat) Type() string {
38+
return "file"
39+
}
40+
41+
/**
42+
* Create a worker to preheat the task.
43+
*/
44+
func (p *FilePreheat) NewWorker(task *mgr.PreheatTask , service *PreheatService) IWorker {
45+
worker := &FileWorker{BaseWorker: newBaseWorker(task, p, service)}
46+
worker.worker = worker
47+
p.addWorker(task.ID, worker)
48+
return worker
49+
}
50+
51+
type FileWorker struct {
52+
*BaseWorker
53+
progress *PreheatProgress
54+
}
55+
56+
func (w *FileWorker) preRun() bool {
57+
w.Task.Status = types.PreheatStatusRUNNING
58+
w.PreheatService.Update(w.Task.ID, w.Task)
59+
var err error
60+
w.progress, err = w.PreheatService.ExecutePreheat(w.Task)
61+
if err != nil {
62+
w.failed(err.Error())
63+
return false
64+
}
65+
return true
66+
}
67+
68+
func (w *FileWorker) afterRun() {
69+
if w.progress != nil {
70+
w.progress.cmd.Process.Kill()
71+
}
72+
w.BaseWorker.afterRun()
73+
}
74+
75+
func (w *FileWorker) query() chan error {
76+
result := make(chan error, 1)
77+
go func(){
78+
time.Sleep(time.Second*2)
79+
for w.isRunning() {
80+
if w.Task.FinishTime > 0 {
81+
w.Preheater.Cancel(w.Task.ID)
82+
return
83+
}
84+
if w.progress == nil {
85+
w.succeed()
86+
return
87+
}
88+
status := w.progress.cmd.ProcessState
89+
if status != nil && status.Exited() {
90+
if !status.Success() {
91+
errMsg := fmt.Sprintf("dfget failed: %s err: %s", status.String(), w.progress.errmsg.String())
92+
w.failed(errMsg)
93+
w.Preheater.Cancel(w.Task.ID)
94+
result <- errors.New(errMsg)
95+
return
96+
} else {
97+
w.succeed()
98+
w.Preheater.Cancel(w.Task.ID)
99+
result <- nil
100+
return
101+
}
102+
}
103+
104+
time.Sleep(time.Second*10)
105+
}
106+
}()
107+
return result
108+
}
109+

0 commit comments

Comments
 (0)