Skip to content

Commit cde1b5a

Browse files
committed
Allow dynamic modification of io-threads num
Signed-off-by: Ayush Sharma <mrayushs933@gmail.com>
1 parent c9c49b4 commit cde1b5a

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

src/config.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* POSSIBILITY OF SUCH DAMAGE.
2929
*/
3030

31+
#include "io_threads.h"
3132
#include "server.h"
3233
#include "cluster.h"
3334
#include "connection.h"
@@ -3264,8 +3265,8 @@ standardConfig static_configs[] = {
32643265

32653266
/* Integer configs */
32663267
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
3267-
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3268-
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
3268+
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3269+
createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */
32693270
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
32703271
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
32713272
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),

src/io_threads.c

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
#include "io_threads.h"
8+
#include "server.h"
89

910
static __thread int thread_id = 0; /* Thread local var */
1011
static pthread_t io_threads[IO_THREADS_MAX_NUM] = {0};
@@ -227,6 +228,8 @@ static void *IOThreadMain(void *myid) {
227228
size_t jobs_to_process = 0;
228229
IOJobQueue *jq = &io_jobs[id];
229230
while (1) {
231+
/* Cancellation point so that pthread_cancel() from main thread is honored. */
232+
pthread_testcancel();
230233
/* Wait for jobs */
231234
for (int j = 0; j < 1000000; j++) {
232235
jobs_to_process = IOJobQueue_availableJobs(jq);
@@ -297,6 +300,40 @@ void killIOThreads(void) {
297300
}
298301
}
299302

303+
int updateIOThreads(const char **err) {
304+
serverAssert(inMainThread());
305+
UNUSED(err);
306+
int prev_threads_num = 1;
307+
for (int i = IO_THREADS_MAX_NUM - 1; i > 0; i--) {
308+
if (io_threads[i]) {
309+
prev_threads_num = i + 1;
310+
break;
311+
}
312+
}
313+
if (prev_threads_num == server.io_threads_num) return 1;
314+
serverLog(LL_DEBUG, "Changing number of IO threads from %d to %d.", prev_threads_num, server.io_threads_num);
315+
drainIOThreadsQueue();
316+
/* Set active threads to 1, will be adjusted based on workload later. */
317+
for (int i = 1; i < server.active_io_threads_num; i++) pthread_mutex_lock(&io_threads_mutex[i]);
318+
server.active_io_threads_num = 1;
319+
// Create new threads.
320+
if (server.io_threads_num > prev_threads_num) {
321+
for (int i = prev_threads_num; i < server.io_threads_num; i++) {
322+
createIOThread(i);
323+
}
324+
}
325+
// Decrease the number of threads.
326+
else {
327+
for (int i = prev_threads_num - 1; i >= server.io_threads_num; i--) {
328+
// Unblock inactive thread.
329+
pthread_mutex_unlock(&io_threads_mutex[i]);
330+
shutdownIOThread(i);
331+
io_threads[i] = 0;
332+
}
333+
}
334+
return 1;
335+
}
336+
300337
/* Initialize the data structures needed for I/O threads. */
301338
void initIOThreads(void) {
302339
server.active_io_threads_num = 1; /* We start with threads not active. */

src/io_threads.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only);
1414
void drainIOThreadsQueue(void);
1515
void trySendPollJobToIOThreads(void);
1616
int trySendAcceptToIOThreads(connection *conn);
17+
int updateIOThreads(const char **err);
1718

1819
#endif /* IO_THREADS_H */

0 commit comments

Comments
 (0)