The Thread System Project is a comprehensive, production-ready C++20 multithreading framework designed to democratize concurrent programming. By providing intuitive abstractions and robust implementations, it empowers developers of all skill levels to build high-performance, thread-safe applications without the typical complexity and pitfalls of manual thread management.
This project addresses the fundamental challenge faced by developers worldwide: making concurrent programming accessible, safe, and efficient. Traditional threading approaches often lead to complex code, hard-to-debug race conditions, and performance bottlenecks. Our mission is to provide a comprehensive solution that:
- Eliminates threading complexity through intuitive, high-level abstractions
- Ensures thread safety by design, preventing common concurrency bugs
- Maximizes performance through optimized algorithms and modern C++ features
- Promotes code reusability across different platforms and use cases
- Accelerates development by providing ready-to-use threading components
- Zero-overhead abstractions: Modern C++ design ensures minimal runtime cost
- Optimized data structures: Adaptive algorithms and cache-friendly designs
- Adaptive scheduling: Type-based job processing for optimal resource utilization
- Scalable architecture: Linear performance scaling with hardware thread count
- Thread-safe by design: All components guarantee safe concurrent access
- Comprehensive error handling: Robust error reporting and recovery mechanisms
- Memory safety: RAII principles and smart pointers prevent leaks and corruption
- Extensive testing: 95%+ CI/CD success rate across multiple platforms and compilers
- Intuitive API design: Clean, self-documenting interfaces reduce learning curve
- Rich documentation: Comprehensive Doxygen documentation with examples
- Flexible configuration: Adaptive queues with automatic optimization
- Debugging support: Built-in logging and monitoring capabilities
- Universal support: Works on Windows, Linux, and macOS
- Compiler flexibility: Compatible with GCC, Clang, and MSVC
- C++ standard adaptation: Graceful fallback from C++20 to older standards
- Architecture independence: Optimized for both x86 and ARM processors
- Type-based scheduling: Sophisticated job type specialization for real-time systems
- Asynchronous logging: High-performance, non-blocking logging system
- Resource monitoring: Built-in performance metrics and health checks
- Modular design: Use individual components or the complete framework
- High-frequency trading systems: Microsecond-level latency requirements
- Game engines: Real-time rendering and physics simulation
- Web servers: Concurrent request processing with type handling
- Scientific computing: Parallel algorithm execution and data processing
- Media processing: Video encoding, image processing, and audio streaming
- IoT systems: Sensor data collection and real-time response systems
Benchmarked on Apple M1 (8-core) @ 3.2GHz, 16GB, macOS Sonoma, Apple Clang 17.0.0
Architecture Update: The latest simplified architecture (2025-07-09) removed ~2,800 lines of duplicate code while maintaining all performance capabilities. Adaptive queues now provide automatic optimization for all workload scenarios.
- Peak Throughput: Up to 13.0M jobs/second (1 worker, empty jobs - theoretical)
- Real-world Throughput:
- Standard thread pool: 1.16M jobs/s (10 workers, proven in production)
- Typed thread pool: 1.24M jobs/s (6 workers, 3 types)
- Adaptive queues: Automatic optimization for all scenarios
- Job scheduling latency:
- Standard pool: ~77 nanoseconds (reliable baseline)
- Adaptive queues: 96-580ns with automatic strategy selection
- Queue operations: Adaptive strategy provides up to 7.7x faster operations when needed
- High contention: Adaptive mode provides up to 3.46x improvement when beneficial
- Priority scheduling: Type-based routing with high accuracy under all conditions
- Memory efficiency: <1MB baseline, reduced codebase by ~2,800 lines
- Scalability: Adaptive architecture maintains performance under any contention level
- Wake interval access: 5% performance impact with mutex protection
- Cancellation token: 3% overhead for proper double-check pattern
- Job queue operations: 4% performance improvement after removing redundant atomic counter
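The double-check pattern mentioned above can be sketched as follows. This is an illustrative standalone class, not thread_system's actual implementation: the cancellation flag is read once without the lock (cheap fast path) and re-checked after acquiring the lock, so no job can be dequeued after cancellation.

```cpp
#include <atomic>
#include <mutex>
#include <optional>
#include <queue>

// Hypothetical sketch of double-checked cancellation for a job queue.
class checked_queue {
public:
    void push(int v) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(v);
    }
    void cancel() { cancelled_.store(true); }
    std::optional<int> try_pop() {
        if (cancelled_.load()) return std::nullopt;   // first check, no lock
        std::lock_guard<std::mutex> lock(mutex_);
        if (cancelled_.load() || queue_.empty())       // second check, locked
            return std::nullopt;
        int v = queue_.front();
        queue_.pop();
        return v;
    }
private:
    std::mutex mutex_;
    std::queue<int> queue_;
    std::atomic<bool> cancelled_{false};
};
```

The unlocked first check is what costs the ~3% overhead cited above; it trades a small constant cost for avoiding lock acquisition on already-cancelled queues.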
Measured Performance (actual workloads):
Configuration | Throughput | Time/1M jobs | Workers | Notes |
---|---|---|---|---|
Basic Pool | 1.16M/s | 862 ms | 10 | Real-world baseline performance |
Adaptive Pool | Dynamic | Optimized | Variable | Automatic optimization based on load |
Type Pool | 1.24M/s | 806 ms | 6 | 6.9% faster with fewer workers |
Adaptive Queues | Dynamic | Optimized | Auto | Automatic optimization |
Peak (empty) | 13.0M/s | - | 1 | Theoretical maximum |
Adaptive Queue Performance (Automatic Optimization):
Contention Level | Strategy Selected | Latency | vs Mutex-only | Benefit |
---|---|---|---|---|
Low (1-2 threads) | Mutex | 96 ns | Baseline | Optimal for low load |
Medium (4 threads) | Adaptive | 142 ns | +8.2% faster | Balanced performance |
High (8+ threads) | Lock-free | 320 ns | +37% faster | Scales under contention |
Variable Load | Auto-switching | Dynamic | Optimized | Automatic |
Real Workload Performance (8-worker configuration):
Job Complexity | Throughput | Use Case | Scaling Efficiency |
---|---|---|---|
Empty job | 8.2M/s | Framework overhead measurement | 95% |
1 μs work | 1.5M/s | Very light computations | 94% |
10 μs work | 540K/s | Typical small tasks | 92% |
100 μs work | 70K/s | Medium computations | 90% |
1 ms work | 7.6K/s | Heavy computations | 88% |
10 ms work | 760/s | Very heavy computations | 85% |
Worker Thread Scaling Analysis:
Workers | Speedup | Efficiency | Performance Rating | Recommended Use |
---|---|---|---|---|
1 | 1.0x | 100% | Excellent | Single-threaded workloads |
2 | 2.0x | 99% | Excellent | Dual-core systems |
4 | 3.9x | 97.5% | Excellent | Quad-core optimal |
8 | 7.7x | 96% | Very Good | Standard multi-core |
16 | 15.0x | 94% | Very Good | High-end workstations |
32 | 28.3x | 88% | Good | Server environments |
Library Performance Comparison (Real-world measurements):
Library | Throughput | Performance | Verdict | Key Features |
---|---|---|---|---|
Thread System (Typed) | 1.24M/s | 107% | Excellent | Priority scheduling, adaptive queues, C++20 |
Intel TBB | ~1.24M/s | 107% | Excellent | Industry standard, work stealing |
Thread System (Standard) | 1.16M/s | 100% | Baseline | Adaptive queues, proven performance |
Boost.Thread Pool | ~1.09M/s | 94% | Good | Header-only, portable |
OpenMP | ~1.06M/s | 92% | Good | Compiler directives, easy to use |
Microsoft PPL | ~1.02M/s | 88% | Good | Windows-specific |
std::async | ~267K/s | 23% | | Standard library, basic functionality |
Logger Performance Comparison (High-contention scenario):
Logger Type | Single Thread | 4 Threads | 8 Threads | 16 Threads | Best Use Case |
---|---|---|---|---|---|
Thread System Logger | 4.41M/s | 1.07M/s | 0.41M/s | 0.39M/s | All scenarios (adaptive) |
Standard Mode | 4.41M/s | 0.86M/s | 0.23M/s | 0.18M/s | Low concurrency |
Adaptive Benefit | 0% | +24% | +78% | +117% | Auto-optimization |
Logger vs Industry Standards (spdlog comparison included):
System | Single-thread | 4 Threads | 8 Threads | Latency | vs Console |
---|---|---|---|---|---|
Console | 583K/s | - | - | 1,716 ns | Baseline |
TS Logger | 4.34M/s | 1.07M/s | 412K/s | 148 ns | 7.4x |
spdlog | 515K/s | 210K/s | 52K/s | 2,333 ns | 0.88x |
spdlog async | 5.35M/s | 785K/s | 240K/s | - | 9.2x |
Key Insights:
- Single-thread: spdlog async wins (5.35M/s), but TS Logger is close behind (4.34M/s)
- Multi-thread: TS Logger with adaptive queues shows consistent performance
- Latency: TS Logger wins at 148 ns (15.7x lower than spdlog)
- Scalability: Adaptive mode provides automatic optimization
Type-based Thread Pool Performance Comparison:
Mutex-based Implementation:
Complexity | vs Basic Pool | Type Accuracy | Performance | Best For |
---|---|---|---|---|
Single Type | -3% | 100% | 525K/s | Specialized workloads |
3 Types | -9% | 99.6% | 495K/s | Standard prioritization |
Real Workload | +6.9% | 100% | 1.24M/s | Actual measurement |
With Adaptive Queues:
Scenario | Performance | vs Standard | Type Accuracy | Notes |
---|---|---|---|---|
Low contention | 1.24M/s | Same | 100% | Mutex strategy selected |
High contention | Dynamic | Up to +71% | 99%+ | Lock-free mode engaged |
Mixed workload | Optimized | Automatic | 99.5% | Strategy switches as needed |
Real measurement | 1.24M/s | +6.9% | 100% | Production workload |
Memory Usage & Creation Performance:
Workers | Creation Time | Memory Usage | Efficiency | Resource Rating |
---|---|---|---|---|
1 | 162 ns | 1.2 MB | 100% | Ultra-light |
4 | 347 ns | 1.8 MB | 98% | Very light |
8 | 578 ns | 2.6 MB | 96% | Light |
16 | 1.0 μs | 4.2 MB | 94% | Moderate |
32 | 2.0 μs | 7.4 MB | 88% | Heavy |
For comprehensive performance analysis and optimization techniques, see the Performance Guide.
- C++20 features: `std::jthread`, `std::format`, concepts, and ranges
- Template metaprogramming: Type-safe, compile-time optimizations
- Memory management: Smart pointers and RAII for automatic resource cleanup
- Exception safety: Strong exception safety guarantees throughout
- Adaptive algorithms: MPMC queues, automatic strategy selection, and atomic operations
- SIMD optimization: Vectorized operations where applicable
- Command Pattern: Job encapsulation for flexible task execution
- Observer Pattern: Event-driven logging and monitoring
- Factory Pattern: Configurable thread pool creation
- Singleton Pattern: Global logger access with thread safety
- Template Method Pattern: Customizable thread behavior
- Strategy Pattern: Configurable backoff strategies and scheduling policies
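The Command pattern listed above, job encapsulation, can be sketched minimally: each job hides an arbitrary callable behind a uniform `execute()` interface, so a pool can store and run heterogeneous work units through one type. The class names here are illustrative stand-ins for the library's `job`/`callback_job`:

```cpp
#include <functional>
#include <memory>
#include <queue>

// Command pattern sketch: the abstract command...
struct job {
    virtual ~job() = default;
    virtual void execute() = 0;
};

// ...and a concrete command wrapping any void() callable.
struct callback_job_sketch : job {
    explicit callback_job_sketch(std::function<void()> fn) : fn_(std::move(fn)) {}
    void execute() override { fn_(); }
    std::function<void()> fn_;
};

// The "invoker" only sees the job interface, never the concrete work.
int run_all(std::queue<std::unique_ptr<job>>& q) {
    int executed = 0;
    while (!q.empty()) {
        q.front()->execute();
        q.pop();
        ++executed;
    }
    return executed;
}
```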
```
thread_system/
├── sources/                          # Core source code
│   ├── thread_base/                  # Base threading functionality
│   │   ├── core/                     # Core classes (thread_base, thread_conditions)
│   │   ├── jobs/                     # Job system (job, callback_job, job_queue)
│   │   ├── lockfree/                 # Lock-free queue implementations (for adaptive mode)
│   │   │   ├── memory/               # Hazard pointers, node pools, memory reclamation
│   │   │   └── queues/               # MPMC queue, adaptive queue, strategy selection
│   │   └── sync/                     # Synchronization primitives, atomic operations
│   ├── thread_pool/                  # Thread pool implementations
│   │   ├── core/                     # Pool classes
│   │   │   └── thread_pool.h/cpp     # Standard pool with adaptive queue support
│   │   ├── workers/                  # Worker implementations
│   │   │   └── thread_worker.h/cpp   # Standard worker
│   │   └── async/                    # Future-based tasks
│   ├── typed_thread_pool/            # Type-based thread pool with adaptive queues
│   │   ├── core/                     # Job types and interfaces (job_types.h, typed_job_interface.h)
│   │   ├── jobs/                     # Typed job implementations
│   │   │   ├── typed_job.h/tpp       # Base typed job template
│   │   │   └── callback_typed_job.h/tpp # Lambda-based typed jobs
│   │   ├── pool/                     # Thread pool implementations
│   │   │   └── typed_thread_pool.h/tpp # Adaptive pool with automatic optimization
│   │   └── scheduling/               # Job queues and workers
│   │       ├── adaptive_typed_job_queue.h/tpp/cpp # Adaptive priority queue
│   │       ├── typed_lockfree_job_queue.h/tpp/cpp # Lock-free queue (for adaptive mode)
│   │       └── typed_thread_worker.h/tpp # Adaptive worker
│   ├── logger/                       # Asynchronous logging system
│   │   ├── core/                     # Logger implementation
│   │   │   ├── logger_implementation.h/cpp # Standard mutex-based logger
│   │   │   └── log_collector.h/cpp   # Adaptive log collector
│   │   ├── types/                    # Log types and formatters
│   │   ├── writers/                  # Console, file, callback writers
│   │   └── jobs/                     # Log job processing
│   ├── utilities/                    # Utility functions
│   │   ├── core/                     # formatter, span
│   │   ├── conversion/               # String conversions
│   │   ├── time/                     # Date/time utilities
│   │   └── io/                       # File handling
│   └── monitoring/                   # Real-time monitoring system
│       ├── core/                     # Metrics collector, monitoring types
│       └── storage/                  # Ring buffer for time-series data
├── samples/                          # Example applications
│   ├── thread_pool_sample/           # Basic thread pool usage
│   ├── typed_thread_pool_sample/     # Mutex-based priority scheduling
│   ├── typed_thread_pool_sample_2/   # Advanced typed pool usage and custom job types
│   ├── logger_sample/                # Logging examples
│   ├── monitoring_sample/            # Real-time metrics collection
│   ├── mpmc_queue_sample/            # Adaptive MPMC queue usage
│   ├── hazard_pointer_sample/        # Memory reclamation demo
│   ├── node_pool_sample/             # Memory pool operations
│   └── adaptive_queue_sample/        # Adaptive queue selection
├── unittest/                         # Unit tests (Google Test)
│   ├── thread_base_test/             # Base thread functionality tests
│   ├── thread_pool_test/             # Thread pool tests
│   ├── typed_thread_pool_test/       # Typed pool tests
│   ├── logger_test/                  # Logger tests
│   └── utilities_test/               # Utility function tests
├── benchmarks/                       # Performance benchmarks
│   ├── thread_base_benchmarks/       # Core threading benchmarks
│   ├── thread_pool_benchmarks/       # Pool performance tests
│   │   ├── thread_pool_benchmark.cpp # Core pool metrics
│   │   ├── adaptive_comparison_benchmark.cpp # Standard vs adaptive
│   │   ├── memory_benchmark.cpp      # Memory usage patterns
│   │   ├── real_world_benchmark.cpp  # Realistic workloads
│   │   ├── stress_test_benchmark.cpp # Extreme load testing
│   │   ├── scalability_benchmark.cpp # Multi-core scaling
│   │   └── contention_benchmark.cpp  # Contention scenarios
│   ├── typed_thread_pool_benchmarks/ # Typed pool benchmarks
│   │   ├── typed_scheduling_benchmark.cpp # Priority scheduling
│   │   ├── typed_lockfree_benchmark.cpp # Lock-free vs mutex
│   │   └── queue_comparison_benchmark.cpp # Queue performance
│   ├── logger_benchmarks/            # Logging performance
│   └── monitoring_benchmarks/        # Monitoring overhead
├── docs/                             # Documentation
├── cmake/                            # CMake modules
├── CMakeLists.txt                    # Main build configuration
├── vcpkg.json                        # Dependencies
└── build.sh/.bat                     # Build scripts
```
- `thread_base.h/cpp`: Abstract base class for all worker threads
- `job.h/cpp`: Abstract interface for work units
- `job_queue.h/cpp`: Thread-safe FIFO queue implementation
- `callback_job.h/cpp`: Lambda-based job implementation
- `thread_pool.h/cpp`: Main thread pool class managing workers
- `thread_worker.h/cpp`: Worker thread that processes jobs
- `future_extensions.h`: Future-based task extensions for async results
- `typed_thread_pool.h/tpp`: Template-based priority thread pool
- `typed_job_queue.h/tpp`: Priority queue for typed jobs
- `typed_thread_worker.h/tpp`: Workers with type responsibility lists
- `job_types.h`: Default priority enumeration (RealTime, Batch, Background)
- `typed_thread_pool.h/tpp`: Adaptive priority thread pool implementation
- `adaptive_typed_job_queue.h/tpp/cpp`: Per-type adaptive MPMC queues
- `typed_thread_worker.h/tpp`: Adaptive worker with priority handling
- `typed_queue_statistics_t`: Performance monitoring and metrics collection
- `logger.h`: Public API with free functions
- `log_collector.h/cpp`: Central log message router
- `console_writer.h/cpp`: Colored console output
- `file_writer.h/cpp`: Rotating file logger
```
utilities (no dependencies)
 │
 ├──> thread_base
 │      │
 │      ├──> thread_pool
 │      │
 │      └──> typed_thread_pool
 │             │
 │             └── typed_thread_pool (adaptive)
 │
 ├──> logger
 │
 └──> monitoring
```
```
build/
├── bin/                              # Executable files
│   ├── thread_pool_sample
│   ├── typed_thread_pool_sample      # Mutex-based
│   ├── typed_thread_pool_sample_2    # Advanced usage
│   ├── logger_sample
│   ├── monitoring_sample
│   ├── adaptive_benchmark            # Performance comparison
│   ├── queue_comparison_benchmark    # Queue benchmarks
│   └── ...
├── lib/                              # Static libraries
│   ├── libthread_base.a
│   ├── libthread_pool.a
│   ├── libtyped_thread_pool.a        # Includes both mutex & lock-free
│   ├── liblogger.a
│   ├── libutilities.a
│   └── libmonitoring.a
└── include/                          # Public headers (for installation)
```
- `thread_base` class: The foundational abstract class for all thread operations
  - Supports both `std::jthread` (C++20) and `std::thread` through conditional compilation
  - Provides lifecycle management (start/stop) and customizable hooks
- `job` class: Abstract base class for units of work
- `callback_job` class: Concrete job implementation using `std::function`
- `job_queue` class: Thread-safe queue for job management
- Adaptive components:
  - `adaptive_job_queue`: Dual-mode queue supporting both mutex and lock-free strategies
  - `lockfree_job_queue`: Lock-free MPMC queue (utilized by adaptive mode)
  - `hazard_pointer`: Safe memory reclamation for lock-free data structures
  - `node_pool`: Memory pool for queue operations
- Namespace-level logging functions: `write_information()`, `write_error()`, `write_debug()`, etc.
- `log_types` enum: Bitwise-enabled log levels (Exception, Error, Information, Debug, Sequence, Parameter)
- Multiple output targets:
  - `console_writer`: Asynchronous console output with color support
  - `file_writer`: Rotating file output with backup support
  - `callback_writer`: Custom callback for log processing
- `log_collector` class: Central hub for log message routing and processing
- Configuration functions: `set_title()`, `console_target()`, `file_target()`, etc.
- `thread_pool` class: Thread pool with adaptive queue support
  - Dynamic worker addition/removal
  - Dual-mode job queue architecture (mutex and lock-free)
  - Proven reliability for general workloads
- `thread_worker` class: Worker thread implementation supporting adaptive queues
- Adaptive job queues: Dual-mode queue implementation with automatic optimization
- Dynamic strategy selection between mutex and lock-free modes
- MPMC queue with hazard pointers when needed
- Intelligent backoff for contention handling
- Batch processing support for improved throughput
- Per-worker statistics tracking
- Optional batch processing mode
- Configurable backoff strategies
- `task<T>` template: Future-based task wrapper for async results
- Builder pattern support: Fluent API for pool configuration
- Drop-in compatibility: Same API for easy migration
- `metrics_collector` class: Real-time performance metrics collection engine
- Cross-platform system metrics: Memory usage, CPU utilization, active threads
- Thread pool monitoring: Job completion rates, queue depths, worker utilization
- Lock-free storage: Memory-efficient ring buffer for time-series data
- Easy integration: Global singleton collector with simple API
- Key features:
- Real-time data collection (100ms-1s intervals)
- Thread-safe metric registration and updates
- Configurable buffer sizes and collection intervals
- Zero-overhead when disabled
The framework provides two distinct typed thread pool implementations optimized for different scenarios:
- `typed_thread_pool` class: Priority thread pool with adaptive queue support
- Best for: Type-based job scheduling with automatic optimization
- Performance: Adaptive queues provide optimal performance for varying workloads
- Features:
- Per-type adaptive queues: Each job type can use optimized queue strategy
- Priority-based routing: RealTime > Batch > Background ordering
- Adaptive queue support: Uses dual-mode queues for optimal performance
- Dynamic queue creation: Automatic type queue lifecycle management
- Advanced statistics: Per-type metrics and performance monitoring
- `job_types` enum: Default priority levels (RealTime, Batch, Background)
- Type-aware components:
  - `typed_job_t<T>`: Jobs with associated type/priority
  - `adaptive_typed_job_queue_t<T>`: Adaptive priority queue implementation
  - `typed_lockfree_job_queue_t<T>`: Lock-free priority queue (utilized by adaptive mode)
  - `typed_thread_worker_t<T>`: Workers with adaptive queue handling
- `callback_typed_job<T>`: Lambda-based typed job implementation
- Custom type support: Use your own enums or types for job prioritization
- Use Adaptive Implementation: Automatic optimization for all scenarios
- Benefits: Simplified deployment with automatic performance tuning
- Adaptive implementation strategy: Automatic optimization based on runtime conditions
- Type-aware job distribution: Workers can handle multiple type levels with configurable responsibility lists
- Priority-based scheduling: Adaptive implementation provides optimal priority ordering (RealTime > Batch > Background)
- Dynamic type adaptation: Runtime adjustment of worker responsibilities based on workload patterns
- FIFO guarantee: Strict first-in-first-out ordering within same type levels
- Per-type queue optimization: Adaptive implementation uses optimized queues for each job type
- Advanced contention handling: Automatic strategy selection with hazard pointers for safe memory reclamation
- Scalable architecture: Dynamic scaling optimization based on contention patterns
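The priority ordering described above (RealTime > Batch > Background) can be illustrated with a plain standard-library priority queue. This is a sketch of the ordering semantics only, not the library's implementation; the typed pool additionally guarantees FIFO order within a type, which a bare `std::priority_queue` does not:

```cpp
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Illustrative priority levels matching the library's defaults.
enum class job_types { RealTime = 0, Batch = 1, Background = 2 };

using typed_item = std::pair<job_types, std::string>;

struct by_priority {
    bool operator()(const typed_item& a, const typed_item& b) const {
        // Smaller enum value = higher priority; priority_queue is a
        // max-heap, so invert the comparison.
        return static_cast<int>(a.first) > static_cast<int>(b.first);
    }
};

// Pop everything in scheduling order.
std::vector<std::string> drain(std::priority_queue<typed_item,
                               std::vector<typed_item>, by_priority> q) {
    std::vector<std::string> order;
    while (!q.empty()) {
        order.push_back(q.top().second);
        q.pop();
    }
    return order;
}
```

Even though the Background job is enqueued first, a RealTime job enqueued later is always dequeued ahead of it.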
- Hierarchical design: Clean `thread_base` foundation with specialized derived classes
- C++20 compatibility: Full support for `std::jthread` with graceful fallback to `std::thread`
- Cancellation support: Cooperative task cancellation using `std::stop_token`
- Custom thread naming: Enhanced debugging with meaningful thread identification
- Wake interval support: Periodic task execution without busy waiting
- Result type: Modern error handling with monadic operations
- Real-time metrics: Job processing rates, queue depths, and worker utilization
- Performance profiling: Built-in timing and bottleneck identification
- Health checks: Automatic detection of thread failures and recovery
- Comprehensive logging: Multi-level, multi-target logging with asynchronous processing
- Template-based flexibility: Custom job types and job implementations
- Runtime configuration: JSON-based configuration for deployment flexibility
- Compile-time optimization: Conditional feature compilation for minimal overhead
- Builder pattern: Fluent API for easy thread pool construction
- Exception safety: Strong exception safety guarantees throughout the framework
- Resource leak prevention: Automatic cleanup using RAII principles
- Deadlock prevention: Careful lock ordering and timeout mechanisms
- Memory corruption protection: Smart pointer usage and bounds checking
#include "thread_pool/core/thread_pool.h"
#include "thread_base/jobs/callback_job.h"
#include "logger/core/logger.h"
using namespace thread_pool_module;
using namespace thread_module;
int main() {
// 1. Start the logger
log_module::start();
// 2. Create a high-performance adaptive thread pool
auto pool = std::make_shared<thread_pool>();
// 3. Add workers with adaptive queue optimization
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < std::thread::hardware_concurrency(); ++i) {
auto worker = std::make_unique<thread_worker>();
worker->set_batch_processing(true, 32); // Process up to 32 jobs at once
workers.push_back(std::move(worker));
}
pool->enqueue_batch(std::move(workers));
// 4. Start processing
pool->start();
// 5. Submit jobs - adaptive pool handles varying contention efficiently
std::atomic<int> counter{0};
const int total_jobs = 100000;
for (int i = 0; i < total_jobs; ++i) {
pool->enqueue(std::make_unique<callback_job>(
[&counter, i]() -> result_void {
counter.fetch_add(1);
if (i % 10000 == 0) {
log_module::write_information("Processed {} jobs", i);
}
return {};
}
));
}
// 6. Wait for completion with progress monitoring
auto start_time = std::chrono::high_resolution_clock::now();
while (counter.load() < total_jobs) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
auto end_time = std::chrono::high_resolution_clock::now();
// 7. Get comprehensive performance statistics
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
auto throughput = static_cast<double>(total_jobs) / duration.count() * 1000.0;
log_module::write_information("Performance Results:");
log_module::write_information("- Total jobs: {}", total_jobs);
log_module::write_information("- Execution time: {} ms", duration.count());
log_module::write_information("- Throughput: {:.2f} jobs/second", throughput);
auto workers_list = pool->get_workers();
for (size_t i = 0; i < workers_list.size(); ++i) {
auto stats = static_cast<thread_worker*>(workers_list[i].get())->get_statistics();
log_module::write_information("Worker {}: {} jobs, avg time: {} ns, {} batch ops",
i, stats.jobs_processed,
stats.avg_processing_time_ns,
stats.batch_operations);
}
// 8. Clean shutdown
pool->stop();
log_module::stop();
return 0;
}
Performance Tip: The adaptive queues automatically optimize for your workload. They provide both mutex-based reliability and lock-free performance when beneficial.
#include "thread_pool/core/thread_pool.h"
#include "thread_base/jobs/callback_job.h"
using namespace thread_pool_module;
using namespace thread_module;
// Create a simple thread pool for low-contention workloads
auto pool = std::make_shared<thread_pool>("StandardPool");
// Add workers
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 4; ++i) { // Few workers for simple tasks
workers.push_back(std::make_unique<thread_worker>());
}
pool->enqueue_batch(std::move(workers));
pool->start();
// Submit jobs
for (int i = 0; i < 100; ++i) {
pool->enqueue(std::make_unique<callback_job>(
[i]() -> result_void {
// Process data
std::this_thread::sleep_for(std::chrono::milliseconds(10));
log_module::write_debug("Processed item {}", i);
return {};
}
));
}
#include "thread_pool/core/thread_pool.h"
#include "thread_base/jobs/callback_job.h"
using namespace thread_pool_module;
using namespace thread_module;
// Create adaptive pool for high-contention scenarios
auto pool = std::make_shared<thread_pool>("AdaptivePool");
// Configure workers for maximum throughput
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < std::thread::hardware_concurrency(); ++i) {
auto worker = std::make_unique<thread_worker>();
// Enable batch processing for better throughput
worker->set_batch_processing(true, 64);
workers.push_back(std::move(worker));
}
pool->enqueue_batch(std::move(workers));
pool->start();
// Submit jobs from multiple threads (high contention)
// Adaptive queues will automatically switch to lock-free mode when beneficial
std::vector<std::thread> producers;
for (int t = 0; t < 8; ++t) {
producers.emplace_back([&pool, t]() {
for (int i = 0; i < 10000; ++i) {
pool->enqueue(std::make_unique<callback_job>(
[t, i]() -> result_void {
// Fast job execution
std::atomic<int> sum{0};
for (int j = 0; j < 100; ++j) {
sum.fetch_add(j);
}
return {};
}
));
}
});
}
// Wait for all producers
for (auto& t : producers) {
t.join();
}
// Get detailed statistics
auto workers_vec = pool->get_workers();
for (size_t i = 0; i < workers_vec.size(); ++i) {
auto stats = static_cast<thread_worker*>(
workers_vec[i].get())->get_statistics();
log_module::write_information(
"Worker {}: {} jobs, {} ΞΌs avg, {} batch ops",
i, stats.jobs_processed,
stats.avg_processing_time_ns / 1000,
stats.batch_operations
);
}
#include "logger/core/logger.h"
// Configure logger
log_module::set_title("MyApplication");
log_module::console_target(log_module::log_types::Information |
log_module::log_types::Error);
log_module::file_target(log_module::log_types::All);
// Start logger
log_module::start();
// Use various log levels
log_module::write_information("Application started");
log_module::write_debug("Debug mode enabled");
log_module::write_error("Example error: {}", error_code);
log_module::write_sequence("Processing step {}/{}", current, total);
// Custom callback for critical errors
log_module::callback_target(log_module::log_types::Exception);
log_module::message_callback(
[](const log_module::log_types& type,
const std::string& datetime,
const std::string& message) {
if (type == log_module::log_types::Exception) {
send_alert_email(message);
}
}
);
#include "logger/core/logger.h"
using namespace log_module;
// Configure the logger for high-performance scenarios
log_module::set_title("HighPerformanceApp");
log_module::console_target(log_types::Information);
log_module::file_target(log_types::Information);
// Start the logger
log_module::start();
// High-frequency logging from multiple threads
// The logger automatically adapts to contention patterns
std::vector<std::thread> log_threads;
for (int t = 0; t < 16; ++t) {
log_threads.emplace_back([t]() {
for (int i = 0; i < 10000; ++i) {
log_module::write_information(
"Thread {} - High-frequency log message {}", t, i);
}
});
}
// Wait for all threads
for (auto& t : log_threads) {
t.join();
}
// Adaptive logger provides excellent performance:
// - Automatic optimization based on contention
// - Efficient multi-threaded operation
// - Up to 238% better throughput at 16 threads
// - Ideal for high-concurrency logging scenarios
log_module::stop();
#include "monitoring/core/metrics_collector.h"
#include "thread_pool/core/thread_pool.h"
using namespace monitoring_module;
using namespace thread_pool_module;
// Start monitoring system
monitoring_config config;
config.collection_interval = std::chrono::milliseconds(100); // 100ms intervals
metrics::start_global_monitoring(config);
// Create and monitor a thread pool
auto pool = std::make_shared<thread_pool>();
pool->start();
// Register thread pool metrics
auto collector = global_metrics_collector::instance().get_collector();
auto pool_metrics = std::make_shared<thread_pool_metrics>();
collector->register_thread_pool_metrics(pool_metrics);
// Submit jobs and monitor in real-time
for (int i = 0; i < 1000; ++i) {
pool->enqueue(std::make_unique<callback_job>([&pool_metrics]() -> result_void {
// Update metrics
pool_metrics->jobs_completed.fetch_add(1);
return {};
}));
}
// Get real-time metrics
auto snapshot = metrics::get_current_metrics();
std::cout << "Jobs completed: " << snapshot.thread_pool.jobs_completed.load() << "\n";
std::cout << "Memory usage: " << snapshot.system.memory_usage_bytes.load() << " bytes\n";
// Stop monitoring
metrics::stop_global_monitoring();
Our samples demonstrate real-world usage patterns and best practices:
- Adaptive Thread Pool: Thread pool with adaptive queue optimization
- Typed Thread Pool: Priority scheduling with adaptive per-type queues
- Adaptive MPMC Queue: Core adaptive data structure fundamentals
- Hazard Pointers: Safe memory reclamation for lock-free programming
- Node Pool: Memory pool operations for adaptive queues
- Basic Thread Pool: Simple job processing with adaptive queue optimization
- Typed Thread Pool: Priority-based task scheduling with adaptive queues
- Custom Job Types: Extending the framework with domain-specific types
- Real-time Monitoring: Live performance metrics and system monitoring
- Asynchronous Logging: High-performance, multi-target logging system
- CMake 3.16 or later
- C++20 capable compiler (GCC 9+, Clang 10+, MSVC 2019+)
- vcpkg package manager (automatically installed by dependency scripts)
# Clone the repository
git clone https://github.com/kcenon/thread_system.git
cd thread_system
# Install dependencies via vcpkg
./dependency.sh # Linux/macOS
./dependency.bat # Windows
# Build the project
./build.sh # Linux/macOS
./build.bat # Windows
# Run samples
./build/bin/thread_pool_sample
./build/bin/typed_thread_pool_sample
./build/bin/logger_sample
# Run tests (Linux/Windows only, disabled on macOS)
cd build && ctest --verbose
# Using as a subdirectory
add_subdirectory(thread_system)
target_link_libraries(your_target PRIVATE
thread_base
thread_pool
typed_thread_pool
logger
)
# Using with FetchContent
include(FetchContent)
FetchContent_Declare(
thread_system
GIT_REPOSITORY https://github.com/kcenon/thread_system.git
GIT_TAG main
)
FetchContent_MakeAvailable(thread_system)
- API Reference: Complete API documentation
- Architecture Guide: System design and internals
- Performance Guide: Optimization tips and benchmarks
- Examples: Comprehensive code examples
- FAQ: Frequently asked questions
// Thread Pool API
namespace thread_pool_module {
// Thread pool with adaptive queue support
class thread_pool {
auto start() -> std::optional<std::string>;
auto stop(bool immediately = false) -> void;
auto enqueue(std::unique_ptr<job>&& job) -> std::optional<std::string>;
auto enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> std::optional<std::string>;
auto get_workers() const -> const std::vector<std::shared_ptr<thread_worker>>&;
auto get_queue_statistics() const -> queue_statistics;
};
// Thread worker with adaptive capabilities
class thread_worker : public thread_base {
struct worker_statistics {
uint64_t jobs_processed;
uint64_t total_processing_time_ns;
uint64_t batch_operations;
uint64_t avg_processing_time_ns;
};
auto set_batch_processing(bool enabled, size_t batch_size = 32) -> void;
auto get_statistics() const -> worker_statistics;
};
}
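The `worker_statistics` fields above can be computed by timing batches of work. The following standalone sketch (illustrative only, not the library's implementation) shows one way a worker might aggregate them when batch processing is enabled:

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Standalone sketch mirroring the worker_statistics fields shown above
// (illustrative only, not the library's implementation).
struct worker_statistics {
    uint64_t jobs_processed = 0;
    uint64_t total_processing_time_ns = 0;
    uint64_t batch_operations = 0;
    uint64_t avg_processing_time_ns = 0;
};

// Process jobs in batches of batch_size, timing each batch and
// aggregating statistics the way a worker might.
worker_statistics run_batch(const std::vector<std::function<void()>>& jobs,
                            std::size_t batch_size) {
    worker_statistics stats;
    for (std::size_t i = 0; i < jobs.size(); i += batch_size) {
        const auto begin = std::chrono::steady_clock::now();
        const std::size_t end = std::min(i + batch_size, jobs.size());
        for (std::size_t j = i; j < end; ++j) jobs[j]();  // run one batch
        const auto elapsed = std::chrono::steady_clock::now() - begin;
        stats.total_processing_time_ns += static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count());
        stats.jobs_processed += end - i;
        ++stats.batch_operations;
    }
    if (stats.jobs_processed != 0)
        stats.avg_processing_time_ns =
            stats.total_processing_time_ns / stats.jobs_processed;
    return stats;
}
```

Batching amortizes queue synchronization across several jobs, which is why the real worker exposes `set_batch_processing` as a tuning knob.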
// Typed Thread Pool API (Mutex-based)
namespace typed_thread_pool_module {
template<typename T>
class typed_thread_pool_t {
auto start() -> result_void;
auto stop(bool clear_queue = false) -> result_void;
auto enqueue(std::unique_ptr<typed_job_t<T>>&& job) -> result_void;
auto enqueue_batch(std::vector<std::unique_ptr<typed_job_t<T>>>&& jobs) -> result_void;
};
// Adaptive Typed Queue API (supports both mutex and lock-free modes)
template<typename T>
class adaptive_typed_job_queue_t {
auto enqueue(std::unique_ptr<typed_job_t<T>>&& job) -> result_void;
auto dequeue() -> result<std::unique_ptr<job>>;
auto dequeue(const T& type) -> result<std::unique_ptr<typed_job_t<T>>>;
auto size() const -> std::size_t;
auto empty() const -> bool;
auto get_typed_statistics() const -> typed_queue_statistics_t<T>;
};
// Lock-free Queue (utilized by adaptive mode when beneficial)
template<typename T>
class typed_lockfree_job_queue_t {
auto enqueue(std::unique_ptr<typed_job_t<T>>&& job) -> result_void;
auto dequeue() -> result<std::unique_ptr<job>>;
auto dequeue(const T& type) -> result<std::unique_ptr<typed_job_t<T>>>;
auto size() const -> std::size_t;
auto empty() const -> bool;
auto get_typed_statistics() const -> typed_queue_statistics_t<T>;
};
}
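To make the typed-queue semantics concrete, here is a deliberately simplified standalone sketch (not the library's mutex-based or lock-free implementation): one FIFO per type, where the untyped `dequeue()` drains the highest-priority non-empty type first and `dequeue(type)` targets a single type, mirroring the two overloads above:

```cpp
#include <deque>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical job types for illustration; the real framework lets you
// define your own type parameter T.
enum class job_type { realtime, batch, background };

// Simplified type-aware queue: std::map keeps types ordered, so the
// lowest enum value (highest priority) is scanned first.
class typed_queue_sketch {
    std::map<job_type, std::deque<std::string>> queues_;
public:
    void enqueue(job_type t, std::string job) {
        queues_[t].push_back(std::move(job));
    }
    // Untyped dequeue: take from the highest-priority non-empty type.
    std::optional<std::string> dequeue() {
        for (auto& [type, q] : queues_) {
            if (!q.empty()) {
                auto job = std::move(q.front());
                q.pop_front();
                return job;
            }
        }
        return std::nullopt;
    }
    // Per-type dequeue, as in the typed APIs above.
    std::optional<std::string> dequeue(job_type t) {
        auto it = queues_.find(t);
        if (it == queues_.end() || it->second.empty()) return std::nullopt;
        auto job = std::move(it->second.front());
        it->second.pop_front();
        return job;
    }
}; 
```

This is why the typed pool can protect latency-sensitive work: realtime jobs are always drained before batch and background jobs, regardless of arrival order.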
// Monitoring API
namespace monitoring_module {
class metrics_collector {
auto start() -> thread_module::result_void;
auto stop() -> void;
auto get_current_snapshot() const -> metrics_snapshot;
auto register_system_metrics(std::shared_ptr<system_metrics> metrics) -> void;
auto register_thread_pool_metrics(std::shared_ptr<thread_pool_metrics> metrics) -> void;
};
// Convenience functions
namespace metrics {
auto start_global_monitoring(monitoring_config config = {}) -> thread_module::result_void;
auto stop_global_monitoring() -> void;
auto get_current_metrics() -> metrics_snapshot;
auto is_monitoring_active() -> bool;
}
}
// Logger API
namespace log_module {
auto start() -> std::optional<std::string>;
auto stop() -> void;
template<typename... Args>
auto write_information(const char* format, const Args&... args) -> void;
template<typename... Args>
auto write_error(const char* format, const Args&... args) -> void;
template<typename... Args>
auto write_debug(const char* format, const Args&... args) -> void;
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create your feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
- Follow modern C++ best practices
- Use RAII and smart pointers
- Maintain consistent formatting (clang-format configuration provided)
- Write comprehensive unit tests for new features
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Email: kcenon@naver.com
- Queue Strategy
  - Standard pool uses an adaptive queue by default: `thread_module::create_job_queue(adaptive_job_queue::queue_strategy::ADAPTIVE)`.
  - Typed pool uses an adaptive typed queue by default: `typed_thread_pool_module::create_typed_job_queue(...)`.
  - Strategies: `AUTO_DETECT`, `FORCE_LEGACY` (mutex-based), `FORCE_LOCKFREE`, `ADAPTIVE`.
  - Thresholds: the evaluation interval and contention/latency thresholds are compile-time constants today. Expose them as configuration if your deployment requires runtime tuning.
- Worker Sizing
  - CPU-bound: start with workers ~= hardware threads; benchmark and adjust.
  - IO-bound: allow more workers than cores to cover blocking time.
  - Mixed workloads: prefer the typed thread pool to protect latency for critical types.
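These sizing rules can be sketched in portable C++20. Here `blocked_fraction` is a hypothetical parameter (not part of the library) that you would estimate for your workload: the share of wall time a job spends blocked on IO.

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>

// CPU-bound starting point: one worker per hardware thread.
// hardware_concurrency() may return 0 when unknown, so fall back.
std::size_t cpu_bound_workers() {
    const std::size_t hw = std::thread::hardware_concurrency();
    return hw != 0 ? hw : 4;  // 4 is an arbitrary fallback
}

// IO-bound starting point using the classic sizing rule:
// threads ~= cores / (1 - blocked_fraction), blocked_fraction in [0, 1).
std::size_t io_bound_workers(double blocked_fraction) {
    const double cores = static_cast<double>(cpu_bound_workers());
    return static_cast<std::size_t>(
        std::max(1.0, cores / (1.0 - blocked_fraction)));
}
```

For example, jobs that spend half their time blocked (`blocked_fraction = 0.5`) suggest roughly twice as many workers as cores. Treat these as starting points only; benchmark and adjust.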
- Periodic Wake Interval
  - Use `thread_base::set_wake_interval(std::optional<std::chrono::milliseconds>)` for periodic housekeeping.
  - Access is guarded by a dedicated mutex for correctness (~5% overhead under contention); use it only when you need periodic work.
- Monitoring & Logging
  - Enable `monitoring_module::metrics_collector` to record pool/worker/system metrics at a configured interval.
  - Use logger writers (console/file/callback) suited to your production environment; keep file rotation enabled for long-running services.
- Result Pattern
  - Operations return `result_void` or `result<T>` (see `thread_base/sync/error_handling.h`).
  - On success: `result_void{}` or `result<T>{value}`; on failure: `error{error_code::..., "message"}`.
  - Example: `auto r = pool->enqueue(std::move(job)); if (r) { /* success */ } else { /* r.get_error().to_string() */ }`
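The library's own types live in `thread_base/sync/error_handling.h` and differ in detail (error codes, `result_void`, `to_string()`). As a standalone illustration of the pattern, not the library's definitions, a minimal `result<T>` analogue looks like this:

```cpp
#include <string>
#include <utility>
#include <variant>

// Minimal error type for the sketch; the real one carries an error_code.
struct error {
    std::string message;
};

// A result holds either a value or an error, and converts to bool so
// callers can write `if (r) { ... } else { r.get_error() ... }`.
template <typename T>
class result {
    std::variant<T, error> value_;
public:
    result(T v) : value_(std::move(v)) {}
    result(error e) : value_(std::move(e)) {}
    explicit operator bool() const { return std::holds_alternative<T>(value_); }
    const T& value() const { return std::get<T>(value_); }
    const error& get_error() const { return std::get<error>(value_); }
};

// Hypothetical fallible operation for demonstration.
result<int> parse_positive(int x) {
    if (x > 0) return x;
    return error{"value must be positive"};
}
```

The design choice here mirrors the library's intent: errors travel through return values rather than exceptions, so every failure path is visible at the call site.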
- Cancellation
  - Jobs can accept a `cancellation_token`; long-running jobs should periodically check `token.is_cancelled()` and return `error{error_code::operation_canceled, ...}` when appropriate.
  - Cancellation is cooperative; use it for graceful shutdown and bounded latency guarantees.
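A standalone sketch of the cooperative model, using a plain `std::atomic<bool>` as a stand-in for the library's `cancellation_token`: the job checks the flag between work units and returns promptly once cancellation is requested.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Run units of work until the shared flag is set. A library job would
// instead check token.is_cancelled() and return
// error{error_code::operation_canceled, ...}.
int run_until_cancelled(std::atomic<bool>& cancelled) {
    int units_done = 0;
    while (!cancelled.load(std::memory_order_relaxed)) {  // periodic check
        ++units_done;                                      // one unit of work
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return units_done;
}
```

Because the check happens only between units, cancellation latency is bounded by the duration of one unit; keep units short when you need tight shutdown guarantees.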
- C++20 Features (autodetected in CMake)
  - `std::format` vs `{fmt}`: Windows builds default to `{fmt}` for maximum compatibility; non-Windows builds use `std::format` only if compile tests pass. Override on Windows with `-DWINDOWS_ALLOW_STD_FORMAT=ON` if you accept the risks.
  - `std::jthread`: enabled via `USE_STD_JTHREAD` when supported; otherwise the code falls back to `std::thread`.
  - `std::span`, `std::chrono::current_zone`, `std::filesystem`: enabled when compile tests succeed; otherwise fallbacks are used where available.
- Dependencies (via vcpkg): `{fmt}`, `gtest`, `benchmark`, `spdlog`, `libiconv`.
- Quick Build
  - Unix/macOS: `./dependency.sh && ./build.sh`
  - Windows: `dependency.bat && build.bat`
- Unit Tests: GoogleTest-based suites under `unittest/` (thread base/pool, typed pool, logger, utilities, platform behavior).
- Run: configure with CMake and execute `ctest -C <Debug|Release> --output-on-failure` from the build directory. The root scripts trigger tests on CI matrices.
- Coverage: see `TEST_COVERAGE_IMPROVEMENT_REPORT.md` and the CI configuration for thresholds and guidance.
- Adaptive Typed Queue Migration
  - The current `adaptive_typed_job_queue` switches implementations without migrating already-enqueued items between strategies. Plan graceful switches: drain or stop before switching, or select `FORCE_*` strategies when migration safety is required.
- Lock-free Queue Backoff Limits
  - `lockfree_job_queue` uses retry and yield/backoff. Under extreme contention, enqueue/dequeue may fail after `MAX_TOTAL_RETRIES` with a typed error. Handle this by backing off and retrying, or by routing to an alternate path.
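One way to handle such transient failures is a small backoff-and-retry wrapper. This is a generic sketch, not library code; the predicate `op` stands in for any fallible enqueue/dequeue attempt.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Retry 'op' with exponentially growing sleeps between attempts.
// Returns true as soon as op succeeds, false after max_attempts failures,
// at which point the caller should route work to a fallback path.
bool retry_with_backoff(const std::function<bool()>& op, int max_attempts) {
    auto delay = std::chrono::microseconds(50);  // initial backoff
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        if (op()) return true;               // operation succeeded
        std::this_thread::sleep_for(delay);  // back off before retrying
        delay *= 2;                          // exponential growth
    }
    return false;
}
```

Exponential backoff spreads retries out over time, so contending threads stop hammering the queue at the same moment, which is usually enough to let a transient contention spike clear.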
- Size Semantics in Typed Queues
  - Legacy typed queues report total size, not per-type; lock-free typed queues expose per-type sizes. Use the typed APIs for accurate type-aware visibility.
- Performance vs. Safety Trade-offs
  - Mutex protections (e.g., around the wake interval) and cancellation checks carry documented overhead (+3-5%) in exchange for correctness. Disable them only with a full understanding of the risks.
  - When moving from single-queue to typed scheduling, start by routing only your most latency-sensitive types; keep background work on the standard pool to minimize overhead.
  - For bursty traffic with variable contention, keep the default adaptive queue; for highly predictable loads, consider forcing `FORCE_LOCKFREE`/`FORCE_LEGACY` and tuning for your steady state.
This project is licensed under the BSD 3-Clause License - see the LICENSE file for details.
- Thanks to all contributors who have helped improve this project
- Special thanks to the C++ community for continuous feedback and support
- Inspired by modern concurrent programming patterns and best practices
Made with ❤️