24
24
package rabbitminer .Cluster .Server ;
25
25
26
26
import Extasys .DataFrame ;
27
+ import Extasys .Encryption .Base64Encryptor ;
27
28
import Extasys .ManualResetEvent ;
28
29
import Extasys .Network .TCP .Server .Listener .Exceptions .ClientIsDisconnectedException ;
29
30
import Extasys .Network .TCP .Server .Listener .Exceptions .OutgoingPacketFailedException ;
30
31
import Extasys .Network .TCP .Server .Listener .TCPClientConnection ;
32
+ import Extasys .Network .TCP .Server .Listener .TCPListener ;
31
33
import java .util .HashMap ;
32
34
import java .util .LinkedHashMap ;
33
35
import rabbitminer .Cluster .ClusterCommunicationCommons ;
45
47
*/
46
48
public class ClusterServer extends Extasys .Network .TCP .Server .ExtasysTCPServer
47
49
{
48
-
50
+
49
51
private final RabbitCluster fMyCluster ;
50
52
private final Object fClientsConnectOrDisconnectLock = new Object ();
51
53
private final HashMap <String , TCPClientConnection > fConnectedClients ;
52
54
private final Thread fPingConnectedClientsThread ;
53
-
55
+
54
56
public ClusterServer (RabbitCluster myCluster , ClusterServerSettings clusterServerSettings )
55
57
{
56
58
super ("" , "" , Computer .getComputerCPUCoresCount (), Computer .getComputerCPUCoresCount () * 2 );
57
- super .AddListener ("" , clusterServerSettings .getIPAddress (), clusterServerSettings .getPort (), 60000 , 10240 , 30000 , 150 , ClusterCommunicationCommons .fETX );
58
-
59
+ TCPListener listener = super .AddListener ("" , clusterServerSettings .getIPAddress (), clusterServerSettings .getPort (), 60000 , 10240 , 30000 , 150 , ClusterCommunicationCommons .fETX );
60
+ listener .setAutoApplyMessageSplitterState (true );
61
+ listener .setConnectionEncryptor (new Base64Encryptor ());
62
+
59
63
fMyCluster = myCluster ;
60
64
fConnectedClients = new HashMap <>();
61
65
@@ -73,22 +77,22 @@ public ClusterServer(RabbitCluster myCluster, ClusterServerSettings clusterServe
73
77
}
74
78
catch (Exception ex )
75
79
{
76
-
80
+
77
81
}
78
82
evt .Reset ();
79
83
}
80
84
});
81
85
fPingConnectedClientsThread .start ();
82
86
}
83
-
87
+
84
88
@ Override
85
89
public void OnDataReceive (TCPClientConnection sender , DataFrame data )
86
90
{
87
91
try
88
92
{
89
93
String incomingStr = new String (data .getBytes (), "UTF-8" );
90
94
String [] parts = incomingStr .split (ClusterCommunicationCommons .fMessageSplitter );
91
-
95
+
92
96
switch (parts [0 ])
93
97
{
94
98
case "LOGIN" :
@@ -106,41 +110,41 @@ public void OnDataReceive(TCPClientConnection sender, DataFrame data)
106
110
sender .setTag (var );
107
111
108
112
// Ειδοποιούμε το Node οτι συνδέθηκε
109
- sender .SendData ("AUTHORIZED" + ClusterCommunicationCommons .fMessageSplitter + ClusterCommunicationCommons . fETX );
113
+ sender .SendData ("AUTHORIZED" + ClusterCommunicationCommons .fMessageSplitter );
110
114
}
111
115
else
112
116
{
113
- sender .SendData ("WRONG_PASSWORD" + ClusterCommunicationCommons .fMessageSplitter + ClusterCommunicationCommons . fETX );
117
+ sender .SendData ("WRONG_PASSWORD" + ClusterCommunicationCommons .fMessageSplitter );
114
118
}
115
119
break ;
116
-
120
+
117
121
case "GET_JOB" :
118
122
if (CheckIfClientIsAuthorized (sender ))
119
123
{
120
124
// Ζήτα απο το Cluster να φτιάξει ένα
121
125
// job για να το δώσουμε στο Node
122
126
StratumJob job = fMyCluster .GiveNodeAJobToDo (sender );
123
-
127
+
124
128
if (job != null )
125
129
{
126
- sender .SendData ("JOB" + ClusterCommunicationCommons .fMessageSplitter + job .toJSON () + ClusterCommunicationCommons . fETX );
130
+ sender .SendData ("JOB" + ClusterCommunicationCommons .fMessageSplitter + job .toJSON ());
127
131
}
128
132
else
129
133
{
130
134
// Δεν υπάρχει Job....
131
- sender .SendData ("NO_JOB" + ClusterCommunicationCommons .fMessageSplitter + ClusterCommunicationCommons . fETX );
135
+ sender .SendData ("NO_JOB" + ClusterCommunicationCommons .fMessageSplitter );
132
136
}
133
137
}
134
138
break ;
135
-
139
+
136
140
case "JOB_SOLVED" :
137
141
if (CheckIfClientIsAuthorized (sender ))
138
142
{
139
143
final String jobID = parts [1 ];
140
144
final String extranonce2 = parts [2 ];
141
145
final String nTime = parts [3 ];
142
146
final String nonce = parts [4 ];
143
-
147
+
144
148
String submitJobStr = "{\" params\" : [\" #WORKER_NAME#\" , \" #JOB_ID#\" , \" #EXTRANONCE_2#\" , \" #NTIME#\" , \" #NONCE#\" ], \" id\" : #STRATUM_MESSAGE_ID#, \" method\" : \" mining.submit\" }" ;
145
149
submitJobStr = submitJobStr .replace ("#WORKER_NAME#" , fMyCluster .getStratumPoolSettings ().getUsername ());
146
150
submitJobStr = submitJobStr .replace ("#JOB_ID#" , jobID );
@@ -158,49 +162,49 @@ public void OnDataReceive(TCPClientConnection sender, DataFrame data)
158
162
fMyCluster .fJobsSubmitted += 1 ;
159
163
}
160
164
break ;
161
-
165
+
162
166
case "JOB_SOLVED_RANDOMX" :
163
167
if (CheckIfClientIsAuthorized (sender ))
164
168
{
165
169
StratumJob_RandomX randomXJobSolved = new StratumJob_RandomX (parts [1 ]);
166
-
170
+
167
171
LinkedHashMap solvedJobParams = new LinkedHashMap ();
168
172
solvedJobParams .put ("id" , Parser_RandomX .fPoolLoginID );
169
173
solvedJobParams .put ("job_id" , randomXJobSolved .getJobID ());
170
174
solvedJobParams .put ("nonce" , randomXJobSolved .getSolution_NonceHexlifyByteArray ());
171
175
solvedJobParams .put ("result" , randomXJobSolved .getSolution_HashHexlifyByteArray ());
172
-
176
+
173
177
LinkedHashMap messageToPool = new LinkedHashMap ();
174
178
messageToPool .put ("id" , 1 );
175
179
messageToPool .put ("jsonrpc" , "2.0" );
176
180
messageToPool .put ("method" , "submit" );
177
181
messageToPool .put ("params" , solvedJobParams );
178
-
182
+
179
183
String dataToSend = JSONSerializer .SerializeObject (messageToPool );
180
-
184
+
181
185
System .err .println (dataToSend );
182
-
186
+
183
187
fMyCluster .getStratumClient ().SendData (dataToSend + "\n " );
184
188
System .out .println ("SOLVED!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" );
185
189
186
190
// Καποιο Node ολοκλήρωσε ενα job με επιτυχία!
187
191
// Στειλε το αποτέλεσμα στον Stratum Server
188
192
fMyCluster .setCurrentStratumJob (null , false );
189
193
}
190
-
194
+
191
195
break ;
192
-
196
+
193
197
case "PONG" :
194
198
if (CheckIfClientIsAuthorized (sender ))
195
199
{
196
-
200
+
197
201
}
198
202
break ;
199
203
}
200
204
}
201
205
catch (Exception ex )
202
206
{
203
-
207
+
204
208
}
205
209
}
206
210
@@ -217,10 +221,10 @@ private boolean CheckIfClientIsAuthorized(TCPClientConnection client)
217
221
NodeTCPConnectionVariables var = (NodeTCPConnectionVariables ) client .getTag ();
218
222
return var .isClientAuthorized ();
219
223
}
220
-
224
+
221
225
return false ;
222
226
}
223
-
227
+
224
228
@ Override
225
229
public void OnClientConnect (TCPClientConnection client )
226
230
{
@@ -232,7 +236,7 @@ public void OnClientConnect(TCPClientConnection client)
232
236
frmClusterControl .ACTIVE_INSTANCE .NodeConnected (client );
233
237
}
234
238
}
235
-
239
+
236
240
@ Override
237
241
public void OnClientDisconnect (TCPClientConnection client )
238
242
{
@@ -247,7 +251,7 @@ public void OnClientDisconnect(TCPClientConnection client)
247
251
}
248
252
}
249
253
}
250
-
254
+
251
255
public void InformClientsToCleanJobs ()
252
256
{
253
257
synchronized (fClientsConnectOrDisconnectLock )
@@ -256,14 +260,14 @@ public void InformClientsToCleanJobs()
256
260
{
257
261
try
258
262
{
259
- client .SendData ("CLEAN_JOBS" + ClusterCommunicationCommons .fMessageSplitter + ClusterCommunicationCommons . fETX );
263
+ client .SendData ("CLEAN_JOBS" + ClusterCommunicationCommons .fMessageSplitter );
260
264
}
261
265
catch (ClientIsDisconnectedException | OutgoingPacketFailedException ex )
262
266
{
263
267
}
264
268
});
265
269
}
266
-
270
+
267
271
}
268
272
269
273
/**
@@ -277,21 +281,21 @@ private void PingClients()
277
281
{
278
282
try
279
283
{
280
- con .SendData ("PING" + ClusterCommunicationCommons .fMessageSplitter + ClusterCommunicationCommons . fETX );
284
+ con .SendData ("PING" + ClusterCommunicationCommons .fMessageSplitter );
281
285
}
282
286
catch (ClientIsDisconnectedException | OutgoingPacketFailedException ex )
283
287
{
284
-
288
+
285
289
}
286
290
});
287
291
}
288
292
}
289
-
293
+
290
294
public HashMap <String , TCPClientConnection > getConnectedClients ()
291
295
{
292
296
return fConnectedClients ;
293
297
}
294
-
298
+
295
299
public void ClearRangesFromClients ()
296
300
{
297
301
synchronized (fClientsConnectOrDisconnectLock )
@@ -304,10 +308,10 @@ public void ClearRangesFromClients()
304
308
}
305
309
catch (Exception ex )
306
310
{
307
-
311
+
308
312
}
309
313
});
310
314
}
311
315
}
312
-
316
+
313
317
}
0 commit comments