Skip to content

Commit 6e2600d

Browse files
committed
Allow dynamic modification of io-threads num
Signed-off-by: Ayush Sharma <mrayushs933@gmail.com>
1 parent 911641b commit 6e2600d

File tree

6 files changed

+69
-6
lines changed

6 files changed

+69
-6
lines changed

src/config.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* POSSIBILITY OF SUCH DAMAGE.
2929
*/
3030

31+
#include "io_threads.h"
3132
#include "server.h"
3233
#include "cluster.h"
3334
#include "connection.h"
@@ -3258,8 +3259,8 @@ standardConfig static_configs[] = {
32583259
/* Integer configs */
32593260
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases, 16, INTEGER_CONFIG, NULL, NULL),
32603261
createIntConfig("cluster-databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases_cluster, 1, INTEGER_CONFIG, NULL, NULL),
3261-
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3262-
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
3262+
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3263+
createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */
32633264
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
32643265
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
32653266
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),

src/io_threads.c

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,14 @@ static void *IOThreadMain(void *myid) {
222222
serverSetCpuAffinity(server.server_cpulist);
223223
makeThreadKillable();
224224
initSharedQueryBuf();
225+
pthread_cleanup_push(freeSharedQueryBuf, NULL);
225226

226227
thread_id = (int)id;
227228
size_t jobs_to_process = 0;
228229
IOJobQueue *jq = &io_jobs[id];
229230
while (1) {
231+
/* Cancellation point so that pthread_cancel() from main thread is honored. */
232+
pthread_testcancel();
230233
/* Wait for jobs */
231234
for (int j = 0; j < 1000000; j++) {
232235
jobs_to_process = IOJobQueue_availableJobs(jq);
@@ -255,7 +258,7 @@ static void *IOThreadMain(void *myid) {
255258
* As the main-thread main concern is to check if the queue is empty, it's enough to do it once at the end. */
256259
atomic_thread_fence(memory_order_release);
257260
}
258-
freeSharedQueryBuf();
261+
pthread_cleanup_pop(0);
259262
return NULL;
260263
}
261264

@@ -287,7 +290,7 @@ static void shutdownIOThread(int id) {
287290
} else {
288291
serverLog(LL_NOTICE, "IO thread(tid:%lu) terminated", (unsigned long)tid);
289292
}
290-
293+
pthread_mutex_destroy(&io_threads_mutex[id]);
291294
IOJobQueue_cleanup(&io_jobs[id]);
292295
}
293296

@@ -297,6 +300,44 @@ void killIOThreads(void) {
297300
}
298301
}
299302

303+
int updateIOThreads(const char **err) {
304+
serverAssert(inMainThread());
305+
UNUSED(err);
306+
int prev_threads_num = 1;
307+
for (int i = IO_THREADS_MAX_NUM - 1; i > 0; i--) {
308+
if (io_threads[i]) {
309+
prev_threads_num = i + 1;
310+
break;
311+
}
312+
}
313+
if (prev_threads_num == server.io_threads_num) return 1;
314+
315+
serverLog(LL_DEBUG, "Changing number of IO threads from %d to %d.", prev_threads_num, server.io_threads_num);
316+
drainIOThreadsQueue();
317+
/* Set active threads to 1, will be adjusted based on workload later. */
318+
for (int i = 1; i < server.active_io_threads_num; i++) {
319+
pthread_mutex_lock(&io_threads_mutex[i]);
320+
}
321+
server.active_io_threads_num = 1;
322+
323+
// Create new threads.
324+
if (server.io_threads_num > prev_threads_num) {
325+
for (int i = prev_threads_num; i < server.io_threads_num; i++) {
326+
createIOThread(i);
327+
}
328+
}
329+
// Decrease the number of threads.
330+
else {
331+
for (int i = prev_threads_num - 1; i >= server.io_threads_num; i--) {
332+
// Unblock inactive thread.
333+
pthread_mutex_unlock(&io_threads_mutex[i]);
334+
shutdownIOThread(i);
335+
io_threads[i] = 0;
336+
}
337+
}
338+
return 1;
339+
}
340+
300341
/* Initialize the data structures needed for I/O threads. */
301342
void initIOThreads(void) {
302343
server.active_io_threads_num = 1; /* We start with threads not active. */

src/io_threads.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only);
1414
void drainIOThreadsQueue(void);
1515
void trySendPollJobToIOThreads(void);
1616
int trySendAcceptToIOThreads(connection *conn);
17+
int updateIOThreads(const char **err);
1718

1819
#endif /* IO_THREADS_H */

src/networking.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2720,7 +2720,8 @@ void initSharedQueryBuf(void) {
27202720
sdsclear(thread_shared_qb);
27212721
}
27222722

2723-
void freeSharedQueryBuf(void) {
2723+
void freeSharedQueryBuf(void *v) {
2724+
UNUSED(v);
27242725
sdsfree(thread_shared_qb);
27252726
thread_shared_qb = NULL;
27262727
}

src/server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2769,7 +2769,7 @@ void linkClient(client *c);
27692769
void protectClient(client *c);
27702770
void unprotectClient(client *c);
27712771
void initSharedQueryBuf(void);
2772-
void freeSharedQueryBuf(void);
2772+
void freeSharedQueryBuf(void *);
27732773
client *lookupClientByID(uint64_t id);
27742774
int authRequired(client *c);
27752775
void clientSetUser(client *c, user *u, int authenticated);

tests/unit/other.tcl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,25 @@ start_server {tags {"other external:skip"}} {
574574
}
575575
}
576576

577+
start_server {tags {"other external:skip"}} {
578+
test "test io-threads are runtime modifiable" {
579+
#Test set
580+
r config set io-threads 5
581+
set thread_num [lindex [r config get io-threads] 1]
582+
assert_equal 5 $thread_num
583+
584+
#Test decrease
585+
r config set io-threads 1
586+
set thread_num [lindex [r config get io-threads] 1]
587+
assert_equal 1 $thread_num
588+
589+
#Test increase
590+
r config set io-threads 4
591+
set thread_num [lindex [r config get io-threads] 1]
592+
assert_equal 4 $thread_num
593+
}
594+
}
595+
577596
set tempFileName [file join [pwd] [pid]]
578597
if {$::verbose} {
579598
puts "Creating temp file $tempFileName"

0 commit comments

Comments
 (0)