Проект представляет собой простой, но функционально полный in-memory key-value store с собственным сетевым сервером.
- Использование не блокирующего ввода-вывода через корутины (асинхронный EventLoop на
epoll
/select
) для обработки сетевых соединений. - Управление памятью на низком уровне через собственный Memory Pool (аллокатор), реализующий быстрый
allocate
/deallocate
блоков фиксированного размера. - Организация параллельного исполнения через пул потоков (
ThreadPool
) с очередью задач и безопасной синхронизацией. - Простую, но гибкую систему логирования (уровни логов, вывод в консоль и/или файл).
- Реализацию хеш‐таблицы с шардированием, для уменьшения конкуренции при одновременном доступе из нескольких потоков. (шардированный map в
Server
) - Неблокирующий сетевой сервер, обрабатывающий команды
GET
,SET
,DEL
в текстовом протоколе.
- Структура проекта
- Основные модули и их назначение
- Конфигурация (
config.hpp
) - Логирование (
logger.hpp
,logger.cpp
) - Пул потоков (
thread_pool.hpp
,thread_pool.cpp
) - Аллокатор памяти (
allocator.hpp
,allocator.cpp
) - Асинхронный I/O (корутины) (
coroutine_io.hpp
,coroutine_io.cpp
) - Хеш-таблица и шардирование (
hash_table.hpp
,sharded_hash_map.hpp
) - Сервер (
server.hpp
) - Точка входа (
main.cpp
)
- Конфигурация (
- Пример сборки и запуска
- Использование
- Особенности реализации
- Настройка логирования
- Лицензия
.
├── CMakeLists.txt # Скрипт для сборки через CMake
├── README.md # (этот файл)
├── main.cpp # Точка входа, инициализация логгера, запуск сервера
├── include/
│ ├── config.hpp # Параметры по умолчанию (порт, размер пула потоков и т.д.)
│ ├── kv/ # Пространство имён kv
│ │ ├── allocator.hpp # Интерфейс MemoryPool
│ │ ├── coroutine_io.hpp # Интерфейс асинхронного I/O
│ │ ├── hash_table.hpp # Модульная хеш-таблица
│ │ ├── sharded_hash_map.hpp # Sharded-обёртка над hash_table
│ │ ├── logger.hpp # Интерфейс логгера: уровни (TRACE/DEBUG/INFO/WARN/ERROR/FATAL) и макросы `LOG_*`
│ │ ├── server.hpp # Интерфейс сетевого сервера: шаблонный класс Server<Key,Value>, содержащий `sharded_map` и логику обработки команд, настройку сокета
│ │ └── thread_pool.hpp # Интерфейс ThreadPool: запуск пула
│ ├── src/
│ │ ├── allocator.cpp # Реализация MemoryPool
│ │ ├── coroutine_io.cpp # Реализация EventLoop (epoll/`select`), Read/Write Awaitable для Windows/Linux
│ │ ├── logger.cpp # Реализация логирования: консоль + файл, безопасность потоков, форматирование timestamp
│ └── └── thread_pool.cpp # Реализация ThreadPool: блокировка очереди задач (mutex/condition), потоки‐работники, atomic для учёта активных задач
└── kv_server.log # Файл логов по умолчанию (генерируется при запуске)
Файл config.hpp
содержит константы, задающие настройки сервера по умолчанию:
SERVER_PORT
— порт, на котором будет слушать сервер (по умолчанию 5555).THREAD_POOL_SIZE
— число рабочих потоков в пуле (по умолчаниюstd::thread::hardware_concurrency()
или установленное вручную значение).
kv::log::LoggerConfig
: структура, задающая параметры логирования:level
— минимальный уровень логирования (TRACE
,DEBUG
,INFO
,WARN
,ERROR
,FATAL
).to_console
— вывод в консоль (stdout/ stderr).to_file
— вывод в файл.filename
— путь к файлу логов.
- Макросы
LOG_TRACE
,LOG_DEBUG
,LOG_INFO
и пр. позволяют указать сообщение, имя файла/номер строки, уровень и сам текст.
kv::ThreadPool
: при инициализации создаётся несколько рабочих потоков (по умолчанию – число аппаратных ядер).- Метод
submit(std::function<void()>)
добавляет задачу, увеличивает счётчик активных задач (activeTasks_
), и разбудит один поток. - Рабочий поток в
workerThread()
ждет поstd::condition_variable
, затем забирает задачу, выполняет её, затем уменьшаетactiveTasks_
. - При вызове
shutdown()
устанавливается флагstop_ = true
, пробуждает все потоки, и ждёт их завершения.
kv::MemoryPool
— пул блоков фиксированного размера (blockSize
) и заданного числа блоков за одну аллокацию (blocksCount
).- При первом запросе
allocate()
внутриfreeList_
нет узлов — вызываетсяallocateBlock()
, выделяется кусок памяти размеромblockSize * blocksCount
, разбивается наblocksCount
узловFreeNode
, и все они встают в список свободных. - При
allocate()
берется первый узел из спискаfreeList_
. - При
deallocate(void*)
указатель возвращается в голову списка. - Для защиты списка используется
std::mutex
(mtx_
). - При выгрузке (
~MemoryPool
) все ранее выделенные блоки (allBlocks_
) освобождаются черезfree()
.
kv::EventLoop
— синглтон, внутри себя хранит файловый дескрипторepollFd_
(Linux) или используетselect
(Windows).- Методы
add_reader(int fd, coroutine_handle<>)
/add_writer(int fd, coroutine_handle<>)
регистрируют дескрипторы и корутины, ожидающие готовности на чтение/запись. На Linux сначала ставят дескриптор в неблокирующий режим (fcntl(fd, O_NONBLOCK)
), затем добавляют вepoll
с флагомEPOLLIN | EPOLLET
либоEPOLLOUT | EPOLLET
. wait_and_handle_epoll()
: бесконечный циклepoll_wait(...)
, затем для каждого готового дескриптора извлекается соответствующая корутина (handlers_[fd]
), убирается из мапы и вызываетсяhandle.resume()
.ReadAwaitable
/WriteAwaitable
: объекты, возвращаемые функциямиasync_read(fd, buf, size)
/async_write(fd, buf, size)
. При первомco_await
вызываютawait_suspend
, где размещают себя вEventLoop
; когдаepoll
сообщает, что дескриптор готов, вызываетсяawait_resume()
, который либо читает (::read
/::recv
) либо пишет (::write
/::send
) данные.
kv::HashTable<Key, Value, Hash, KeyEqual>
— однопоточная реализация хеш-таблицы. Детали реализации ядра хеш-таблицы находятся вhash_table.hpp
—хеш-таблица с резервированием и динамическим ростом при нагрузке выше определенного порога.kv::ShardedHashMap<Key, Value, Hash, KeyEqual>
— обёртка над N сегментами (каждый сегмент — свояHashTable
+ своя мьютекс/спинлок). При операцияхget
,put
,erase
вычисляется хеш ключа, берётся сегмент =(hash % num_segments)
, и под соответствующим мьютексом совершается операция.- В
Server
создаетсяShardedHashMap<std::string, std::string>
с 4 сегментами по умолчанию.
- Шаблонный класс
kv::Server<Key, Value, Hash = std::hash<Key>, KeyEqual = std::equal_to<Key>>
:- Конструктор принимает
address: string
иport: uint16_t
. - Внутри хранится
listenFd_
, а такжеShardedHashMap<Key,Value>
(по умолчанию 4 сегмента). setup_listening_socket()
:- Создание TCP-сокета (
socket(...)
), установкаSO_REUSEADDR
,bind()
,listen()
, затем перевод сокета в неблокирующий режим (fcntl/
ioctlsocket
).
- Создание TCP-сокета (
run()
:- Вызывает
setup_listening_socket()
. - Запускает
accept_loop()
в отдельном потоке (detach), чтобы не блокировать цикл корутин. - Запускает
EventLoop::instance().run()
, который обрабатываетasync_read
/async_write
.
- Вызывает
accept_loop()
:- Бесконечный цикл
select(listenFd_)
(Linux/Windows), при появлении нового соединения –accept()
, перевести клиентский FD в неблокирующий режим, сразу вызватьhandle_connection(clientFd)
, который возвращаетTask
.
- Бесконечный цикл
Task handle_connection(SOCKET_TYPE clientFd)
:- Бесконечный цикл:
n = co_await async_read(clientFd, buffer, 4096)
. Еслиn <= 0
, закрыть соединение. - Иначе парсит строку
req
(до\n
), если начало"GET "
– извлечь ключ, вызватьshardedMap_.get(key)
, сформировать ответ (value + "\n"
или"NOT_FOUND\n"
), и сделатьco_await async_write(...)
. - Аналогично для
"SET "
– найти пробел, разделяющий ключ и значение, вызватьshardedMap_.put(key, value)
, и ответ"STORED\n"
. - Для
"DEL"
–shardedMap_.erase(key)
, и ответ"DELETED\n"
либо"NOT_FOUND\n"
. - Иначе ответ
"ERROR\n"
. - После выхода из цикла закрывается FD (
close(fd)
илиclosesocket
).
- Бесконечный цикл:
- Конструктор принимает
- В
main(int argc, char* argv[])
читается порт из аргумента (или используетсяconfig::SERVER_PORT
). - Создаётся
ThreadPool pool(config::THREAD_POOL_SIZE)
. - Настраивается
LoggerConfig
: уровеньDEBUG
, вывод в консоль и в файлkv_server.log
. - Логгер инициализируется, выводится
LOG_INFO("LaunchKV server on ...")
. - Создается
Server<std::string, std::string> server("0.0.0.0", port)
, запускаетсяserver.run()
. - После
run()
управление никогда не возвращается (циклEventLoop
работает в текущем потоке, аaccept_loop
в фоновой нити), поэтому вызовpool.shutdown()
– формальность “после завершения работы сервера”.
Запуск:
./kv_server [порт]
- Если не указан порт, берётся значение
config::SERVER_PORT
(по умолчанию 5555). - Логи будут писаться в файл
kv_server.log
и выводиться в консоль.
Сервер ожидает соединения по TCP, принимает текстовые команды, заканчивающиеся символом \n
. Каждая команда — отдельная строка. Поддерживаются три базовых команды:
-
GET
- Клиент отправляет:
GET my_key\n
- Если ключ существует, сервер отвечает:
value\n
- Если ключ отсутствует, сервер отвечает:
NOT_FOUND\n
- Клиент отправляет:
-
SET
- Клиент отправляет:
SET my_key some_value\n
- Сервер сохраняет пару (my_key → some_value) и отвечает:
STORED\n
- Клиент отправляет:
-
DEL
- Клиент отправляет:
DEL my_key\n
- Если ключ был удалён, сервер отвечает:
DELETED\n
- Если ключ отсутствует, сервер отвечает:
NOT_FOUND\n
- Клиент отправляет:
Во всех остальных случаях (неизвестная команда) сервер отвечает: ERROR\n
.
# Подключаемся к серверу на localhost:5555
telnet 127.0.0.1 5555
# Внутри telnet вводим:
SET foo bar
# Получим: STORED
GET foo
# Получим: bar
DEL foo
# Получим: DELETED
GET foo
# Получим: NOT_FOUND
ncat 127.0.0.1 5555
Или с помощью netcat
(nc
):
# Подключаемся к серверу на localhost:5555
ncat 127.0.0.1 5555
# Внутри ncat вводим:
SET foo bar
# Получим: STORED
GET foo
# Получим: bar
DEL foo
# Получим: DELETED
GET foo
# Получим: NOT_FOUND
MemoryPool
реализован вallocator.cpp
/allocator.hpp
.- При создании объекта задается размер блока
blockSize
и число блоков в одном большом кускеblocksCount
. - Алгоритм:
- При запросе
allocate()
– еслиfreeList_
пуст, вызываемallocateBlock()
. allocateBlock()
делаетstd::malloc(blockSize * blocksCount)
и разбивает память на узлыFreeNode
, каждый из которых содержит указательnext
на следующий свободный. Все новые узлы прицепляются кfreeList_
.allocate()
извлекает первый узел изfreeList_
и возвращает его адрес.deallocate(ptr)
– помещает данный узел обратно вfreeList_
.
- При запросе
- Защита потоков обеспечивается
std::mutex mtx_
. - Преимущество: быстрая аллокация/делокация одноразмерных блоков без перехождения на
malloc
/free
каждый раз.
ThreadPool
вthread_pool.cpp
/thread_pool.hpp
.- Конструктор создаёт N потоков (N = аргумент или
std::thread::hardware_concurrency()
). - Задачи передаются через
submit(std::function<void()>)
. - Внутри:
std::queue<std::function<void()>> tasks_
, защищеннаяstd::mutex queueMutex_
.std::condition_variable condition_
, на которую “спят” воркеры.std::atomic<size_t> activeTasks_
, чтобы отслеживать количество выполняемых задач (можно добавить ожидание завершения).
- Каждый воркер в
workerThread()
:- Ждёт
condition_.wait(...)
, пока неstop_
или вtasks_
не появится задача. - Если
stop_
иtasks_
пустой — выходим. - Иначе забираем задачу, выходим из мьютекса, выполняем задачу (в
try/catch
). - Уменьшаем
activeTasks_
.
- Ждёт
- При вызове
shutdown()
устанавливаемstop_ = true
,condition_.notify_all()
, и ждемjoin()
всех потоков.
- Асинхронный ввод-вывод реализован на базе C++20 корутин.
EventLoop
(синглтон) запускается в основном потоке (послеaccept_loop
) и вызываетepoll_wait
(Linux) илиselect
(Windows) в бесконечном цикле, а затем пробуждает соответствующие корутины черезhandle.resume()
.- В
ReadAwaitable::await_suspend(h)
/WriteAwaitable::await_suspend(h)
корутина регистрируется вEventLoop
, сохраняяcoroutine_handle
. Когда дескриптор готов,await_resume()
либо читает (::read
) либо пишет (::write
) данные. - Благодаря неблокирующему режиму FD (fcntl/
ioctlsocket
) иEPOLLET
, корутины будут возобновляться только при реальном приходе данных.
- Класс
kv::ShardedHashMap<Key, Value>
– контейнер, объединяющий N независимых сегментовHashTable<Key,Value> + mutex
. - При
put/get/erase(key)
вычисляется:hval = Hash{}(key)
idx = hval % num_shards
- Берётся
std::lock_guard
наmutexes_[idx]
, вызывается соответствующий метод уtables_[idx]
.
- В ядре каждый сегмент — простая хеш-таблица (в
hash_table.hpp
), основанная на методе цепочек. - Шардирование позволяет распараллелить доступ к map: потоки, работающие с разными ключами, вероятнее работают с разными сегментами, что снижает конкуренцию.
config.hpp
:namespace kv::config { // Порт TCP‐сервера (можно изменить на любой свободный) inline constexpr std::uint16_t SERVER_PORT = 4000; // Размерность пула потоков по умолчанию. inline constexpr std::size_t THREAD_POOL_SIZE = 4; // Количество сегментов (shards) в sharded hash map. inline constexpr std::size_t HASH_MAP_SHARDS = 16; // Максимальная длина ключа (в байтах), если вы лимитируете строковые ключи. inline constexpr std::size_t MAX_KEY_SIZE = 128; // Максимальный размер значения (value) в байтах. inline constexpr std::size_t MAX_VALUE_SIZE = 1024 * 10; // 10 KB // Лимит одновременных соединений (можно использовать для балансировки). inline constexpr std::size_t MAX_CONNECTIONS = 1024; // Логический флаг: включать ли расширенную (debug) трассировку. inline constexpr bool ENABLE_DEBUG_LOG = true; }
- Файл
kv/logger.hpp
содержит перечисление уровней:enum class Level { TRACE, DEBUG, INFO, WARN, ERROR, FATAL }; struct LoggerConfig { Level level; bool to_console; bool to_file; std::string filename; };
- Инициализация:
kv::log::LoggerConfig cfg; cfg.level = kv::log::Level::DEBUG; cfg.to_console = true; cfg.to_file = true; cfg.filename = "kv_server.log"; kv::log::Logger::instance().init(cfg);
- Использование:
LOG_INFO("Some message");
LOG_DEBUG("Debug details: x=", x);
- При уровне
FATAL
приложение немедленно завершится (std::exit(1)
).
Этот проект распространяется под лицензией MIT.