
Commit 660dc17

Merge pull request #428 from vinser52/svinogra_refactor_mt_tests
Share multithread utils between tests and benchmarks
2 parents 5f4652f + 68546df commit 660dc17

File tree

6 files changed: +120 -119 lines

benchmark/CMakeLists.txt
benchmark/multithread.hpp
src/critnib/critnib.c
test/common/multithread_helpers.hpp
test/ipcAPI.cpp
test/supp/drd-umf_test-ipc.supp

benchmark/CMakeLists.txt

Lines changed: 4 additions & 2 deletions
@@ -42,8 +42,10 @@ function(add_umf_benchmark)
         LIBS ${BENCH_LIBS})

     target_include_directories(
-        ${BENCH_NAME} PRIVATE ${UMF_CMAKE_SOURCE_DIR}/include
-        ${UMF_CMAKE_SOURCE_DIR}/src/utils)
+        ${BENCH_NAME}
+        PRIVATE ${UMF_CMAKE_SOURCE_DIR}/include
+                ${UMF_CMAKE_SOURCE_DIR}/src/utils
+                ${UMF_CMAKE_SOURCE_DIR}/test/common)

     target_link_directories(${BENCH_NAME} PRIVATE ${ARG_LIBDIRS})
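
With ${UMF_CMAKE_SOURCE_DIR}/test/common on the benchmark include path, benchmark sources can pull in the shared helpers directly with #include "multithread_helpers.hpp" — which is exactly what benchmark/multithread.hpp does in the next file.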
4951

benchmark/multithread.hpp

Lines changed: 4 additions & 72 deletions
@@ -17,75 +17,9 @@
 #include <thread>
 #include <vector>

-namespace umf_bench {
-
-template <typename Function>
-void parallel_exec(size_t threads_number, Function &&f) {
-    std::vector<std::thread> threads;
-    threads.reserve(threads_number);
-
-    for (size_t i = 0; i < threads_number; ++i) {
-        threads.emplace_back([&](size_t id) { f(id); }, i);
-    }
-
-    for (auto &t : threads) {
-        t.join();
-    }
-}
-
-class latch {
-  public:
-    latch(size_t desired) : counter(desired) {}
-
-    /* Returns true for the last thread arriving at the latch, false for all
-     * other threads. */
-    bool wait(std::unique_lock<std::mutex> &lock) {
-        counter--;
-        if (counter > 0) {
-            cv.wait(lock, [&] { return counter == 0; });
-            return false;
-        } else {
-            /*
-             * notify_all could be called outside of a lock
-             * (it would perform better) but drd complains
-             * in that case
-             */
-            cv.notify_all();
-            return true;
-        }
-    }
+#include "multithread_helpers.hpp"

-  private:
-    std::condition_variable cv;
-    size_t counter = 0;
-};
-
-/* Implements multi-use barrier (latch). Once all threads arrive at the
- * latch, a new latch is allocated and used by all subsequent calls to
- * syncthreads. */
-struct syncthreads_barrier {
-    syncthreads_barrier(size_t num_threads) : num_threads(num_threads) {
-        mutex = std::shared_ptr<std::mutex>(new std::mutex);
-        current_latch = std::shared_ptr<latch>(new latch(num_threads));
-    }
-
-    syncthreads_barrier(const syncthreads_barrier &) = delete;
-    syncthreads_barrier &operator=(const syncthreads_barrier &) = delete;
-    syncthreads_barrier(syncthreads_barrier &&) = default;
-
-    void operator()() {
-        std::unique_lock<std::mutex> lock(*mutex);
-        auto l = current_latch;
-        if (l->wait(lock)) {
-            current_latch = std::shared_ptr<latch>(new latch(num_threads));
-        }
-    }
+namespace umf_bench {

-  private:
-    size_t num_threads;
-    std::shared_ptr<std::mutex> mutex;
-    std::shared_ptr<latch> current_latch;
-};

 template <typename TimeUnit, typename F>
 typename TimeUnit::rep measure(F &&func) {

@@ -110,14 +44,12 @@ auto measure(size_t iterations, size_t concurrency, F &&run_workload) {

     for (size_t i = 0; i < iterations; i++) {
         std::vector<ResultsType> iteration_results(concurrency);
-        syncthreads_barrier syncthreads(concurrency);
-        parallel_exec(concurrency, [&](size_t id) {
+        umf_test::syncthreads_barrier syncthreads(concurrency);
+        umf_test::parallel_exec(concurrency, [&](size_t id) {
             syncthreads();

             iteration_results[id] =
                 measure<TimeUnit>([&]() { run_workload(id); });
-
-            syncthreads();
         });

         // skip the first 'warmup' iteration
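
Two things change behaviorally here: the barrier and executor now come from umf_test, and the second syncthreads() after the measured region is gone — the join inside parallel_exec already guarantees that every thread's write to iteration_results happens before the main thread reads them. Below is a minimal sketch of the synchronized-measurement pattern this relies on (illustrative only; the workload, thread count, and time unit are made up, not taken from the commit):

    // All workers block on the shared barrier, then enter the measured
    // region together, so no thread's timing includes thread-startup skew.
    #include "multithread_helpers.hpp"

    #include <chrono>
    #include <cstdio>
    #include <vector>

    int main() {
        const size_t concurrency = 4;
        std::vector<long long> elapsed_us(concurrency);
        umf_test::syncthreads_barrier syncthreads(concurrency);

        umf_test::parallel_exec(concurrency, [&](size_t id) {
            syncthreads(); // released only once every thread has arrived
            auto start = std::chrono::steady_clock::now();
            size_t sink = 0;
            for (size_t i = 0; i < 1000000; ++i) {
                sink = sink + i; // stand-in workload
            }
            auto stop = std::chrono::steady_clock::now();
            elapsed_us[id] = std::chrono::duration_cast<std::chrono::microseconds>(
                                 stop - start)
                                 .count();
            (void)sink;
        });

        for (size_t id = 0; id < concurrency; ++id) {
            printf("thread %zu: %lld us\n", id, elapsed_us[id]);
        }
        return 0;
    }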

src/critnib/critnib.c

Lines changed: 1 addition & 1 deletion
@@ -343,7 +343,7 @@ int critnib_insert(struct critnib *c, word key, void *value, int update) {

     struct critnib_node *n = c->root;
     if (!n) {
-        c->root = kn;
+        store(&c->root, kn);

         util_mutex_unlock(&c->mutex);
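
critnib serves lookups locklessly, so publishing the new root must be an atomic store rather than a plain assignment; otherwise a concurrent reader of c->root could observe the write torn, or reordered ahead of the node's initialization. A hedged sketch of the shape such a store() wrapper typically takes (illustrative only; UMF's actual helper may differ in name, location, and memory order):

    // Illustrative only: an atomic release-store built on the GCC/Clang
    // builtin. Release ordering makes the node's fields visible to other
    // threads before the pointer to it becomes reachable.
    struct critnib_node;

    static inline void store(struct critnib_node **dst, struct critnib_node *val) {
        __atomic_store_n(dst, val, __ATOMIC_RELEASE);
    }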

test/common/multithread_helpers.hpp

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright (C) 2024 Intel Corporation
+ *
+ * Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT.
+ * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+ *
+ */
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+namespace umf_test {
+
+template <typename Function>
+void parallel_exec(size_t threads_number, Function &&f) {
+    std::vector<std::thread> threads;
+    threads.reserve(threads_number);
+
+    for (size_t i = 0; i < threads_number; ++i) {
+        threads.emplace_back([&](size_t id) { f(id); }, i);
+    }
+
+    for (auto &t : threads) {
+        t.join();
+    }
+}
+
+class latch {
+  public:
+    latch(size_t desired) : counter(desired) {}
+
+    /* Returns true for the last thread arriving at the latch, false for all
+     * other threads. */
+    bool wait(std::unique_lock<std::mutex> &lock) {
+        counter--;
+        if (counter > 0) {
+            cv.wait(lock, [&] { return counter == 0; });
+            return false;
+        } else {
+            /*
+             * notify_all could be called outside of a lock
+             * (it would perform better) but drd complains
+             * in that case
+             */
+            cv.notify_all();
+            return true;
+        }
+    }
+
+  private:
+    std::condition_variable cv;
+    size_t counter = 0;
+};
+
+/* Implements multi-use barrier (latch). Once all threads arrive at the
+ * latch, a new latch is allocated and used by all subsequent calls to
+ * syncthreads. */
+struct syncthreads_barrier {
+    syncthreads_barrier(size_t num_threads) : num_threads(num_threads) {
+        mutex = std::shared_ptr<std::mutex>(new std::mutex);
+        current_latch = std::shared_ptr<latch>(new latch(num_threads));
+    }
+
+    syncthreads_barrier(const syncthreads_barrier &) = delete;
+    syncthreads_barrier &operator=(const syncthreads_barrier &) = delete;
+    syncthreads_barrier(syncthreads_barrier &&) = default;
+
+    void operator()() {
+        std::unique_lock<std::mutex> lock(*mutex);
+        auto l = current_latch;
+        if (l->wait(lock)) {
+            current_latch = std::shared_ptr<latch>(new latch(num_threads));
+        }
+    }
+
+  private:
+    size_t num_threads;
+    std::shared_ptr<std::mutex> mutex;
+    std::shared_ptr<latch> current_latch;
+};
+
+} // namespace umf_test
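
As the header comment says, the barrier is multi-use: the last thread to arrive swaps in a fresh latch, so a single syncthreads_barrier can separate several phases of one parallel region. A small illustrative example (not part of the commit):

    #include "multithread_helpers.hpp"

    #include <cstdio>

    int main() {
        const size_t n = 4;
        umf_test::syncthreads_barrier syncthreads(n);

        umf_test::parallel_exec(n, [&](size_t id) {
            printf("thread %zu: phase 1\n", id);
            syncthreads(); // nobody enters phase 2 until all finish phase 1
            printf("thread %zu: phase 2\n", id);
            syncthreads(); // reusable: a fresh latch replaced the consumed one
            printf("thread %zu: phase 3\n", id);
        });
        return 0;
    }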

test/ipcAPI.cpp

Lines changed: 18 additions & 44 deletions
@@ -3,6 +3,7 @@
 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 // This file contains tests for UMF pool API

+#include "multithread_helpers.hpp"
 #include "pool.hpp"
 #include "provider.hpp"

@@ -217,10 +218,10 @@ TEST_F(umfIpcTest, ConcurrentGetPutHandles) {

     std::array<std::vector<umf_ipc_handle_t>, NTHREADS> ipcHandles;

-    auto getHandlesFn = [&ipcHandles, &ptrs](size_t tid) {
-        // TODO: better to wait on the barrier here so that every thread
-        // starts at the same point. But std::barrier is available only
-        // starting from C++20
+    umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+    auto getHandlesFn = [&ipcHandles, &ptrs, &syncthreads](size_t tid) {
+        syncthreads();
         for (void *ptr : ptrs) {
             umf_ipc_handle_t ipcHandle;
             size_t handleSize;

@@ -230,31 +231,17 @@ TEST_F(umfIpcTest, ConcurrentGetPutHandles) {
         }
     };

-    std::vector<std::thread> threads;
-    for (int i = 0; i < NTHREADS; i++) {
-        threads.emplace_back(getHandlesFn, i);
-    }
-
-    for (auto &thread : threads) {
-        thread.join();
-    }
-    threads.clear();
+    umf_test::parallel_exec(NTHREADS, getHandlesFn);

-    auto putHandlesFn = [&ipcHandles](size_t tid) {
+    auto putHandlesFn = [&ipcHandles, &syncthreads](size_t tid) {
+        syncthreads();
         for (umf_ipc_handle_t ipcHandle : ipcHandles[tid]) {
             umf_result_t ret = umfPutIPCHandle(ipcHandle);
             EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
         }
     };

-    for (int i = 0; i < NTHREADS; i++) {
-        threads.emplace_back(putHandlesFn, i);
-    }
-
-    for (auto &thread : threads) {
-        thread.join();
-    }
-    threads.clear();
+    umf_test::parallel_exec(NTHREADS, putHandlesFn);

     for (void *ptr : ptrs) {
         umf_result_t ret = umfPoolFree(pool.get(), ptr);

@@ -287,10 +274,11 @@ TEST_F(umfIpcTest, ConcurrentOpenCloseHandles) {

     std::array<std::vector<void *>, NTHREADS> openedIpcHandles;

-    auto openHandlesFn = [this, &ipcHandles, &openedIpcHandles](size_t tid) {
-        // TODO: better to wait on the barrier here so that every thread
-        // starts at the same point. But std::barrier is available only
-        // starting from C++20
+    umf_test::syncthreads_barrier syncthreads(NTHREADS);
+
+    auto openHandlesFn = [this, &ipcHandles, &openedIpcHandles,
+                          &syncthreads](size_t tid) {
+        syncthreads();
         for (auto ipcHandle : ipcHandles) {
             void *ptr;
             umf_result_t ret = umfOpenIPCHandle(pool.get(), ipcHandle, &ptr);

@@ -299,31 +287,17 @@ TEST_F(umfIpcTest, ConcurrentOpenCloseHandles) {
         }
     };

-    std::vector<std::thread> threads;
-    for (int i = 0; i < NTHREADS; i++) {
-        threads.emplace_back(openHandlesFn, i);
-    }
-
-    for (auto &thread : threads) {
-        thread.join();
-    }
-    threads.clear();
+    umf_test::parallel_exec(NTHREADS, openHandlesFn);

-    auto closeHandlesFn = [&openedIpcHandles](size_t tid) {
+    auto closeHandlesFn = [&openedIpcHandles, &syncthreads](size_t tid) {
+        syncthreads();
         for (void *ptr : openedIpcHandles[tid]) {
            umf_result_t ret = umfCloseIPCHandle(ptr);
            EXPECT_EQ(ret, UMF_RESULT_SUCCESS);
         }
     };

-    for (int i = 0; i < NTHREADS; i++) {
-        threads.emplace_back(closeHandlesFn, i);
-    }
-
-    for (auto &thread : threads) {
-        thread.join();
-    }
-    threads.clear();
+    umf_test::parallel_exec(NTHREADS, closeHandlesFn);

     for (auto ipcHandle : ipcHandles) {
         umf_result_t ret = umfPutIPCHandle(ipcHandle);

test/supp/drd-umf_test-ipc.supp

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+{
+   Conditional variable destruction false-positive
+   drd:CondErr
+   ...
+   fun:pthread_cond_destroy@*
+   ...
+}
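
This suppression pairs with the comment in latch::wait above: drd reports a false positive (drd:CondErr) when a condition variable is destroyed immediately after its final notify_all, which is likely the lifetime pattern of the short-lived latches the barrier allocates. Valgrind picks it up via the standard mechanism, e.g. valgrind --tool=drd --suppressions=test/supp/drd-umf_test-ipc.supp followed by the IPC test binary.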
