Skip to content

Commit ab6c4a3

Browse files
authored
replace ForkJoinPool with fixed thread pool (#4552)
fixes #4549
1 parent 1b4a014 commit ab6c4a3

File tree

3 files changed

+77
-81
lines changed

3 files changed

+77
-81
lines changed

opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDatabase.java

Lines changed: 55 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.ArrayList;
4040
import java.util.Arrays;
4141
import java.util.Comparator;
42+
import java.util.HashMap;
4243
import java.util.HashSet;
4344
import java.util.List;
4445
import java.util.Map;
@@ -47,9 +48,11 @@
4748
import java.util.Optional;
4849
import java.util.Set;
4950
import java.util.TreeMap;
51+
import java.util.concurrent.Callable;
5052
import java.util.concurrent.CopyOnWriteArrayList;
5153
import java.util.concurrent.CountDownLatch;
5254
import java.util.concurrent.ExecutionException;
55+
import java.util.concurrent.Future;
5356
import java.util.concurrent.TimeUnit;
5457
import java.util.concurrent.atomic.AtomicInteger;
5558
import java.util.logging.Level;
@@ -1902,54 +1905,58 @@ private void indexParallel(String dir, IndexDownArgs args) {
19021905
IndexerParallelizer parallelizer = RuntimeEnvironment.getInstance().getIndexerParallelizer();
19031906
ObjectPool<Ctags> ctagsPool = parallelizer.getCtagsPool();
19041907

1905-
Map<Boolean, List<IndexFileWork>> bySuccess = null;
1908+
Map<Boolean, List<IndexFileWork>> bySuccess = new HashMap<>();
19061909
try (Progress progress = new Progress(LOGGER, String.format("indexing '%s'", dir), worksCount)) {
1907-
bySuccess = parallelizer.getForkJoinPool().submit(() ->
1908-
args.works.parallelStream().collect(
1909-
Collectors.groupingByConcurrent((x) -> {
1910-
int tries = 0;
1911-
Ctags pctags = null;
1912-
boolean ret;
1913-
while (true) {
1914-
try {
1915-
if (alreadyClosedCounter.get() > 0) {
1916-
ret = false;
1917-
} else {
1918-
pctags = ctagsPool.get();
1919-
addFile(x.file, x.path, pctags);
1920-
successCounter.incrementAndGet();
1921-
ret = true;
1910+
Set<Callable<IndexFileWork>> callables = args.works.stream().
1911+
<Callable<IndexFileWork>>map(x -> () -> {
1912+
int tries = 0;
1913+
Ctags pctags = null;
1914+
while (true) {
1915+
try {
1916+
if (alreadyClosedCounter.get() > 0) {
1917+
x.ret = false;
1918+
} else {
1919+
pctags = ctagsPool.get();
1920+
addFile(x.file, x.path, pctags);
1921+
successCounter.incrementAndGet();
1922+
x.ret = true;
1923+
}
1924+
} catch (AlreadyClosedException e) {
1925+
alreadyClosedCounter.incrementAndGet();
1926+
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
1927+
LOGGER.log(Level.SEVERE, errmsg, e);
1928+
x.exception = e;
1929+
x.ret = false;
1930+
} catch (InterruptedException e) {
1931+
// Allow one retry if interrupted
1932+
if (++tries <= 1) {
1933+
continue;
1934+
}
1935+
LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file);
1936+
x.exception = e;
1937+
x.ret = false;
1938+
} catch (RuntimeException | IOException e) {
1939+
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
1940+
LOGGER.log(Level.WARNING, errmsg, e);
1941+
x.exception = e;
1942+
x.ret = false;
1943+
} finally {
1944+
if (pctags != null) {
1945+
pctags.reset();
1946+
ctagsPool.release(pctags);
1947+
}
19221948
}
1923-
} catch (AlreadyClosedException e) {
1924-
alreadyClosedCounter.incrementAndGet();
1925-
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
1926-
LOGGER.log(Level.SEVERE, errmsg, e);
1927-
x.exception = e;
1928-
ret = false;
1929-
} catch (InterruptedException e) {
1930-
// Allow one retry if interrupted
1931-
if (++tries <= 1) {
1932-
continue;
1933-
}
1934-
LOGGER.log(Level.WARNING, "No retry: ''{0}''", x.file);
1935-
x.exception = e;
1936-
ret = false;
1937-
} catch (RuntimeException | IOException e) {
1938-
String errmsg = String.format("ERROR addFile(): '%s'", x.file);
1939-
LOGGER.log(Level.WARNING, errmsg, e);
1940-
x.exception = e;
1941-
ret = false;
1942-
} finally {
1943-
if (pctags != null) {
1944-
pctags.reset();
1945-
ctagsPool.release(pctags);
1946-
}
1947-
}
19481949

1949-
progress.increment();
1950-
return ret;
1951-
}
1952-
}))).get();
1950+
progress.increment();
1951+
return x;
1952+
}
1953+
}).
1954+
collect(Collectors.toSet());
1955+
List<Future<IndexFileWork>> futures = parallelizer.getIndexWorkExecutor().invokeAll(callables);
1956+
for (var future : futures) {
1957+
IndexFileWork work = future.get();
1958+
bySuccess.computeIfAbsent(work.ret, key -> new ArrayList<>()).add(work);
1959+
}
19531960
} catch (InterruptedException | ExecutionException e) {
19541961
interrupted = true;
19551962
int successCount = successCounter.intValue();
@@ -1961,14 +1968,9 @@ private void indexParallel(String dir, IndexDownArgs args) {
19611968

19621969
args.curCount = currentCounter.intValue();
19631970

1964-
// Start with failureCount=worksCount, and then subtract successes.
1965-
int failureCount = worksCount;
1966-
if (bySuccess != null) {
1967-
List<IndexFileWork> successes = bySuccess.getOrDefault(Boolean.TRUE, null);
1968-
if (successes != null) {
1969-
failureCount -= successes.size();
1970-
}
1971-
}
1971+
int failureCount = worksCount - Optional.ofNullable(bySuccess.get(Boolean.TRUE))
1972+
.map(List::size)
1973+
.orElse(0);
19721974
if (failureCount > 0) {
19731975
double pctFailed = 100.0 * failureCount / worksCount;
19741976
String exmsg = String.format("%d failures (%.1f%%) while parallel-indexing", failureCount, pctFailed);

opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexDownArgs.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class IndexFileWork {
3535
final File file;
3636
final String path;
3737
Exception exception;
38+
boolean ret;
3839

3940
IndexFileWork(File file, String path) {
4041
this.file = file;

opengrok-indexer/src/main/java/org/opengrok/indexer/index/IndexerParallelizer.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525

2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28-
import java.util.concurrent.ForkJoinPool;
29-
import java.util.concurrent.ForkJoinWorkerThread;
3028
import java.util.concurrent.ScheduledThreadPoolExecutor;
3129

3230
import org.opengrok.indexer.analysis.Ctags;
@@ -38,27 +36,25 @@
3836
import org.opengrok.indexer.util.ObjectFactory;
3937
import org.opengrok.indexer.util.ObjectPool;
4038

41-
import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
42-
4339
/**
4440
* Represents a container for executors that enable parallelism for indexing
4541
* across projects and repositories and also within any {@link IndexDatabase}
4642
* instance -- with global limits for all execution.
4743
* <p>A fixed-thread pool is used for parallelism across repositories, and a
48-
* work-stealing {@link ForkJoinPool} is used for parallelism within any
44+
* {@link #lzIndexWorkExecutor} is used for parallelism within any
4945
* {@link IndexDatabase}. Threads in the former pool are customers of the
50-
* latter, and the bulk of work is done in the latter pool. The work-stealing
51-
* {@link ForkJoinPool} makes use of a corresponding fixed pool of {@link Ctags}
52-
* instances.
53-
* <p>Additionally there are pools for executing for history, for renamings in
46+
* latter, and the bulk of work is done in the latter pool.
47+
* The {@link #lzIndexWorkExecutor} makes use of a corresponding fixed pool
48+
* of {@link Ctags} instances.
49+
* <p>Additionally there are pools for executing for history, for renames in
5450
* history, and for watching the {@link Ctags} instances for timing purposes.
5551
*/
5652
public class IndexerParallelizer implements AutoCloseable {
5753

5854
private final RuntimeEnvironment env;
5955
private final int indexingParallelism;
6056

61-
private LazilyInstantiate<ForkJoinPool> lzForkJoinPool;
57+
private LazilyInstantiate<ExecutorService> lzIndexWorkExecutor;
6258
private LazilyInstantiate<ObjectPool<Ctags>> lzCtagsPool;
6359
private LazilyInstantiate<ExecutorService> lzFixedExecutor;
6460
private LazilyInstantiate<ExecutorService> lzHistoryExecutor;
@@ -82,7 +78,7 @@ public IndexerParallelizer(RuntimeEnvironment env) {
8278
*/
8379
this.indexingParallelism = env.getIndexingParallelism();
8480

85-
createLazyForkJoinPool();
81+
createIndexWorkExecutor();
8682
createLazyCtagsPool();
8783
createLazyFixedExecutor();
8884
createLazyHistoryExecutor();
@@ -99,10 +95,10 @@ public ExecutorService getFixedExecutor() {
9995
}
10096

10197
/**
102-
* @return the forkJoinPool
98+
* @return the executor used for individual file processing in the 2nd stage of indexing
10399
*/
104-
public ForkJoinPool getForkJoinPool() {
105-
return lzForkJoinPool.get();
100+
public ExecutorService getIndexWorkExecutor() {
101+
return lzIndexWorkExecutor.get();
106102
}
107103

108104
/**
@@ -166,7 +162,7 @@ public void close() {
166162
* call this method satisfactorily too.
167163
*/
168164
public void bounce() {
169-
bounceForkJoinPool();
165+
bounceIndexWorkExecutor();
170166
bounceFixedExecutor();
171167
bounceCtagsPool();
172168
bounceHistoryExecutor();
@@ -175,11 +171,11 @@ public void bounce() {
175171
bounceXrefWatcherExecutor();
176172
}
177173

178-
private void bounceForkJoinPool() {
179-
if (lzForkJoinPool.isActive()) {
180-
ForkJoinPool formerForkJoinPool = lzForkJoinPool.get();
181-
createLazyForkJoinPool();
182-
formerForkJoinPool.shutdown();
174+
private void bounceIndexWorkExecutor() {
175+
if (lzIndexWorkExecutor.isActive()) {
176+
ExecutorService formerIndexWorkExecutor = lzIndexWorkExecutor.get();
177+
createIndexWorkExecutor();
178+
formerIndexWorkExecutor.shutdown();
183179
}
184180
}
185181

@@ -231,13 +227,10 @@ private void bounceXrefWatcherExecutor() {
231227
}
232228
}
233229

234-
private void createLazyForkJoinPool() {
235-
lzForkJoinPool = LazilyInstantiate.using(() ->
236-
new ForkJoinPool(indexingParallelism, forkJoinPool -> {
237-
ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool);
238-
thread.setName(OpenGrokThreadFactory.PREFIX + "ForkJoinPool-" + thread.getId());
239-
return thread;
240-
}, null, false));
230+
private void createIndexWorkExecutor() {
231+
lzIndexWorkExecutor = LazilyInstantiate.using(() ->
232+
Executors.newFixedThreadPool(indexingParallelism,
233+
new OpenGrokThreadFactory("index-worker")));
241234
}
242235

243236
private void createLazyCtagsPool() {
@@ -261,7 +254,7 @@ private void createLazyXrefWatcherExecutor() {
261254
private void createLazyFixedExecutor() {
262255
lzFixedExecutor = LazilyInstantiate.using(() ->
263256
Executors.newFixedThreadPool(indexingParallelism,
264-
new OpenGrokThreadFactory("index-worker")));
257+
new OpenGrokThreadFactory("index-db")));
265258
}
266259

267260
private void createLazyHistoryExecutor() {

0 commit comments

Comments
 (0)