Skip to content

Commit 8bd15c4

Browse files
committed
- removes dir from the sources
- fix default not working for transfer type - new handle pool without locks
1 parent 66b97f0 commit 8bd15c4

File tree

7 files changed

+288
-212
lines changed

7 files changed

+288
-212
lines changed

args.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func newParamParserValidator() paramParserValidator {
129129
}
130130

131131
args := &arguments{
132+
keepDirStructure: true,
132133
numberOfReaders: defaultNumberOfReaders,
133134
numberOfWorkers: defaultNumberOfWorkers,
134135
blockSizeStr: defaultBlockSizeStr,
@@ -163,7 +164,8 @@ func (p *paramParserValidator) parseAndValidate() error {
163164
p.pvgHTTPTimeOut,
164165
p.pvgDupCheck,
165166
p.pvgParseBlockSize,
166-
p.pvgQuietMode)
167+
p.pvgQuietMode,
168+
p.pvgKeepDirectoryStructure)
167169

168170
if err != nil {
169171
return err
@@ -246,6 +248,10 @@ func (p *paramParserValidator) getSourceRules() ([]parseAndValidationRule, error
246248
//**************************
247249

248250
//Global rules....
251+
func (p *paramParserValidator) pvgKeepDirectoryStructure() error {
252+
p.params.keepDirStructure = p.args.keepDirStructure
253+
return nil
254+
}
249255
func (p *paramParserValidator) pvgQuietMode() error {
250256
p.params.quietMode = p.args.quietMode
251257
return nil

sources/multifile.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint
104104
totalNumberOfBlocks := 0
105105
var totalSize uint64
106106
var err error
107-
fileInfos := make(map[string]FileInfo, len(files))
107+
fileInfos := make(map[string]FileInfo)
108108
useTargetAlias := len(targetAliases) == len(files)
109109
for f := 0; f < len(files); f++ {
110110
var fileStat os.FileInfo
@@ -114,6 +114,11 @@ func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint
114114
log.Fatalf("Error: %v", err)
115115
}
116116

117+
//directories are not allowed... so skipping them
118+
if fileStat.IsDir() {
119+
continue
120+
}
121+
117122
if fileStat.Size() == 0 {
118123
log.Fatalf("Empty files are not allowed. The file %v is empty", files[f])
119124
}

targets/azureblock.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,6 @@ func (t *AzureBlock) PreProcessSourceInfo(source *pipeline.SourceInfo, blockSize
8888
return fmt.Errorf("Block size is too small, minimum block size for this file would be %d bytes", minBlkSize)
8989
}
9090

91-
//clean uncommitted blocks for large files only (>1GB)
92-
/*
93-
if source.Size > util.GB {
94-
return t.azutil.CleanUncommittedBlocks(source.TargetAlias)
95-
}
96-
*/
97-
9891
return nil
9992
}
10093

@@ -126,7 +119,7 @@ func (t *AzureBlock) ProcessWrittenPart(result *pipeline.WorkerResult, listInfo
126119
func (t *AzureBlock) WritePart(part *pipeline.Part) (duration time.Duration, startTime time.Time, numOfRetries int, err error) {
127120
startTime = time.Now()
128121
defer func() { duration = time.Now().Sub(startTime) }()
129-
defer util.PrintfIfDebug("WritePart -> blockid:%v read:%v name:%v err:%v", part.BlockID, len(part.Data), part.TargetAlias, err)
122+
util.PrintfIfDebug("WritePart -> blockid:%v read:%v name:%v err:%v", part.BlockID, len(part.Data), part.TargetAlias, err)
130123
//computation of the MD5 is done by the readers.
131124
var md5 []byte
132125
if part.IsMD5Computed() {
@@ -140,6 +133,8 @@ func (t *AzureBlock) WritePart(part *pipeline.Part) (duration time.Duration, sta
140133
return
141134
}
142135
reader := bytes.NewReader(part.Data)
136+
util.PrintfIfDebug("WritePart -> blockid:%v reader:%v name:%v err:%v", part.BlockID, reader.Len(), part.TargetAlias, err)
137+
143138
err = t.azutil.PutBlock(t.container, part.TargetAlias, part.BlockID, reader)
144139
return
145140
}

targets/azurepage.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ func NewAzurePagePipeline(params AzureTargetParams) pipeline.TargetPipeline {
2727
log.Fatal(err)
2828
}
2929

30-
_, err = az.CreateContainerIfNotExists()
30+
var notfound bool
31+
notfound, err = az.CreateContainerIfNotExists()
32+
33+
if notfound {
34+
fmt.Printf("Info! Container was not found, creating it...\n")
35+
}
3136

3237
if err != nil {
3338
log.Fatal(err)

targets/handleman.go

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
package targets
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"os"
7+
"path/filepath"
8+
)
9+
10+
type fileHandleFactory struct {
11+
cacheSize int
12+
maxCacheSize int
13+
handleProvider handleProvider
14+
handleReq chan factoryRequest
15+
closeReq chan factoryCloseRequest
16+
returnReq chan factoryReturnRequest
17+
fileHandles map[string]*os.File
18+
overwrite bool
19+
}
20+
// Messages exchanged with a fileHandleFactory goroutine.
type (
	// factoryCloseRequest asks a factory to close the handle it has cached
	// for path; the outcome of the close is sent on err.
	factoryCloseRequest struct {
		path string
		err  chan error
	}

	// factoryReturnRequest hands a handle for path back to a factory.
	factoryReturnRequest struct {
		path   string
		handle *os.File
	}

	// factoryRequest asks a factory for a handle to path; the answer is sent
	// on response.
	factoryRequest struct {
		path     string
		response chan factoryResponse
	}

	// factoryResponse is the answer to a factoryRequest.
	factoryResponse struct {
		handle *os.File
		err    error
	}
)
37+
38+
func (f *fileHandleFactory) startFactory() {
39+
40+
go func() {
41+
for {
42+
select {
43+
case req, ok := <-f.handleReq:
44+
var err error
45+
46+
if !ok {
47+
break
48+
}
49+
50+
fh, exists := f.fileHandles[req.path]
51+
52+
if !exists {
53+
responseChan := make(chan handleProviderResponse, 1)
54+
f.handleProvider.handleReq <- handleProviderRequest{path: req.path,
55+
response: responseChan}
56+
resp := <-responseChan
57+
fh = resp.handle
58+
err = resp.err
59+
} else {
60+
delete(f.fileHandles, req.path)
61+
}
62+
63+
resp := factoryResponse{handle: fh, err: err}
64+
65+
select {
66+
case req.response <- resp:
67+
default:
68+
}
69+
70+
case ret := <-f.returnReq:
71+
var err error
72+
if f.cacheSize < f.maxCacheSize {
73+
f.fileHandles[ret.path] = ret.handle
74+
f.cacheSize++
75+
} else {
76+
err = ret.handle.Close()
77+
78+
if err != nil {
79+
log.Fatalf("The handle failed close. Err:%v", err)
80+
}
81+
}
82+
83+
case close := <-f.closeReq:
84+
var err error
85+
if fh, ok := f.fileHandles[close.path]; ok {
86+
87+
err = fh.Close()
88+
delete(f.fileHandles, close.path)
89+
f.cacheSize--
90+
}
91+
92+
select {
93+
case close.err <- err:
94+
default:
95+
}
96+
97+
}
98+
}
99+
}()
100+
}
101+
102+
type poolHandlerManager struct {
103+
factories []fileHandleFactory
104+
handleReq chan factoryRequest
105+
returnReq chan factoryReturnRequest
106+
}
107+
108+
func newpoolHandlerManager(numOfHandlesPerFile int, numberOfHandlersInCache int, overwrite bool) *poolHandlerManager {
109+
factories := make([]fileHandleFactory, numOfHandlesPerFile)
110+
handleReq := make(chan factoryRequest, 100)
111+
returnReq := make(chan factoryReturnRequest, 100)
112+
handleProvider := newhandleProvider()
113+
handleProvider.startProvider(overwrite)
114+
115+
for i := 0; i < numOfHandlesPerFile; i++ {
116+
//close request is unique for factory as handles need to close in all of them
117+
closeReq := make(chan factoryCloseRequest, 100)
118+
119+
factory := fileHandleFactory{
120+
maxCacheSize: numberOfHandlersInCache / numOfHandlesPerFile,
121+
overwrite: overwrite,
122+
handleReq: handleReq,
123+
returnReq: returnReq,
124+
fileHandles: make(map[string]*os.File),
125+
handleProvider: handleProvider,
126+
closeReq: closeReq}
127+
128+
factory.startFactory()
129+
130+
factories[i] = factory
131+
}
132+
133+
return &poolHandlerManager{factories: factories, handleReq: handleReq, returnReq: returnReq}
134+
}
135+
func (p *poolHandlerManager) getHandle(path string) (*os.File, error) {
136+
respChan := make(chan factoryResponse, 1)
137+
req := factoryRequest{path: path, response: respChan}
138+
139+
p.handleReq <- req
140+
141+
response := <-respChan
142+
143+
return response.handle, response.err
144+
}
145+
146+
func (p *poolHandlerManager) returnHandle(path string, fileHandle *os.File) error {
147+
retrequest := factoryReturnRequest{path: path, handle: fileHandle}
148+
p.returnReq <- retrequest
149+
return nil
150+
}
151+
152+
func (p *poolHandlerManager) closeCacheHandles(path string) error {
153+
154+
for _, factory := range p.factories {
155+
errch := make(chan error, 1)
156+
closerequest := factoryCloseRequest{path: path, err: errch}
157+
factory.closeReq <- closerequest
158+
err := <-errch
159+
160+
if err != nil {
161+
return err
162+
}
163+
}
164+
return nil
165+
}
166+
167+
// handleProvider opens file handles on request. The first successful request
// for a path creates the file through initFile; later requests open the
// existing file for writing.
type handleProvider struct {
	// init records the paths that initFile has created successfully. It is
	// only touched by the goroutine started in startProvider.
	init map[string]bool
	// handleReq delivers open requests to that goroutine.
	handleReq chan handleProviderRequest
}

// handleProviderRequest asks for a handle to path; the answer is sent on
// response.
type handleProviderRequest struct {
	path     string
	response chan handleProviderResponse
}

// handleProviderResponse is the answer to a handleProviderRequest.
type handleProviderResponse struct {
	handle *os.File
	err    error
}

// newhandleProvider returns a provider with an empty set of initialised paths
// and a request channel buffered for 100 requests. The provider does nothing
// until startProvider is called.
func newhandleProvider() handleProvider {
	return handleProvider{
		init:      make(map[string]bool),
		handleReq: make(chan handleProviderRequest, 100),
	}
}

// startProvider launches the goroutine that serves open requests until
// handleReq is closed. overwrite is forwarded to initFile for paths that have
// not been created yet.
func (h *handleProvider) startProvider(overwrite bool) {
	go func() {
		for req := range h.handleReq {
			var (
				fh  *os.File
				err error
			)

			if h.init[req.path] {
				// The permission argument is only used together with
				// O_CREATE, so it is irrelevant here.
				fh, err = os.OpenFile(req.path, os.O_WRONLY, 0)
			} else {
				fh, err = h.initFile(req.path, overwrite)

				// Mark the path only when it was really created. Marking a
				// failed attempt would let the next request open a file that
				// initFile refused to overwrite.
				if err == nil {
					h.init[req.path] = true
				}
			}

			select {
			case req.response <- handleProviderResponse{handle: fh, err: err}:
			default:
				// The response cannot be delivered. Closing is best effort:
				// there is nobody left to report a failure to.
				if fh != nil {
					_ = fh.Close()
				}
			}
		}
	}()
}

// initFile creates filePath, including any missing parent directories, and
// returns a handle to the new, empty file.
//
// When overwrite is false an existing file is never touched and an error is
// returned instead. When overwrite is true an existing file is truncated.
// On failure the returned handle is nil.
func (h *handleProvider) initFile(filePath string, overwrite bool) (*os.File, error) {
	if err := os.MkdirAll(filepath.Dir(filePath), 0777); err != nil {
		return nil, err
	}

	if overwrite {
		fh, err := os.Create(filePath)
		if err != nil {
			return nil, err
		}

		return fh, nil
	}

	// O_EXCL makes "create only if absent" a single atomic step. The other
	// flags and the mode are the ones os.Create uses.
	fh, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)

	if os.IsExist(err) {
		return nil, fmt.Errorf("The file already exists and file overwrite is disabled")
	}

	if err != nil {
		return nil, err
	}

	return fh, nil
}

0 commit comments

Comments
 (0)