Skip to content

Commit 6947bff

Browse files
committed
fix byte buffer leak
1 parent 7266d9a commit 6947bff

File tree

3 files changed

+65
-62
lines changed

3 files changed

+65
-62
lines changed

src/main/java/com/github/brainlag/nsq/Connection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class Connection {
4040
private final EventLoopGroup eventLoopGroup;
4141
private final NSQConfig config;
4242

43-
public static final long HEARTBEAT_MAX_INTERVAL = 1L*60L*1000L;//default one minute
43+
public static final long HEARTBEAT_MAX_INTERVAL = 1L * 60L * 1000L;//default one minute
4444
private volatile AtomicReference<Long> lastHeartbeatSuccess = new AtomicReference<Long>(System.currentTimeMillis());
4545

4646

@@ -97,8 +97,8 @@ public boolean isRequestInProgress() {
9797
return requests.size() > 0;
9898
}
9999

100-
public boolean isHeartbeatStatusOK(){
101-
if(System.currentTimeMillis() - lastHeartbeatSuccess.get() > HEARTBEAT_MAX_INTERVAL){
100+
public boolean isHeartbeatStatusOK() {
101+
if (System.currentTimeMillis() - lastHeartbeatSuccess.get() > HEARTBEAT_MAX_INTERVAL) {
102102
return false;
103103
}
104104
return true;

src/main/java/com/github/brainlag/nsq/frames/MessageFrame.java

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,44 +4,46 @@
44
import io.netty.buffer.Unpooled;
55

66
public class MessageFrame extends NSQFrame {
7-
private long timestamp;
8-
private int attempts;
9-
private byte[] messageId = new byte[16];
10-
private byte[] messageBody;
11-
12-
@Override
13-
public void setData(byte[] bytes) {
14-
//parse the bytes
15-
super.setData(bytes);
7+
private long timestamp;
8+
private int attempts;
9+
private byte[] messageId = new byte[16];
10+
private byte[] messageBody;
11+
12+
@Override
13+
public void setData(byte[] bytes) {
14+
//parse the bytes
15+
super.setData(bytes);
1616

1717
ByteBuf buf = Unpooled.wrappedBuffer(bytes);
18-
timestamp = buf.readLong();
19-
attempts = buf.readShort();
20-
buf.readBytes(messageId);
21-
ByteBuf messageBodyBuf = buf.readBytes(buf.readableBytes());
22-
if (messageBodyBuf.hasArray()) {
23-
messageBody = messageBodyBuf.array();
24-
} else {
25-
byte[] array = new byte[messageBodyBuf.readableBytes()];
26-
messageBodyBuf.readBytes(array);
27-
messageBody = array;
28-
}
29-
}
30-
31-
public long getTimestamp() {
32-
return timestamp;
33-
}
34-
35-
public int getAttempts() {
36-
return attempts;
37-
}
38-
39-
public byte[] getMessageId() {
40-
return messageId;
41-
}
42-
43-
public byte[] getMessageBody() {
44-
return messageBody;
45-
}
18+
timestamp = buf.readLong();
19+
attempts = buf.readShort();
20+
buf.readBytes(messageId);
21+
ByteBuf messageBodyBuf = buf.readBytes(buf.readableBytes());
22+
if (messageBodyBuf.hasArray()) {
23+
messageBody = messageBodyBuf.array();
24+
} else {
25+
byte[] array = new byte[messageBodyBuf.readableBytes()];
26+
messageBodyBuf.readBytes(array);
27+
messageBody = array;
28+
}
29+
buf.release();
30+
messageBodyBuf.release();
31+
}
32+
33+
public long getTimestamp() {
34+
return timestamp;
35+
}
36+
37+
public int getAttempts() {
38+
return attempts;
39+
}
40+
41+
public byte[] getMessageId() {
42+
return messageId;
43+
}
44+
45+
public byte[] getMessageBody() {
46+
return messageBody;
47+
}
4648

4749
}

src/main/java/com/github/brainlag/nsq/netty/NSQDecoder.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,32 @@
99

1010
public class NSQDecoder extends MessageToMessageDecoder<ByteBuf> {
1111

12-
private int size;
13-
private NSQFrame frame;
12+
private int size;
13+
private NSQFrame frame;
1414

15-
public NSQDecoder() {
16-
}
15+
public NSQDecoder() {
16+
}
1717

1818
@Override
1919
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
20-
size = in.readInt();
21-
int id = in.readInt();
22-
frame = NSQFrame.instance(id);
23-
if (frame == null) {
24-
//uhh, bad response from server.. what should we do?
25-
throw new Exception("Bad frame id from server (" + id + "). disconnect!");
26-
}
27-
frame.setSize(size);
28-
ByteBuf bytes = in.readBytes(frame.getSize() - 4); //subtract 4 because the frame id is included
29-
if (bytes.hasArray()) {
30-
frame.setData(bytes.array());
31-
} else {
32-
byte[] array = new byte[bytes.readableBytes()];
33-
bytes.readBytes(array);
34-
frame.setData(array);
35-
}
36-
out.add(frame);
37-
}
20+
size = in.readInt();
21+
int id = in.readInt();
22+
frame = NSQFrame.instance(id);
23+
if (frame == null) {
24+
//uhh, bad response from server.. what should we do?
25+
throw new Exception("Bad frame id from server (" + id + "). disconnect!");
26+
}
27+
frame.setSize(size);
28+
ByteBuf bytes = in.readBytes(frame.getSize() - 4); //subtract 4 because the frame id is included
29+
if (bytes.hasArray()) {
30+
frame.setData(bytes.array());
31+
} else {
32+
byte[] array = new byte[bytes.readableBytes()];
33+
bytes.readBytes(array);
34+
frame.setData(array);
35+
}
36+
out.add(frame);
37+
bytes.release();
38+
}
3839

3940
}

0 commit comments

Comments
 (0)