Skip to content

Commit 40b1d3f

Browse files
committed
Allow dynamic modification of io-threads num
Signed-off-by: Ayush Sharma <mrayushs933@gmail.com>
1 parent c9c49b4 commit 40b1d3f

File tree

3 files changed

+44
-2
lines changed

3 files changed

+44
-2
lines changed

src/config.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* POSSIBILITY OF SUCH DAMAGE.
2929
*/
3030

31+
#include "io_threads.h"
3132
#include "server.h"
3233
#include "cluster.h"
3334
#include "connection.h"
@@ -3264,8 +3265,8 @@ standardConfig static_configs[] = {
32643265

32653266
/* Integer configs */
32663267
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
3267-
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3268-
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
3268+
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
3269+
createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */
32693270
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
32703271
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
32713272
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),

src/io_threads.c

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ static void *IOThreadMain(void *myid) {
227227
size_t jobs_to_process = 0;
228228
IOJobQueue *jq = &io_jobs[id];
229229
while (1) {
230+
/* Cancellation point so that pthread_cancel() from main thread is honored. */
231+
pthread_testcancel();
230232
/* Wait for jobs */
231233
for (int j = 0; j < 1000000; j++) {
232234
jobs_to_process = IOJobQueue_availableJobs(jq);
@@ -297,6 +299,44 @@ void killIOThreads(void) {
297299
}
298300
}
299301

302+
int updateIOThreads(const char **err) {
303+
serverAssert(inMainThread());
304+
UNUSED(err);
305+
int prev_threads_num = 1;
306+
for (int i = IO_THREADS_MAX_NUM - 1; i > 0; i--) {
307+
if (io_threads[i]) {
308+
prev_threads_num = i + 1;
309+
break;
310+
}
311+
}
312+
if (prev_threads_num == server.io_threads_num) return 1;
313+
314+
serverLog(LL_DEBUG, "Changing number of IO threads from %d to %d.", prev_threads_num, server.io_threads_num);
315+
drainIOThreadsQueue();
316+
/* Set active threads to 1, will be adjusted based on workload later. */
317+
for (int i = 1; i < server.active_io_threads_num; i++) {
318+
pthread_mutex_lock(&io_threads_mutex[i]);
319+
}
320+
server.active_io_threads_num = 1;
321+
322+
// Create new threads.
323+
if (server.io_threads_num > prev_threads_num) {
324+
for (int i = prev_threads_num; i < server.io_threads_num; i++) {
325+
createIOThread(i);
326+
}
327+
}
328+
// Decrease the number of threads.
329+
else {
330+
for (int i = prev_threads_num - 1; i >= server.io_threads_num; i--) {
331+
// Unblock inactive thread.
332+
pthread_mutex_unlock(&io_threads_mutex[i]);
333+
shutdownIOThread(i);
334+
io_threads[i] = 0;
335+
}
336+
}
337+
return 1;
338+
}
339+
300340
/* Initialize the data structures needed for I/O threads. */
301341
void initIOThreads(void) {
302342
server.active_io_threads_num = 1; /* We start with threads not active. */

src/io_threads.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only);
1414
void drainIOThreadsQueue(void);
1515
void trySendPollJobToIOThreads(void);
1616
int trySendAcceptToIOThreads(connection *conn);
17+
int updateIOThreads(const char **err);
1718

1819
#endif /* IO_THREADS_H */

0 commit comments

Comments
 (0)