Skip to content

Commit 13f6a6a

Browse files
committed
add timeout for reconnecting
1 parent 1bf475c commit 13f6a6a

File tree

2 files changed

+31
-5
lines changed

2 files changed

+31
-5
lines changed

cluster.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ func (cr *Cluster) Connect(ctx context.Context) bool {
175175
}()
176176
}
177177
logInfof("Dialing %s", strings.ReplaceAll(wsurl, cr.password, "<******>"))
178-
err := cr.socket.IO().DialContext(ctx, wsurl, WithHeader(header))
178+
tctx, cancel := context.WithTimeout(ctx, time.Second*15)
179+
err := cr.socket.IO().DialContext(tctx, wsurl, WithHeader(header))
180+
cancel()
179181
if err != nil {
180182
logError("Websocket connect error:", err)
181183
return false
@@ -210,12 +212,14 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {
210212
return
211213
}
212214
logInfo("Sending enable packet")
213-
data, err := cr.socket.EmitAckContext(ctx, "enable", Map{
215+
tctx, cancel := context.WithTimeout(ctx, time.Second*10)
216+
data, err := cr.socket.EmitAckContext(tctx, "enable", Map{
214217
"host": cr.host,
215218
"port": cr.publicPort,
216219
"version": ClusterVersion,
217220
"byoc": cr.byoc,
218221
})
222+
cancel()
219223
if err != nil {
220224
return
221225
}

cluster_oss.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
7979
logInfof("Starting sync files, count: %d, total: %s", fl, bytesToUnit(stats.totalsize))
8080
start := time.Now()
8181

82+
done := make(chan struct{}, 1)
83+
8284
for _, f := range missing {
8385
logDebugf("File %s is for %v", f.Hash, f.targets)
8486
pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo)
@@ -87,15 +89,35 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
8789
return err
8890
}
8991
go func(f *fileInfoWithTargets) {
92+
defer func() {
93+
done <- struct{}{}
94+
}()
9095
select {
9196
case path := <-pathRes:
9297
if path != "" {
9398
defer os.Remove(path)
99+
var srcFd *os.File
100+
if srcFd, err = os.Open(path); err != nil {
101+
return
102+
}
103+
defer srcFd.Close()
94104
relpath := hashToFilename(f.Hash)
95105
for _, target := range f.targets {
96106
target = filepath.Join(target, relpath)
97-
if err := copyFile(path, target, 0644); err != nil {
98-
logErrorf("Could not copy file %q to %q:\n\t%v", path, target, err)
107+
dstFd, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
108+
if err != nil {
109+
logErrorf("Could not create %q: %v", target, err)
110+
continue
111+
}
112+
if _, err = srcFd.Seek(0, io.SeekStart); err != nil {
113+
logErrorf("Could not seek file %q: %v", path, err)
114+
continue
115+
}
116+
_, err = io.Copy(dstFd, srcFd)
117+
dstFd.Close()
118+
if err != nil {
119+
logErrorf("Could not copy from %q to %q:\n\t%v", path, target, err)
120+
continue
99121
}
100122
}
101123
}
@@ -106,7 +128,7 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
106128
}
107129
for i := cap(stats.slots); i > 0; i-- {
108130
select {
109-
case <-stats.slots:
131+
case <-done:
110132
case <-ctx.Done():
111133
logWarn("File sync interrupted")
112134
return ctx.Err()

0 commit comments

Comments
 (0)