Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit 0b48f55

Browse files
committed
Merge pull request #1375 from lowzj/fix-ByteBuffer-gc
optimize: release allocated ByteBuffer explicitly
2 parents 129d562 + 0122fc9 commit 0b48f55

12 files changed

+487
-31
lines changed

dfget/core/downloader/p2p_downloader/client_stream_writer_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package downloader
1818

1919
import (
20-
"bytes"
2120
"io"
2221
"sort"
2322

2423
"github.com/dragonflyoss/Dragonfly/dfget/config"
24+
"github.com/dragonflyoss/Dragonfly/pkg/pool"
2525

2626
"github.com/go-check/check"
2727
)
@@ -49,7 +49,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
4949
piece: &Piece{
5050
PieceNum: 0,
5151
PieceSize: 6,
52-
Content: bytes.NewBufferString("000010"),
52+
Content: pool.NewBufferString("000010"),
5353
},
5454
noWrapper: false,
5555
expected: "1",
@@ -58,7 +58,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
5858
piece: &Piece{
5959
PieceNum: 1,
6060
PieceSize: 6,
61-
Content: bytes.NewBufferString("000020"),
61+
Content: pool.NewBufferString("000020"),
6262
},
6363
noWrapper: false,
6464
expected: "2",
@@ -67,7 +67,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
6767
piece: &Piece{
6868
PieceNum: 3,
6969
PieceSize: 6,
70-
Content: bytes.NewBufferString("000040"),
70+
Content: pool.NewBufferString("000040"),
7171
},
7272
noWrapper: false,
7373
expected: "4",
@@ -76,7 +76,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
7676
piece: &Piece{
7777
PieceNum: 4,
7878
PieceSize: 6,
79-
Content: bytes.NewBufferString("000050"),
79+
Content: pool.NewBufferString("000050"),
8080
},
8181
noWrapper: false,
8282
expected: "5",
@@ -85,7 +85,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
8585
piece: &Piece{
8686
PieceNum: 2,
8787
PieceSize: 6,
88-
Content: bytes.NewBufferString("000030"),
88+
Content: pool.NewBufferString("000030"),
8989
},
9090
noWrapper: false,
9191
expected: "3",

dfget/core/downloader/p2p_downloader/client_writer.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package downloader
1818

1919
import (
20-
"bufio"
2120
"context"
2221
"io"
2322
"math/rand"
@@ -31,6 +30,7 @@ import (
3130
"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
3231
"github.com/dragonflyoss/Dragonfly/dfget/types"
3332
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
33+
"github.com/dragonflyoss/Dragonfly/pkg/pool"
3434
"github.com/dragonflyoss/Dragonfly/pkg/queue"
3535

3636
"github.com/sirupsen/logrus"
@@ -244,9 +244,10 @@ func writePieceToFile(piece *Piece, file *os.File, cdnSource apiTypes.CdnSource)
244244
return err
245245
}
246246

247-
buf := bufio.NewWriterSize(file, 4*1024*1024)
248-
_, err := io.Copy(buf, piece.RawContent(noWrapper))
249-
buf.Flush()
247+
writer := pool.AcquireWriter(file)
248+
_, err := io.Copy(writer, piece.RawContent(noWrapper))
249+
pool.ReleaseWriter(writer)
250+
writer = nil
250251
return err
251252
}
252253

dfget/core/downloader/p2p_downloader/client_writer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package downloader
1818

1919
import (
20-
"bytes"
2120
"fmt"
2221
"io/ioutil"
2322
"os"
2423
"path/filepath"
2524

2625
apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
2726
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
27+
"github.com/dragonflyoss/Dragonfly/pkg/pool"
2828

2929
"github.com/go-check/check"
3030
)
@@ -63,7 +63,7 @@ func (s *ClientWriterTestSuite) TestWrite(c *check.C) {
6363
piece: &Piece{
6464
PieceNum: 0,
6565
PieceSize: 6,
66-
Content: bytes.NewBufferString("000010"),
66+
Content: pool.NewBufferString("000010"),
6767
},
6868
cdnSource: apiTypes.CdnSourceSupernode,
6969
expected: "1",
@@ -72,7 +72,7 @@ func (s *ClientWriterTestSuite) TestWrite(c *check.C) {
7272
piece: &Piece{
7373
PieceNum: 1,
7474
PieceSize: 6,
75-
Content: bytes.NewBufferString("000020"),
75+
Content: pool.NewBufferString("000020"),
7676
},
7777
cdnSource: apiTypes.CdnSourceSupernode,
7878
expected: "2",
@@ -81,7 +81,7 @@ func (s *ClientWriterTestSuite) TestWrite(c *check.C) {
8181
piece: &Piece{
8282
PieceNum: 1,
8383
PieceSize: 6,
84-
Content: bytes.NewBufferString("000030"),
84+
Content: pool.NewBufferString("000030"),
8585
},
8686
cdnSource: apiTypes.CdnSourceSource,
8787
expected: "000030",

dfget/core/downloader/p2p_downloader/p2p_downloader.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package downloader
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"fmt"
2322
"io"
@@ -212,7 +211,7 @@ func (p2p *P2PDownloader) run(ctx context.Context, pieceWriter PieceWriter) erro
212211
logrus.Infof("downloading piece:%v", lastItem)
213212

214213
curItem := *lastItem
215-
curItem.Content = &bytes.Buffer{}
214+
curItem.Content = nil
216215
lastItem = nil
217216

218217
response, err := p2p.pullPieceTask(&curItem)
@@ -412,7 +411,7 @@ func (p2p *P2PDownloader) getItem(latestItem *Piece) (bool, *Piece) {
412411
}
413412
if !v && (item.Result == constants.ResultSemiSuc ||
414413
item.Result == constants.ResultSuc) {
415-
p2p.total += int64(item.Content.Len())
414+
p2p.total += item.ContentLength()
416415
p2p.pieceSet[item.Range] = true
417416
} else if !v {
418417
delete(p2p.pieceSet, item.Range)

dfget/core/downloader/p2p_downloader/piece.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
2424
"github.com/dragonflyoss/Dragonfly/pkg/constants"
25+
"github.com/dragonflyoss/Dragonfly/pkg/pool"
2526
)
2627

2728
// Piece contains all information of a piece.
@@ -51,14 +52,25 @@ type Piece struct {
5152
PieceNum int `json:"pieceNum"`
5253

5354
// Content uses a buffer to temporarily store the piece content.
54-
Content *bytes.Buffer `json:"-"`
55+
Content *pool.Buffer `json:"-"`
56+
57+
// length the length of the content.
58+
length int64
59+
60+
// autoReset automatically reset content after reading.
61+
autoReset bool
5562
}
5663

5764
// RawContent returns raw contents,
5865
// If the piece has wrapper, and the piece content will remove the head and tail.
5966
func (p *Piece) RawContent(noWrapper bool) *bytes.Buffer {
6067
contents := p.Content.Bytes()
6168
length := len(contents)
69+
defer func() {
70+
if p.autoReset {
71+
p.ResetContent()
72+
}
73+
}()
6274

6375
if noWrapper {
6476
return bytes.NewBuffer(contents[:])
@@ -69,13 +81,33 @@ func (p *Piece) RawContent(noWrapper bool) *bytes.Buffer {
6981
return nil
7082
}
7183

84+
// ContentLength returns the content length.
85+
func (p *Piece) ContentLength() int64 {
86+
if p.length <= 0 && p.Content != nil {
87+
p.length = int64(p.Content.Len())
88+
}
89+
return p.length
90+
}
91+
7292
func (p *Piece) String() string {
7393
if b, e := json.Marshal(p); e == nil {
7494
return string(b)
7595
}
7696
return ""
7797
}
7898

99+
// ResetContent reset contents and returns it back to buffer pool.
100+
func (p *Piece) ResetContent() {
101+
if p.Content == nil {
102+
return
103+
}
104+
if p.length == 0 {
105+
p.length = int64(p.Content.Len())
106+
}
107+
pool.ReleaseBuffer(p.Content)
108+
p.Content = nil
109+
}
110+
79111
// NewPiece creates a Piece.
80112
func NewPiece(taskID, node, dstCid, pieceRange string, result, status int, cdnSource apiTypes.CdnSource) *Piece {
81113
return &Piece{
@@ -85,7 +117,8 @@ func NewPiece(taskID, node, dstCid, pieceRange string, result, status int, cdnSo
85117
Range: pieceRange,
86118
Result: result,
87119
Status: status,
88-
Content: &bytes.Buffer{},
120+
Content: nil,
121+
autoReset: true,
89122
}
90123
}
91124

@@ -96,16 +129,14 @@ func NewPieceSimple(taskID string, node string, status int, cdnSource apiTypes.C
96129
SuperNode: node,
97130
Status: status,
98131
Result: constants.ResultInvalid,
99-
Content: &bytes.Buffer{},
132+
Content: nil,
133+
autoReset: true,
100134
}
101135
}
102136

103137
// NewPieceContent creates a Piece with specified content.
104138
func NewPieceContent(taskID, node, dstCid, pieceRange string,
105-
result, status int, contents *bytes.Buffer, cdnSource apiTypes.CdnSource) *Piece {
106-
if contents == nil {
107-
contents = &bytes.Buffer{}
108-
}
139+
result, status int, contents *pool.Buffer, cdnSource apiTypes.CdnSource) *Piece {
109140
return &Piece{
110141
TaskID: taskID,
111142
SuperNode: node,
@@ -114,5 +145,7 @@ func NewPieceContent(taskID, node, dstCid, pieceRange string,
114145
Result: result,
115146
Status: status,
116147
Content: contents,
148+
length: int64(contents.Len()),
149+
autoReset: true,
117150
}
118151
}

dfget/core/downloader/p2p_downloader/piece_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"bytes"
2121

2222
"github.com/go-check/check"
23+
24+
"github.com/dragonflyoss/Dragonfly/pkg/pool"
2325
)
2426

2527
type PieceTestSuite struct {
@@ -35,9 +37,9 @@ func (s *PieceTestSuite) TestRawContent(c *check.C) {
3537
noWrapper bool
3638
expected *bytes.Buffer
3739
}{
38-
{piece: &Piece{Content: bytes.NewBufferString("")}, noWrapper: false, expected: nil},
39-
{piece: &Piece{Content: bytes.NewBufferString("000010")}, noWrapper: false, expected: bytes.NewBufferString("1")},
40-
{piece: &Piece{Content: bytes.NewBufferString("000020")}, noWrapper: true, expected: bytes.NewBufferString("000020")},
40+
{piece: &Piece{Content: pool.NewBufferString("")}, noWrapper: false, expected: nil},
41+
{piece: &Piece{Content: pool.NewBufferString("000010")}, noWrapper: false, expected: bytes.NewBufferString("1")},
42+
{piece: &Piece{Content: pool.NewBufferString("000020")}, noWrapper: true, expected: bytes.NewBufferString("000020")},
4143
}
4244

4345
for _, v := range cases {

dfget/core/downloader/p2p_downloader/power_client.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
3434
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
3535
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
36+
"github.com/dragonflyoss/Dragonfly/pkg/pool"
3637
"github.com/dragonflyoss/Dragonfly/pkg/queue"
3738
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"
3839

@@ -113,7 +114,7 @@ func (pc *PowerClient) ClientError() *types.ClientErrorRequest {
113114
return pc.clientError
114115
}
115116

116-
func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
117+
func (pc *PowerClient) downloadPiece() (content *pool.Buffer, e error) {
117118
dstIP := pc.pieceTask.PeerIP
118119
peerPort := pc.pieceTask.PeerPort
119120

@@ -149,7 +150,14 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
149150
// start to read data from resp
150151
// use limitReader to limit the download speed
151152
limitReader := limitreader.NewLimitReaderWithLimiter(pc.rateLimiter, resp.Body, pieceMD5 != "")
152-
content = &bytes.Buffer{}
153+
content = pool.AcquireBufferSize(int(pc.pieceTask.PieceSize))
154+
defer func() {
155+
// if an error happened, the content cannot be released outside.
156+
if e != nil {
157+
pool.ReleaseBuffer(content)
158+
content = nil
159+
}
160+
}()
153161
if pc.total, e = content.ReadFrom(limitReader); e != nil {
154162
return nil, e
155163
}
@@ -193,7 +201,7 @@ func (pc *PowerClient) createDownloadRequest() *api.DownloadRequest {
193201
}
194202
}
195203

196-
func (pc *PowerClient) successPiece(content *bytes.Buffer) *Piece {
204+
func (pc *PowerClient) successPiece(content *pool.Buffer) *Piece {
197205
piece := NewPieceContent(pc.taskID, pc.node, pc.pieceTask.Cid, pc.pieceTask.Range,
198206
constants.ResultSemiSuc, constants.TaskStatusRunning, content, pc.cdnDource)
199207
piece.PieceSize = pc.pieceTask.PieceSize

dfget/core/downloader/p2p_downloader/power_client_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ func (s *PowerClientTestSuite) TestDownloadPiece(c *check.C) {
136136
return resp, nil
137137
}
138138
content, err = s.powerClient.downloadPiece()
139-
c.Check(content, check.DeepEquals, bytes.NewBufferString("hello"))
139+
c.Check(content, check.NotNil)
140+
c.Check(content.String(), check.Equals, "hello")
140141
c.Check(err, check.IsNil)
141142
}
142143

0 commit comments

Comments
 (0)