1
1
package it .renvins .serverpulse .common ;
2
2
3
3
import java .time .Instant ;
4
+ import java .time .temporal .TemporalField ;
4
5
import java .util .ArrayList ;
5
6
import java .util .List ;
6
7
import java .util .Map ;
7
8
import java .util .concurrent .CompletableFuture ;
8
9
import java .util .concurrent .Executor ;
9
- import java .util .logging .Level ;
10
- import java .util .logging .Logger ;
11
-
12
- import com .influxdb .client .domain .WritePrecision ;
13
- import com .influxdb .client .write .Point ;
14
10
import it .renvins .serverpulse .api .ServerPulseProvider ;
11
+ import it .renvins .serverpulse .api .data .LineProtocolPoint ;
15
12
import it .renvins .serverpulse .api .utils .MemoryUtils ;
16
13
import it .renvins .serverpulse .api .data .SyncMetricsSnapshot ;
17
14
import it .renvins .serverpulse .api .data .WorldData ;
@@ -49,7 +46,7 @@ public void load() {
49
46
50
47
@ Override
51
48
public void collectAndSendMetrics () {
52
- if (!ServerPulseProvider .get ().getDatabaseService ().isConnected () || ServerPulseProvider . get (). getDatabaseService (). getWriteApi () == null ) {
49
+ if (!ServerPulseProvider .get ().getDatabaseService ().isConnected ()) {
53
50
return ;
54
51
}
55
52
if (!ServerPulseProvider .get ().getDatabaseService ().ping ()) {
@@ -77,7 +74,8 @@ public void collectAndSendMetrics() {
77
74
}, asyncExecutor ).thenAcceptAsync (points -> {
78
75
if (!points .isEmpty ()) {
79
76
try {
80
- ServerPulseProvider .get ().getDatabaseService ().getWriteApi ().writePoints (points );
77
+ String body = String .join ("\n " , points );
78
+ ServerPulseProvider .get ().getDatabaseService ().writeLineProtocol (body );
81
79
} catch (Exception e ) {
82
80
logger .error ("Error sending metrics to InfluxDB..." , e );
83
81
}
@@ -131,42 +129,42 @@ private SyncMetricsSnapshot collectSnapshot() {
131
129
* @param usableDisk The usable disk space in bytes.
132
130
* @return A list of InfluxDB points representing the metrics.
133
131
*/
134
- private List <Point > buildPoints (SyncMetricsSnapshot snapshot , long usedHeap , long committedHeap ,
132
+ private List <String > buildPoints (SyncMetricsSnapshot snapshot , long usedHeap , long committedHeap ,
135
133
long totalDisk , long usableDisk , int minPing , int maxPing , int avgPing ) {
136
- List <Point > points = new ArrayList <>();
134
+ List <String > points = new ArrayList <>();
137
135
138
136
String serverTag = configuration .getServerTag ();
139
137
String measurement = configuration .getMeasurementTable ();
140
138
141
- Point generalPoint = Point . measurement (measurement )
142
- .addTag ("server" , serverTag )
143
- .addField ("tps_1m" , snapshot .getTps ()[0 ])
144
- .addField ("tps_5m" , snapshot .getTps ()[1 ])
145
- .addField ("tps_15m" , snapshot .getTps ()[2 ])
146
- .addField ("players_online" , snapshot .getPlayerCount ())
147
- .addField ("used_memory" , usedHeap )
148
- .addField ("available_memory" , committedHeap )
149
- .addField ("total_disk_space" , totalDisk )
150
- .addField ("usable_disk_space" , usableDisk )
151
- .addField ("min_ping" , minPing )
152
- .addField ("max_ping" , maxPing )
153
- .addField ("avg_ping" , avgPing )
154
- . time (Instant .now (), WritePrecision . NS );
139
+ LineProtocolPoint generalPoint = new LineProtocolPoint (measurement )
140
+ .addTag ("server" , serverTag )
141
+ .addField ("tps_1m" , snapshot .getTps ()[0 ])
142
+ .addField ("tps_5m" , snapshot .getTps ()[1 ])
143
+ .addField ("tps_15m" , snapshot .getTps ()[2 ])
144
+ .addField ("players_online" , snapshot .getPlayerCount ())
145
+ .addField ("used_memory" , usedHeap )
146
+ .addField ("available_memory" , committedHeap )
147
+ .addField ("total_disk_space" , totalDisk )
148
+ .addField ("usable_disk_space" , usableDisk )
149
+ .addField ("min_ping" , minPing )
150
+ .addField ("max_ping" , maxPing )
151
+ .addField ("avg_ping" , avgPing )
152
+ . setTimestamp (Instant .now (). getNano () );
155
153
addConfigTags (generalPoint );
156
- points .add (generalPoint );
154
+ points .add (generalPoint . toLineProtocol () );
157
155
158
156
for (Map .Entry <String , WorldData > entry : snapshot .getWorldData ().entrySet ()) {
159
157
String worldName = entry .getKey ();
160
158
WorldData worldData = entry .getValue ();
161
159
162
- Point worldPoint = Point . measurement (measurement )
160
+ LineProtocolPoint worldPoint = new LineProtocolPoint (measurement )
163
161
.addTag ("server" , serverTag )
164
162
.addTag ("world" , worldName )
165
163
.addField ("entities_count" , worldData .getEntities ())
166
164
.addField ("loaded_chunks" , worldData .getLoadedChunks ())
167
- .time (Instant .now (), WritePrecision . NS );
165
+ .setTimestamp (Instant .now (). getNano () );
168
166
addConfigTags (worldPoint );
169
- points .add (worldPoint );
167
+ points .add (worldPoint . toLineProtocol () );
170
168
}
171
169
return points ;
172
170
}
@@ -176,7 +174,7 @@ private List<Point> buildPoints(SyncMetricsSnapshot snapshot, long usedHeap, lon
176
174
*
177
175
* @param point The InfluxDB point to which tags will be added.
178
176
*/
179
- private void addConfigTags (Point point ) {
177
+ private void addConfigTags (LineProtocolPoint point ) {
180
178
Map <String , String > tags = configuration .getTags ();
181
179
tags .forEach (point ::addTag );
182
180
}
0 commit comments