1
1
package io .avaje .metrics .graphite ;
2
2
3
+ import io .avaje .applog .AppLog ;
3
4
import io .avaje .metrics .*;
4
5
5
6
import javax .net .SocketFactory ;
11
12
import java .util .List ;
12
13
import java .util .regex .Pattern ;
13
14
15
+ import static java .lang .System .Logger .Level .INFO ;
16
+ import static java .lang .System .Logger .Level .WARNING ;
14
17
import static java .nio .charset .StandardCharsets .UTF_8 ;
15
18
16
19
/**
17
20
* A client to a Carbon server that sends all metrics after they have been pickled in configurable sized batches
18
21
*/
19
22
final class DGraphiteSender implements GraphiteSender {
20
23
24
+ private static final System .Logger log = AppLog .getLogger (GraphiteReporter .class );
25
+
21
26
/**
22
27
* Minimally necessary pickle opcodes.
23
28
*/
@@ -40,7 +45,6 @@ final class DGraphiteSender implements GraphiteSender {
40
45
private final String prefix ;
41
46
private final long timedThreshold ;
42
47
private Socket socket ;
43
- private Writer writer ;
44
48
45
49
DGraphiteSender (InetSocketAddress address , SocketFactory socketFactory , int batchSize , String prefix , long timedThreshold ) {
46
50
this .address = address ;
@@ -56,7 +60,6 @@ public void connect() throws IOException {
56
60
throw new IllegalStateException ("Already connected" );
57
61
}
58
62
this .socket = socketFactory .createSocket (address .getAddress (), address .getPort ());
59
- this .writer = new BufferedWriter (new OutputStreamWriter (socket .getOutputStream (), UTF_8 ));
60
63
}
61
64
62
65
@ Override
@@ -138,25 +141,27 @@ public void visit(GaugeLong.Stats gauge) {
138
141
@ Override
139
142
public void flush () throws IOException {
140
143
writeMetrics ();
141
- if (writer != null ) {
142
- writer .flush ();
143
- }
144
144
}
145
145
146
146
@ Override
147
- public void close () throws IOException {
147
+ public void close () {
148
148
try {
149
149
flush ();
150
- if (writer != null ) {
151
- writer .close ();
152
- }
153
- } catch (IOException ex ) {
154
- if (socket != null ) {
150
+ } catch (IOException e ) {
151
+ log .log (INFO , "Exception flushing metrics {0}" , e );
152
+ } finally {
153
+ closeSocket ();
154
+ }
155
+ }
156
+
157
+ private void closeSocket () {
158
+ if (socket != null ) {
159
+ try {
155
160
socket .close ();
161
+ } catch (IOException e ) {
162
+ log .log (INFO , "Exception trying to close socket {0}" , e );
156
163
}
157
- } finally {
158
- this .socket = null ;
159
- this .writer = null ;
164
+ socket = null ;
160
165
}
161
166
}
162
167
@@ -168,13 +173,9 @@ public void close() throws IOException {
168
173
private void writeMetrics () throws IOException {
169
174
if (!metrics .isEmpty ()) {
170
175
try {
171
- byte [] payload = pickleMetrics (metrics );
172
- byte [] header = ByteBuffer .allocate (4 ).putInt (payload .length ).array ();
173
-
174
- OutputStream outputStream = socket .getOutputStream ();
175
- outputStream .write (header );
176
- outputStream .write (payload );
177
- outputStream .flush ();
176
+ final byte [] payload = pickleMetrics (metrics );
177
+ final byte [] header = ByteBuffer .allocate (4 ).putInt (payload .length ).array ();
178
+ send (header , payload );
178
179
} finally {
179
180
// if there was an error, we might miss some data. for now, drop those on the floor and
180
181
// try to keep going.
@@ -183,6 +184,24 @@ private void writeMetrics() throws IOException {
183
184
}
184
185
}
185
186
187
+ private void send (byte [] header , byte [] payload ) throws IOException {
188
+ try {
189
+ sendPayload (header , payload );
190
+ } catch (IOException e ) {
191
+ log .log (WARNING , "Retry sending metrics due to {0}" , e );
192
+ closeSocket ();
193
+ this .socket = socketFactory .createSocket (address .getAddress (), address .getPort ());
194
+ sendPayload (header , payload );
195
+ }
196
+ }
197
+
198
+ private void sendPayload (byte [] header , byte [] payload ) throws IOException {
199
+ OutputStream outputStream = socket .getOutputStream ();
200
+ outputStream .write (header );
201
+ outputStream .write (payload );
202
+ outputStream .flush ();
203
+ }
204
+
186
205
/**
187
206
* See: <a href="http://readthedocs.org/docs/graphite/en/1.0/feeding-carbon.html">feeding-carbon</a>
188
207
*/
@@ -247,10 +266,11 @@ public String sanitize(String string) {
247
266
return WHITESPACE .matcher (string .trim ()).replaceAll (DASH );
248
267
}
249
268
250
- static class MetricTuple {
251
- long timestamp ;
252
- String value ;
253
- String [] names ;
269
+ static final class MetricTuple {
270
+
271
+ final long timestamp ;
272
+ final String value ;
273
+ final String [] names ;
254
274
255
275
MetricTuple (long timestamp , String value , String ... names ) {
256
276
this .timestamp = timestamp ;
0 commit comments