diff --git a/src/config.c b/src/config.c index ac3252f823..99eb86ab28 100644 --- a/src/config.c +++ b/src/config.c @@ -28,6 +28,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include "io_threads.h" #include "server.h" #include "cluster.h" #include "connection.h" @@ -3258,8 +3259,8 @@ standardConfig static_configs[] = { /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases_cluster, 1, INTEGER_CONFIG, NULL, NULL), - createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ - createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ + createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ + createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), diff --git a/src/io_threads.c b/src/io_threads.c index 6c5a878ca9..b226b87bdd 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -215,18 +215,19 @@ static void *IOThreadMain(void *myid) { long id = (long)myid; char thdname[32]; - serverAssert(server.io_threads_num > 0); - serverAssert(id > 0 && id < server.io_threads_num); snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); valkey_set_thread_title(thdname); serverSetCpuAffinity(server.server_cpulist); makeThreadKillable(); initSharedQueryBuf(); + pthread_cleanup_push(freeSharedQueryBuf, NULL); thread_id = (int)id; size_t jobs_to_process = 0; IOJobQueue *jq = &io_jobs[id]; while (1) { + /* Cancellation point so that pthread_cancel() from main thread is honored. */ + pthread_testcancel(); /* Wait for jobs */ for (int j = 0; j < 1000000; j++) { jobs_to_process = IOJobQueue_availableJobs(jq); @@ -255,12 +256,15 @@ static void *IOThreadMain(void *myid) { * As the main-thread main concern is to check if the queue is empty, it's enough to do it once at the end. */ atomic_thread_fence(memory_order_release); } - freeSharedQueryBuf(); + pthread_cleanup_pop(0); return NULL; } #define IO_JOB_QUEUE_SIZE 2048 static void createIOThread(int id) { + serverAssert(server.io_threads_num > 0); + serverAssert(id > 0 && id < server.io_threads_num); + pthread_t tid; pthread_mutex_init(&io_threads_mutex[id], NULL); IOJobQueue_init(&io_jobs[id], IO_JOB_QUEUE_SIZE); @@ -287,7 +291,7 @@ static void shutdownIOThread(int id) { } else { serverLog(LL_NOTICE, "IO thread(tid:%lu) terminated", (unsigned long)tid); } - + pthread_mutex_destroy(&io_threads_mutex[id]); IOJobQueue_cleanup(&io_jobs[id]); } @@ -297,6 +301,44 @@ void killIOThreads(void) { } } +int updateIOThreads(const char **err) { + serverAssert(inMainThread()); + UNUSED(err); + int prev_threads_num = 1; + for (int i = IO_THREADS_MAX_NUM - 1; i > 0; i--) { + if (io_threads[i]) { + prev_threads_num = i + 1; + break; + } + } + if (prev_threads_num == server.io_threads_num) return 1; + + serverLog(LL_NOTICE, "Changing number of IO threads from %d to %d.", prev_threads_num, server.io_threads_num); + drainIOThreadsQueue(); + /* Set active threads to 1, will be adjusted based on workload later. */ + for (int i = 1; i < server.active_io_threads_num; i++) { + pthread_mutex_lock(&io_threads_mutex[i]); + } + server.active_io_threads_num = 1; + + // Create new threads. + if (server.io_threads_num > prev_threads_num) { + for (int i = prev_threads_num; i < server.io_threads_num; i++) { + createIOThread(i); + } + } + // Decrease the number of threads. + else { + for (int i = prev_threads_num - 1; i >= server.io_threads_num; i--) { + // Unblock inactive thread. + pthread_mutex_unlock(&io_threads_mutex[i]); + shutdownIOThread(i); + io_threads[i] = 0; + } + } + return 1; +} + /* Initialize the data structures needed for I/O threads. */ void initIOThreads(void) { server.active_io_threads_num = 1; /* We start with threads not active. */ diff --git a/src/io_threads.h b/src/io_threads.h index a3ff582a77..992cb66c43 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -14,5 +14,6 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); void trySendPollJobToIOThreads(void); int trySendAcceptToIOThreads(connection *conn); +int updateIOThreads(const char **err); #endif /* IO_THREADS_H */ diff --git a/src/networking.c b/src/networking.c index 5f57050971..a9961574c8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2720,7 +2720,8 @@ void initSharedQueryBuf(void) { sdsclear(thread_shared_qb); } -void freeSharedQueryBuf(void) { +void freeSharedQueryBuf(void *dummy) { + UNUSED(dummy); sdsfree(thread_shared_qb); thread_shared_qb = NULL; } diff --git a/src/server.h b/src/server.h index d6c52de8db..07271f8c41 100644 --- a/src/server.h +++ b/src/server.h @@ -2769,7 +2769,7 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initSharedQueryBuf(void); -void freeSharedQueryBuf(void); +void freeSharedQueryBuf(void *dummy); client *lookupClientByID(uint64_t id); int authRequired(client *c); void clientSetUser(client *c, user *u, int authenticated); diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index b8fb9ab7dd..620c1f893a 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -574,6 +574,25 @@ start_server {tags {"other external:skip"}} { } } +start_server {tags {"other external:skip"}} { + test "test io-threads are runtime modifiable" { + # Test set + r config set io-threads 5 + set thread_num [lindex [r config get io-threads] 1] + assert_equal 5 $thread_num + + # Test decrease + r config set io-threads 1 + set thread_num [lindex [r config get io-threads] 1] + assert_equal 1 $thread_num + + # Test increase + r config set io-threads 4 + set thread_num [lindex [r config get io-threads] 1] + assert_equal 4 $thread_num + } +} + set tempFileName [file join [pwd] [pid]] if {$::verbose} { puts "Creating temp file $tempFileName"