Skip to content

Add retry to GraphiteSender #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics-graphite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</parent>

<artifactId>avaje-metrics-graphite</artifactId>
<version>9.4-RC2</version>

<properties>
<surefire.useModulePath>false</surefire.useModulePath>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import java.io.IOException;
import java.util.List;

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.*;

final class DGraphiteReporter implements GraphiteReporter {

Expand All @@ -30,7 +29,7 @@ public void report() {
}
sender.flush();
} catch (IOException e) {
log.log(ERROR, "Error reporting metrics", e);
log.log(WARNING, "Error reporting metrics", e);
} finally {
try {
sender.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.avaje.metrics.graphite;

import io.avaje.applog.AppLog;
import io.avaje.metrics.*;

import javax.net.SocketFactory;
Expand All @@ -11,13 +12,17 @@
import java.util.List;
import java.util.regex.Pattern;

import static java.lang.System.Logger.Level.INFO;
import static java.lang.System.Logger.Level.WARNING;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A client to a Carbon server that sends all metrics after they have been pickled in configurable sized batches
*/
final class DGraphiteSender implements GraphiteSender {

private static final System.Logger log = AppLog.getLogger(GraphiteReporter.class);

/**
* Minimally necessary pickle opcodes.
*/
Expand All @@ -40,7 +45,6 @@ final class DGraphiteSender implements GraphiteSender {
private final String prefix;
private final long timedThreshold;
private Socket socket;
private Writer writer;

DGraphiteSender(InetSocketAddress address, SocketFactory socketFactory, int batchSize, String prefix, long timedThreshold) {
this.address = address;
Expand All @@ -56,7 +60,6 @@ public void connect() throws IOException {
throw new IllegalStateException("Already connected");
}
this.socket = socketFactory.createSocket(address.getAddress(), address.getPort());
this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), UTF_8));
}

@Override
Expand Down Expand Up @@ -138,25 +141,27 @@ public void visit(GaugeLong.Stats gauge) {
@Override
public void flush() throws IOException {
writeMetrics();
if (writer != null) {
writer.flush();
}
}

@Override
public void close() throws IOException {
public void close() {
try {
flush();
if (writer != null) {
writer.close();
}
} catch (IOException ex) {
if (socket != null) {
} catch (IOException e) {
log.log(INFO, "Exception flushing metrics {0}", e);
} finally {
closeSocket();
}
}

private void closeSocket() {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
log.log(INFO, "Exception trying to close socket {0}", e);
}
} finally {
this.socket = null;
this.writer = null;
socket = null;
}
}

Expand All @@ -168,13 +173,9 @@ public void close() throws IOException {
private void writeMetrics() throws IOException {
if (!metrics.isEmpty()) {
try {
byte[] payload = pickleMetrics(metrics);
byte[] header = ByteBuffer.allocate(4).putInt(payload.length).array();

OutputStream outputStream = socket.getOutputStream();
outputStream.write(header);
outputStream.write(payload);
outputStream.flush();
final byte[] payload = pickleMetrics(metrics);
final byte[] header = ByteBuffer.allocate(4).putInt(payload.length).array();
send(header, payload);
} finally {
// if there was an error, we might miss some data. for now, drop those on the floor and
// try to keep going.
Expand All @@ -183,6 +184,24 @@ private void writeMetrics() throws IOException {
}
}

private void send(byte[] header, byte[] payload) throws IOException {
try {
sendPayload(header, payload);
} catch (IOException e) {
log.log(WARNING, "Retry sending metrics due to {0}", e);
closeSocket();
this.socket = socketFactory.createSocket(address.getAddress(), address.getPort());
sendPayload(header, payload);
}
}

private void sendPayload(byte[] header, byte[] payload) throws IOException {
OutputStream outputStream = socket.getOutputStream();
outputStream.write(header);
outputStream.write(payload);
outputStream.flush();
}

/**
* See: <a href="http://readthedocs.org/docs/graphite/en/1.0/feeding-carbon.html">feeding-carbon</a>
*/
Expand Down Expand Up @@ -247,10 +266,11 @@ public String sanitize(String string) {
return WHITESPACE.matcher(string.trim()).replaceAll(DASH);
}

static class MetricTuple {
long timestamp;
String value;
String[] names;
static final class MetricTuple {

final long timestamp;
final String value;
final String[] names;

MetricTuple(long timestamp, String value, String... names) {
this.timestamp = timestamp;
Expand Down
Loading