1
1
package it .renvins .serverpulse .common ;
2
2
3
- import java .net .SocketTimeoutException ;
4
3
import java .net .URI ;
5
4
import java .net .http .HttpClient ;
6
5
import java .net .http .HttpRequest ;
7
6
import java .net .http .HttpResponse ;
7
+ import java .nio .charset .StandardCharsets ;
8
8
import java .time .Duration ;
9
+ import java .util .concurrent .CompletableFuture ;
9
10
10
- import com .influxdb .client .InfluxDBClient ;
11
- import com .influxdb .client .InfluxDBClientFactory ;
12
- import com .influxdb .client .WriteApi ;
13
11
import it .renvins .serverpulse .api .service .IDatabaseService ;
14
12
import it .renvins .serverpulse .common .config .DatabaseConfiguration ;
15
13
import it .renvins .serverpulse .common .logger .PulseLogger ;
16
14
import it .renvins .serverpulse .common .platform .Platform ;
17
15
import it .renvins .serverpulse .common .scheduler .Task ;
18
16
import it .renvins .serverpulse .common .scheduler .TaskScheduler ;
19
- import lombok .Getter ;
20
17
21
18
public class DatabaseService implements IDatabaseService {
22
19
@@ -28,9 +25,6 @@ public class DatabaseService implements IDatabaseService {
28
25
29
26
private HttpClient httpClient ; // Keep for ping
30
27
31
- @ Getter private InfluxDBClient client ;
32
- @ Getter private WriteApi writeApi ;
33
-
34
28
private volatile Task retryTask ; // volatile for visibility across threads
35
29
36
30
private final int MAX_RETRIES = 5 ;
@@ -40,12 +34,15 @@ public class DatabaseService implements IDatabaseService {
40
34
private volatile boolean isConnected = false ;
41
35
private volatile int retryCount = 0 ;
42
36
37
+ //HTTP API endpoints
38
+ private String pingUrl ;
39
+ private String writeUrl ;
40
+
43
41
public DatabaseService (PulseLogger logger , Platform platform , DatabaseConfiguration configuration , TaskScheduler scheduler ) {
44
42
this .logger = logger ;
45
43
this .platform = platform ;
46
44
47
45
this .configuration = configuration ;
48
-
49
46
this .scheduler = scheduler ;
50
47
51
48
this .httpClient = HttpClient .newBuilder ()
@@ -61,6 +58,17 @@ public void load() {
61
58
return ;
62
59
}
63
60
logger .info ("Connecting to InfluxDB..." );
61
+
62
+ // Initialize the HttpClient for ping
63
+ String baseUrl = configuration .getHost ();
64
+ if (!baseUrl .endsWith ("/" )) {
65
+ baseUrl += "/" ;
66
+ }
67
+
68
+ this .pingUrl = baseUrl + "ping" ;
69
+ this .writeUrl = baseUrl + "api/v2/write?org=" + configuration .getOrg () +
70
+ "&bucket=" + configuration .getBucket () + "&precision=ns" ;
71
+
64
72
scheduler .runAsync (this ::connect );
65
73
}
66
74
@@ -75,44 +83,57 @@ public void unload() {
75
83
}
76
84
77
85
@ Override
78
- public boolean ping () {
79
- String url = configuration .getHost ();
80
- if (url == null || url .isEmpty ()) {
81
- logger .error ("InfluxDB URL is missing for ping..." );
82
- return false ;
86
+ public CompletableFuture <Boolean > writeLineProtocol (String lineProtocol ) {
87
+ if (!isConnected ) {
88
+ return CompletableFuture .completedFuture (false );
83
89
}
84
90
85
- // Ensure httpClient is initialized
86
- if (this .httpClient == null ) {
87
- logger .error ("HttpClient not initialized for ping..." );
88
- this .httpClient = HttpClient .newBuilder ().connectTimeout (Duration .ofSeconds (10 )).build ();
89
- }
90
-
91
- HttpRequest request ;
92
91
try {
93
- String pingUrl = url .endsWith ("/" ) ? url + "ping" : url + "/ping" ;
94
- request = HttpRequest .newBuilder ()
95
- .uri (URI .create (pingUrl ))
96
- .GET ()
97
- .timeout (Duration .ofSeconds (5 )) // Add timeout specific to ping
98
- .build ();
99
- } catch (IllegalArgumentException e ) {
100
- logger .error ("Invalid InfluxDB URL format for ping: " + url , e );
101
- return false ;
92
+ HttpRequest request = HttpRequest .newBuilder ()
93
+ .uri (URI .create (writeUrl ))
94
+ .header ("Authorization" , "Token " + configuration .getToken ())
95
+ .header ("Content-Type" , "text/plain; charset=utf-8" )
96
+ .POST (HttpRequest .BodyPublishers .ofString (lineProtocol , StandardCharsets .UTF_8 ))
97
+ .timeout (Duration .ofSeconds (10 ))
98
+ .build ();
99
+
100
+ return httpClient .sendAsync (request , HttpResponse .BodyHandlers .ofString ())
101
+ .thenApply (response -> {
102
+ if (response .statusCode () == 204 ) {
103
+ return true ;
104
+ } else {
105
+ logger .error ("Failed to write to InfluxDB: " + response .statusCode () + " - " + response .body ());
106
+ return false ;
107
+ }
108
+ })
109
+ .exceptionally (throwable -> {
110
+ logger .error ("Error writing to InfluxDB: " + throwable .getMessage ());
111
+ if (throwable .getCause () instanceof java .net .ConnectException ) {
112
+ // Connection lost, trigger reconnection
113
+ this .isConnected = false ;
114
+ startRetryTaskIfNeeded ();
115
+ }
116
+ return false ;
117
+ });
118
+ } catch (Exception e ) {
119
+ logger .error ("Failed to create write request: " + e .getMessage ());
120
+ return CompletableFuture .completedFuture (false );
102
121
}
122
+ }
103
123
124
+ @ Override
125
+ public boolean ping () {
104
126
try {
105
- HttpResponse <Void > response = this .httpClient .send (request , HttpResponse .BodyHandlers .discarding ());
127
+ HttpRequest request = HttpRequest .newBuilder ()
128
+ .uri (URI .create (pingUrl ))
129
+ .GET ()
130
+ .timeout (Duration .ofSeconds (5 ))
131
+ .build ();
132
+
133
+ HttpResponse <Void > response = httpClient .send (request , HttpResponse .BodyHandlers .discarding ());
106
134
return response .statusCode () == 204 ;
107
- } catch (java .net .ConnectException | java .net .UnknownHostException e ) {
108
- logger .warning ("InfluxDB service is offline..." );
109
- return false ;
110
- } catch (
111
- SocketTimeoutException e ) {
112
- logger .warning ("InfluxDB ping timed out: " + e .getMessage ());
113
- return false ;
114
135
} catch (Exception e ) {
115
- logger .error ( "Error during InfluxDB ping: " + e .getMessage (), e );
136
+ logger .warning ( " InfluxDB ping failed : " + e .getMessage ());
116
137
return false ;
117
138
}
118
139
}
@@ -123,84 +144,6 @@ public boolean isConnected() {
123
144
return this .isConnected ;
124
145
}
125
146
126
- /**
127
- * Attempts to connect to InfluxDB. Updates the internal connection status
128
- * and starts the retry task if connection fails.
129
- * Should be run asynchronously.
130
- */
131
- private void connect () {
132
- // If already connected, don't try again unless forced (e.g., by retry task)
133
- // Note: This check might prevent the retry task from running if isConnected is true
134
- // but the connection actually dropped without detection. We rely on ping() inside here.
135
-
136
- // Ensure previous resources are closed if attempting a new connection
137
- // This might be needed if connect() is called manually or by retry.
138
- disconnect ();
139
-
140
- String url = configuration .getHost ();
141
- String token = configuration .getToken ();
142
- String org = configuration .getOrg ();
143
- String bucket = configuration .getBucket ();
144
-
145
- try {
146
- logger .info ("Attempting to connect to InfluxDB at " + url );
147
- client = InfluxDBClientFactory .create (url , token .toCharArray (), org , bucket );
148
-
149
- // Ping immediately after creating client to verify reachability & auth
150
- boolean isPingSuccessful = ping (); // Use the internal ping method
151
-
152
- if (isPingSuccessful ) {
153
- writeApi = client .makeWriteApi (); // Initialize Write API
154
-
155
- this .isConnected = true ;
156
- this .retryCount = 0 ; // Reset retry count on successful connection
157
-
158
- stopRetryTask (); // Stop retrying if we just connected
159
- logger .info ("Successfully connected to InfluxDB and ping successful..." );
160
- } else {
161
- // Ping failed after client creation
162
- logger .warning ("Created InfluxDB instance, but ping failed. Will retry..." );
163
- this .isConnected = false ; // Ensure status is false
164
-
165
- if (client != null ) {
166
- client .close (); // Close the client if ping failed
167
- client = null ;
168
- }
169
- startRetryTaskIfNeeded (); // Start retry task
170
- }
171
- } catch (Exception e ) {
172
- // Handle exceptions during InfluxDBClientFactory.create() or ping()
173
- logger .error ("Failed to connect or ping InfluxDB: " + e .getMessage ());
174
- this .isConnected = false ;
175
- if (client != null ) { // Ensure client is closed on exception
176
- client .close ();
177
- client = null ;
178
- }
179
- startRetryTaskIfNeeded (); // Start retry task
180
- }
181
- }
182
-
183
- @ Override
184
- public void disconnect () {
185
- if (writeApi != null ) {
186
- try {
187
- writeApi .close ();
188
- } catch (Exception e ) {
189
- logger .error ("Error closing InfluxDB WriteApi..." , e );
190
- }
191
- writeApi = null ;
192
- }
193
- if (client != null ) {
194
- try {
195
- client .close ();
196
- } catch (Exception e ) {
197
- logger .error ("Error closing InfluxDB Client..." , e );
198
- }
199
- client = null ;
200
- }
201
- this .isConnected = false ;
202
- }
203
-
204
147
205
148
@ Override
206
149
public synchronized void startRetryTaskIfNeeded () {
@@ -248,6 +191,42 @@ public synchronized void startRetryTaskIfNeeded() {
248
191
}
249
192
250
193
194
+ /**
195
+ * Attempts to connect to InfluxDB. Updates the internal connection status
196
+ * and starts the retry task if connection fails.
197
+ * Should be run asynchronously.
198
+ */
199
+ private void connect () {
200
+ disconnect ();
201
+
202
+ try {
203
+ logger .info ("Attempting to connect to InfluxDB via HTTP API..." );
204
+ boolean isPingSuccessful = ping (); // Use the internal ping method
205
+
206
+ if (isPingSuccessful ) {
207
+ this .isConnected = true ;
208
+ this .retryCount = 0 ; // Reset retry count on successful connection
209
+
210
+ stopRetryTask (); // Stop retrying if we just connected
211
+ logger .info ("Successfully connected to InfluxDB and ping successful..." );
212
+ } else {
213
+ // Ping failed after client creation
214
+ logger .warning ("Created InfluxDB instance, but ping failed. Will retry..." );
215
+ this .isConnected = false ; // Ensure status is false
216
+ startRetryTaskIfNeeded (); // Start retry task
217
+ }
218
+ } catch (Exception e ) {
219
+ // Handle exceptions during InfluxDBClientFactory.create() or ping()
220
+ logger .error ("Failed to connect or ping InfluxDB: " + e .getMessage ());
221
+ this .isConnected = false ;
222
+ startRetryTaskIfNeeded (); // Start retry task
223
+ }
224
+ }
225
+
226
+ private void disconnect () {
227
+ this .isConnected = false ;
228
+ }
229
+
251
230
/** Stops and nullifies the retry task if it's running. */
252
231
private synchronized void stopRetryTask () {
253
232
if (retryTask != null ) {
@@ -262,7 +241,6 @@ private synchronized void stopRetryTask() {
262
241
}
263
242
}
264
243
265
-
266
244
/**
267
245
* Checks if the essential connection data is present in the config.
268
246
* @return true if data seems present, false otherwise.
0 commit comments