Skip to content

Commit 127f9f4

Browse files
committed
Fix #14 Avoid blocking socket reads to infinitely block threads created by Scalive in remote process
1 parent 5ead107 commit 127f9f4

File tree

8 files changed

+99
-53
lines changed

8 files changed

+99
-53
lines changed

src/main/java/scalive/Net.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
package scalive;
22

3+
import java.io.IOException;
34
import java.net.InetAddress;
45
import java.net.ServerSocket;
6+
import java.net.Socket;
7+
import java.net.SocketException;
8+
import java.net.SocketTimeoutException;
59
import java.net.UnknownHostException;
10+
import java.util.concurrent.TimeUnit;
611

712
public class Net {
13+
// After this time, the REPL and completer connections should be closed,
14+
// to avoid blocking socket reads to infinitely block threads created by Scalive in remote process
15+
private static final int LONG_INACTIVITY = (int) TimeUnit.HOURS.toMillis(1);
16+
817
public static final InetAddress LOCALHOST = getLocalHostAddress();
918

1019
public static int getLocalFreePort() throws Exception {
@@ -14,6 +23,33 @@ public static int getLocalFreePort() throws Exception {
1423
return port;
1524
}
1625

26+
/**
27+
* {@link SocketTimeoutException} will be thrown if there's no activity for a long time.
28+
* This is to avoid blocking reads to block threads infinitely, causing leaks in the remote process.
29+
*/
30+
public static void throwSocketTimeoutExceptionForLongInactivity(Socket socket) throws SocketException {
31+
socket.setSoTimeout(LONG_INACTIVITY);
32+
}
33+
34+
/**
35+
* Use socket closing as a way to notify/cleanup socket blocking read.
36+
* The sockets are closed in the order they are given.
37+
*/
38+
public static Runnable getSocketCleaner(final Socket... sockets) {
39+
return new Runnable() {
40+
@Override
41+
public void run() {
42+
for (Socket socket : sockets) {
43+
try {
44+
socket.close();
45+
} catch (IOException e) {
46+
// Ignore
47+
}
48+
}
49+
}
50+
};
51+
}
52+
1753
private static InetAddress getLocalHostAddress() {
1854
try {
1955
return InetAddress.getByAddress(new byte[] {127, 0, 0, 1});

src/main/java/scalive/client/AgentLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private static int loadAgent(String jarSearchDirs, String pid) throws Exception
8383
final int port = Net.getLocalFreePort();
8484

8585
vm.loadAgent(agentJar, jarSearchDirs + " " + port);
86-
Runtime.getRuntime().addShutdownHook(new Thread() {
86+
Runtime.getRuntime().addShutdownHook(new Thread(AgentLoader.class.getName() + "-ShutdownHook") {
8787
@Override
8888
public void run() {
8989
try {

src/main/java/scalive/client/Client.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import scalive.Log;
66
import scalive.Net;
77

8-
import java.io.IOException;
98
import java.net.Socket;
109

1110
class Client {
@@ -14,17 +13,12 @@ static void run(int port) throws Exception {
1413
final Socket replSocket = new Socket(Net.LOCALHOST, port);
1514
final Socket completerSocket = new Socket(Net.LOCALHOST, port);
1615

17-
// Try to notify the remote process to clean up when the client
18-
// is suddenly terminated
19-
Runtime.getRuntime().addShutdownHook(new Thread() {
16+
// Try to notify the remote process to clean up when the client is terminated
17+
final Runnable socketCleaner = Net.getSocketCleaner(replSocket, completerSocket);
18+
Runtime.getRuntime().addShutdownHook(new Thread(Client.class.getName() + "-ShutdownHook") {
2019
@Override
2120
public void run() {
22-
try {
23-
replSocket.close();
24-
completerSocket.close();
25-
} catch (IOException e) {
26-
throw new RuntimeException(e);
27-
}
21+
socketCleaner.run();
2822
}
2923
});
3024

src/main/java/scalive/client/Repl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ static void run(Socket socket, final ConsoleReader reader) throws IOException {
1414
final InputStream in = socket.getInputStream();
1515
final OutputStream out = socket.getOutputStream();
1616

17-
new Thread(new Runnable() {
17+
new Thread(Repl.class.getName() + "-printServerOutput") {
1818
@Override
1919
public void run() {
2020
try {
@@ -23,7 +23,7 @@ public void run() {
2323
throw new RuntimeException(e);
2424
}
2525
}
26-
}).start();
26+
}.start();
2727

2828
readLocalInput(reader, out);
2929
}

src/main/java/scalive/server/Agent.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package scalive.server;
22

3-
import scalive.Log;
4-
53
import java.io.File;
64
import java.io.IOException;
75
import java.lang.instrument.Instrumentation;
@@ -35,16 +33,15 @@ public static void agentmain(String agentArgs, Instrumentation inst) throws IOEx
3533
// - The server is blocking for connections
3634
// - VirtualMachine#loadAgent at the client does not return until this agentmain method returns
3735
// - The client only connects to the server after VirtualMachine#loadAgent returns
38-
new Thread(new Runnable() {
36+
new Thread(Agent.class.getName() + "-Server") {
3937
@Override
4038
public void run() {
4139
try {
4240
Server.run(serverSocket, jarSearchDirs);
4341
} catch (Exception e) {
4442
throw new RuntimeException(e);
4543
}
46-
Log.log("Closed");
4744
}
48-
}).start();
45+
}.start();
4946
}
5047
}

src/main/java/scalive/server/Completer.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import scala.tools.nsc.interpreter.Completion.Candidates;
77

88
import scalive.Log;
9+
import scalive.Net;
910

1011
import java.io.BufferedReader;
1112
import java.io.IOException;
@@ -18,38 +19,52 @@
1819
* @see scalive.client.Completer
1920
*/
2021
class Completer {
21-
static void run(Socket socket, ILoopWithCompletion iloop) throws IOException {
22+
static void run(
23+
Socket socket, ILoopWithCompletion iloop, Runnable socketCleaner
24+
) throws IOException, InterruptedException {
2225
InputStream in = socket.getInputStream();
2326
OutputStream out = socket.getOutputStream();
2427

2528
BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
2629

27-
while (true) {
28-
String line = reader.readLine();
29-
if (line == null) break;
30+
Net.throwSocketTimeoutExceptionForLongInactivity(socket);
31+
try {
32+
while (true) {
33+
// See throwSocketTimeoutExceptionForLongInactivity above
34+
String line = reader.readLine();
3035

31-
int idx = line.indexOf(" ");
32-
String cursorString = line.substring(0, idx);
33-
int cursor = Integer.parseInt(cursorString);
34-
String buffer = line.substring(idx + 1);
36+
// Socket closed
37+
if (line == null) break;
3538

36-
Completion completion = getCompletion(iloop);
37-
Candidates candidates = completion.completer().complete(buffer, cursor);
39+
int idx = line.indexOf(" ");
40+
String cursorString = line.substring(0, idx);
41+
int cursor = Integer.parseInt(cursorString);
42+
String buffer = line.substring(idx + 1);
3843

39-
out.write(("" + candidates.cursor()).getBytes("UTF-8"));
44+
Completion completion = getCompletion(iloop);
45+
Candidates candidates = completion.completer().complete(buffer, cursor);
4046

41-
List<String> list = candidates.candidates();
42-
Iterator<String> it = list.iterator();
43-
while (it.hasNext()) {
44-
String candidate = it.next();
45-
out.write(' ');
46-
out.write(candidate.getBytes("UTF-8"));
47-
}
47+
out.write(("" + candidates.cursor()).getBytes("UTF-8"));
48+
49+
List<String> list = candidates.candidates();
50+
Iterator<String> it = list.iterator();
51+
while (it.hasNext()) {
52+
String candidate = it.next();
53+
out.write(' ');
54+
out.write(candidate.getBytes("UTF-8"));
55+
}
4856

49-
out.write('\n');
50-
out.flush();
57+
out.write('\n');
58+
out.flush();
59+
}
60+
} catch (IOException e) {
61+
// Socket closed
5162
}
5263

64+
socketCleaner.run();
65+
66+
// Before logging this out, wait a litte for System.out to be restored back to the remote process
67+
Thread.sleep(1000);
5368
Log.log("Completer closed");
5469
}
5570

src/main/java/scalive/server/Repl.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import scalive.Classpath;
99
import scalive.Log;
10+
import scalive.Net;
1011

1112
import java.io.IOException;
1213
import java.io.InputStream;
@@ -17,39 +18,40 @@
1718

1819
class Repl {
1920
/** Creates a REPL and wire IO streams of the socket to it. */
20-
static ILoopWithCompletion run(final Socket socket, URLClassLoader cl) throws IOException {
21+
static ILoopWithCompletion run(
22+
final Socket socket, URLClassLoader cl, final Runnable socketCleaner
23+
) throws IOException {
2124
final InputStream in = socket.getInputStream();
2225
final OutputStream out = socket.getOutputStream();
2326

2427
final ILoopWithCompletion iloop = new ILoopWithCompletion(in, out);
2528
final Settings settings = getSettings(cl);
2629

27-
new Thread(new Runnable() {
30+
Net.throwSocketTimeoutExceptionForLongInactivity(socket);
31+
new Thread(Repl.class.getName() + "-iloop") {
2832
@Override
2933
public void run() {
3034
overrideScalaConsole(in, out, new Runnable() {
3135
@Override
3236
public void run() {
3337
// This call does not return until socket is closed,
3438
// or repl has been closed by the client using ":q"
35-
iloop.process(settings);
39+
try {
40+
iloop.process(settings);
41+
} catch (Exception e) {
42+
// See throwSocketTimeoutExceptionForLongInactivity above;
43+
// just let this thread ends
44+
}
3645
}
3746
});
3847

3948
// This code should be put outside overrideScalaConsole above
4049
// so that the output is not redirected to the client,
4150
// in case repl has been closed by the client using ":q"
51+
socketCleaner.run();
4252
Log.log("REPL closed");
43-
44-
try {
45-
// In case repl has been closed by the client using ":q",
46-
// we need to close socket to notify the client to exit
47-
socket.close();
48-
} catch (IOException e) {
49-
throw new RuntimeException(e);
50-
}
5153
}
52-
}).start();
54+
}.start();
5355

5456
return iloop;
5557
}

src/main/java/scalive/server/Server.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ static ServerSocket open(int port) throws IOException {
1919

2020
static void run(
2121
ServerSocket serverSocket, String[] jarSearchDirs
22-
) throws IOException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException, IllegalAccessException {
22+
) throws IOException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException, IllegalAccessException, InterruptedException {
2323
// Accept 2 connections (blocking)
2424
Socket replSocket = serverSocket.accept();
2525
Log.log("REPL connected");
@@ -34,8 +34,10 @@ static void run(
3434
URLClassLoader cl = (URLClassLoader) ClassLoader.getSystemClassLoader();
3535
loadDependencyJars(cl, jarSearchDirs);
3636

37-
ILoopWithCompletion iloop = Repl.run(replSocket, cl);
38-
Completer.run(completerSocket, iloop);
37+
Runnable socketCleaner = Net.getSocketCleaner(replSocket, completerSocket);
38+
39+
ILoopWithCompletion iloop = Repl.run(replSocket, cl, socketCleaner);
40+
Completer.run(completerSocket, iloop, socketCleaner);
3941
}
4042

4143
private static void loadDependencyJars(

0 commit comments

Comments
 (0)