Skip to content

Commit 7eb0084

Browse files
committed
Allow dynamic modification of io-threads num
Signed-off-by: Ayush Sharma <mrayushs933@gmail.com>
1 parent af25a52 commit 7eb0084

File tree

6 files changed

+72
-8
lines changed

6 files changed

+72
-8
lines changed

src/config.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* POSSIBILITY OF SUCH DAMAGE.
2929
*/
3030

31+
#include "io_threads.h"
3132
#include "server.h"
3233
#include "cluster.h"
3334
#include "connection.h"
@@ -3258,8 +3259,8 @@ standardConfig static_configs[] = {
32583259
/* Integer configs */
32593260
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases, 16, INTEGER_CONFIG, NULL, NULL),
32603261
createIntConfig("cluster-databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases_cluster, 1, INTEGER_CONFIG, NULL, NULL),
3261-
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3262-
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
3262+
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3263+
createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */
32633264
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
32643265
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
32653266
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),

src/io_threads.c

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,18 +215,19 @@ static void *IOThreadMain(void *myid) {
215215
long id = (long)myid;
216216
char thdname[32];
217217

218-
serverAssert(server.io_threads_num > 0);
219-
serverAssert(id > 0 && id < server.io_threads_num);
220218
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
221219
valkey_set_thread_title(thdname);
222220
serverSetCpuAffinity(server.server_cpulist);
223221
makeThreadKillable();
224222
initSharedQueryBuf();
223+
pthread_cleanup_push(freeSharedQueryBuf, NULL);
225224

226225
thread_id = (int)id;
227226
size_t jobs_to_process = 0;
228227
IOJobQueue *jq = &io_jobs[id];
229228
while (1) {
229+
/* Cancellation point so that pthread_cancel() from main thread is honored. */
230+
pthread_testcancel();
230231
/* Wait for jobs */
231232
for (int j = 0; j < 1000000; j++) {
232233
jobs_to_process = IOJobQueue_availableJobs(jq);
@@ -255,12 +256,15 @@ static void *IOThreadMain(void *myid) {
255256
* As the main-thread main concern is to check if the queue is empty, it's enough to do it once at the end. */
256257
atomic_thread_fence(memory_order_release);
257258
}
258-
freeSharedQueryBuf();
259+
pthread_cleanup_pop(0);
259260
return NULL;
260261
}
261262

262263
#define IO_JOB_QUEUE_SIZE 2048
263264
static void createIOThread(int id) {
265+
serverAssert(server.io_threads_num > 0);
266+
serverAssert(id > 0 && id < server.io_threads_num);
267+
264268
pthread_t tid;
265269
pthread_mutex_init(&io_threads_mutex[id], NULL);
266270
IOJobQueue_init(&io_jobs[id], IO_JOB_QUEUE_SIZE);
@@ -287,7 +291,7 @@ static void shutdownIOThread(int id) {
287291
} else {
288292
serverLog(LL_NOTICE, "IO thread(tid:%lu) terminated", (unsigned long)tid);
289293
}
290-
294+
pthread_mutex_destroy(&io_threads_mutex[id]);
291295
IOJobQueue_cleanup(&io_jobs[id]);
292296
}
293297

@@ -297,6 +301,44 @@ void killIOThreads(void) {
297301
}
298302
}
299303

304+
int updateIOThreads(const char **err) {
305+
serverAssert(inMainThread());
306+
UNUSED(err);
307+
int prev_threads_num = 1;
308+
for (int i = IO_THREADS_MAX_NUM - 1; i > 0; i--) {
309+
if (io_threads[i]) {
310+
prev_threads_num = i + 1;
311+
break;
312+
}
313+
}
314+
if (prev_threads_num == server.io_threads_num) return 1;
315+
316+
serverLog(LL_NOTICE, "Changing number of IO threads from %d to %d.", prev_threads_num, server.io_threads_num);
317+
drainIOThreadsQueue();
318+
/* Set active threads to 1, will be adjusted based on workload later. */
319+
for (int i = 1; i < server.active_io_threads_num; i++) {
320+
pthread_mutex_lock(&io_threads_mutex[i]);
321+
}
322+
server.active_io_threads_num = 1;
323+
324+
// Create new threads.
325+
if (server.io_threads_num > prev_threads_num) {
326+
for (int i = prev_threads_num; i < server.io_threads_num; i++) {
327+
createIOThread(i);
328+
}
329+
}
330+
// Decrease the number of threads.
331+
else {
332+
for (int i = prev_threads_num - 1; i >= server.io_threads_num; i--) {
333+
// Unblock inactive thread.
334+
pthread_mutex_unlock(&io_threads_mutex[i]);
335+
shutdownIOThread(i);
336+
io_threads[i] = 0;
337+
}
338+
}
339+
return 1;
340+
}
341+
300342
/* Initialize the data structures needed for I/O threads. */
301343
void initIOThreads(void) {
302344
server.active_io_threads_num = 1; /* We start with threads not active. */

src/io_threads.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only);
1414
void drainIOThreadsQueue(void);
1515
void trySendPollJobToIOThreads(void);
1616
int trySendAcceptToIOThreads(connection *conn);
17+
int updateIOThreads(const char **err);
1718

1819
#endif /* IO_THREADS_H */

src/networking.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2720,7 +2720,8 @@ void initSharedQueryBuf(void) {
27202720
sdsclear(thread_shared_qb);
27212721
}
27222722

2723-
void freeSharedQueryBuf(void) {
2723+
void freeSharedQueryBuf(void *v) {
2724+
UNUSED(v);
27242725
sdsfree(thread_shared_qb);
27252726
thread_shared_qb = NULL;
27262727
}

src/server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2769,7 +2769,7 @@ void linkClient(client *c);
27692769
void protectClient(client *c);
27702770
void unprotectClient(client *c);
27712771
void initSharedQueryBuf(void);
2772-
void freeSharedQueryBuf(void);
2772+
void freeSharedQueryBuf(void *v);
27732773
client *lookupClientByID(uint64_t id);
27742774
int authRequired(client *c);
27752775
void clientSetUser(client *c, user *u, int authenticated);

tests/unit/other.tcl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,25 @@ start_server {tags {"other external:skip"}} {
574574
}
575575
}
576576

577+
start_server {tags {"other external:skip"}} {
578+
test "test io-threads are runtime modifiable" {
579+
# Test set
580+
r config set io-threads 5
581+
set thread_num [lindex [r config get io-threads] 1]
582+
assert_equal 5 $thread_num
583+
584+
# Test decrease
585+
r config set io-threads 1
586+
set thread_num [lindex [r config get io-threads] 1]
587+
assert_equal 1 $thread_num
588+
589+
# Test increase
590+
r config set io-threads 4
591+
set thread_num [lindex [r config get io-threads] 1]
592+
assert_equal 4 $thread_num
593+
}
594+
}
595+
577596
set tempFileName [file join [pwd] [pid]]
578597
if {$::verbose} {
579598
puts "Creating temp file $tempFileName"

0 commit comments

Comments
 (0)