
A modern C++ threading framework offering robust thread management via the thread_base class, a real-time multithreaded logging system, a thread pool with type-based job scheduling, compatibility with both std::jthread (C++20) and std::thread, and comprehensive error handling mechanisms.


kcenon/thread_system


Thread System Project

Project Overview

The Thread System Project is a comprehensive, production-ready C++20 multithreading framework designed to democratize concurrent programming. By providing intuitive abstractions and robust implementations, it empowers developers of all skill levels to build high-performance, thread-safe applications without the typical complexity and pitfalls of manual thread management.

Project Purpose & Mission

This project addresses the fundamental challenge faced by developers worldwide: making concurrent programming accessible, safe, and efficient. Traditional threading approaches often lead to complex code, hard-to-debug race conditions, and performance bottlenecks. Our mission is to provide a comprehensive solution that:

  • Eliminates threading complexity through intuitive, high-level abstractions
  • Ensures thread safety by design, preventing common concurrency bugs
  • Maximizes performance through optimized algorithms and modern C++ features
  • Promotes code reusability across different platforms and use cases
  • Accelerates development by providing ready-to-use threading components

Core Advantages & Benefits

🚀 Performance Excellence

  • Zero-overhead abstractions: Modern C++ design ensures minimal runtime cost
  • Optimized data structures: Adaptive algorithms and cache-friendly designs
  • Adaptive scheduling: Type-based job processing for optimal resource utilization
  • Scalable architecture: Linear performance scaling with hardware thread count

πŸ›‘οΈ Production-Grade Reliability

  • Thread-safe by design: All components guarantee safe concurrent access
  • Comprehensive error handling: Robust error reporting and recovery mechanisms
  • Memory safety: RAII principles and smart pointers prevent leaks and corruption
  • Extensive testing: 95%+ CI/CD success rate across multiple platforms and compilers

🔧 Developer Productivity

  • Intuitive API design: Clean, self-documenting interfaces reduce learning curve
  • Rich documentation: Comprehensive Doxygen documentation with examples
  • Flexible configuration: Adaptive queues with automatic optimization
  • Debugging support: Built-in logging and monitoring capabilities

🌐 Cross-Platform Compatibility

  • Universal support: Works on Windows, Linux, and macOS
  • Compiler flexibility: Compatible with GCC, Clang, and MSVC
  • C++ standard adaptation: Graceful fallback from C++20 to older standards
  • Architecture independence: Optimized for both x86 and ARM processors
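The C++20-to-older-standard fallback mentioned above is typically done with feature-test macros. The sketch below is illustrative only (the `worker_thread` alias and `run_once` helper are assumptions, not the library's actual configuration):

```cpp
#include <atomic>
#include <thread>

// Pick the thread type at compile time via the standard feature-test macro.
#if defined(__cpp_lib_jthread)
using worker_thread = std::jthread;   // auto-joins on destruction (C++20)
#else
using worker_thread = std::thread;    // requires an explicit join()
#endif

int run_once() {
    std::atomic<int> done{0};
    {
        worker_thread t([&] { done.store(1); });
#if !defined(__cpp_lib_jthread)
        t.join();                     // std::thread must be joined manually
#endif
    }   // std::jthread joins here automatically
    return done.load();
}
```

Either path yields identical behavior; only the cleanup responsibility differs.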

📈 Enterprise-Ready Features

  • Type-based scheduling: Sophisticated job type specialization for real-time systems
  • Asynchronous logging: High-performance, non-blocking logging system
  • Resource monitoring: Built-in performance metrics and health checks
  • Modular design: Use individual components or the complete framework

Real-World Impact & Use Cases

🎯 Ideal Applications

  • High-frequency trading systems: Microsecond-level latency requirements
  • Game engines: Real-time rendering and physics simulation
  • Web servers: Concurrent request processing with type-based prioritization
  • Scientific computing: Parallel algorithm execution and data processing
  • Media processing: Video encoding, image processing, and audio streaming
  • IoT systems: Sensor data collection and real-time response systems

📊 Performance Benchmarks

Benchmarked on Apple M1 (8-core) @ 3.2GHz, 16GB, macOS Sonoma, Apple Clang 17.0.0

🚀 Architecture Update: The latest simplified architecture (2025-07-09) removed ~2,800 lines of duplicate code while maintaining all performance capabilities. Adaptive queues now provide automatic optimization for all workload scenarios.

Core Performance Metrics (Latest Benchmarks - 2025-07-09)

  • Peak Throughput: Up to 13.0M jobs/second (1 worker, empty jobs - theoretical)
  • Real-world Throughput:
    • Standard thread pool: 1.16M jobs/s (10 workers, proven in production)
    • Typed thread pool: 1.24M jobs/s (6 workers, 3 types)
    • Adaptive queues: Automatic optimization for all scenarios
  • Job scheduling latency:
    • Standard pool: ~77 nanoseconds (reliable baseline)
    • Adaptive queues: 96-580ns with automatic strategy selection
  • Queue operations: Adaptive strategy delivers up to 7.7x faster operations under contention
  • High contention: Adaptive mode provides up to 3.46x improvement when beneficial
  • Priority scheduling: Type-based routing with high accuracy under all conditions
  • Memory efficiency: <1MB baseline, reduced codebase by ~2,800 lines
  • Scalability: Adaptive architecture maintains performance under any contention level

Impact of Thread Safety Fixes

  • Wake interval access: 5% performance impact with mutex protection
  • Cancellation token: 3% overhead for proper double-check pattern
  • Job queue operations: 4% performance improvement after removing redundant atomic counter

Detailed Performance Data

Real-World Performance (measured with actual workloads):

| Configuration | Throughput | Time/1M jobs | Workers | Notes |
|---|---|---|---|---|
| Basic Pool | 1.16M/s | 862 ms | 10 | 🏆 Real-world baseline performance |
| Adaptive Pool | Dynamic | Optimized | Variable | 🚀 Automatic optimization based on load |
| Type Pool | 1.24M/s | 806 ms | 6 | ✅ 6.9% faster with fewer workers |
| Adaptive Queues | Dynamic | Optimized | Auto | 🚀 Automatic optimization |
| Peak (empty) | 13.0M/s | - | 1 | 📊 Theoretical maximum |

Adaptive Queue Performance (Automatic Optimization):

| Contention Level | Strategy Selected | Latency | vs Mutex-only | Benefit |
|---|---|---|---|---|
| Low (1-2 threads) | Mutex | 96 ns | Baseline | Optimal for low load |
| Medium (4 threads) | Adaptive | 142 ns | +8.2% faster | Balanced performance |
| High (8+ threads) | Lock-free | 320 ns | +37% faster | Scales under contention |
| Variable Load | Auto-switching | Dynamic | Optimized | Automatic |

Real Workload Performance (8-worker configuration):

| Job Complexity | Throughput | Use Case | Scaling Efficiency |
|---|---|---|---|
| Empty job | 8.2M/s | 📏 Framework overhead measurement | 95% |
| 1 μs work | 1.5M/s | ⚡ Very light computations | 94% |
| 10 μs work | 540K/s | 🔧 Typical small tasks | 92% |
| 100 μs work | 70K/s | 💻 Medium computations | 90% |
| 1 ms work | 7.6K/s | 🔥 Heavy computations | 88% |
| 10 ms work | 760/s | 🏗️ Very heavy computations | 85% |

Worker Thread Scaling Analysis:

| Workers | Speedup | Efficiency | Performance Rating | Recommended Use |
|---|---|---|---|---|
| 1 | 1.0x | 💯 100% | 🥇 Excellent | Single-threaded workloads |
| 2 | 2.0x | 💚 99% | 🥇 Excellent | Dual-core systems |
| 4 | 3.9x | 💚 97.5% | 🥇 Excellent | Quad-core optimal |
| 8 | 7.7x | 💚 96% | 🥈 Very Good | Standard multi-core |
| 16 | 15.0x | 💙 94% | 🥈 Very Good | High-end workstations |
| 32 | 28.3x | 💛 88% | 🥉 Good | Server environments |

Library Performance Comparison (Real-world measurements):

| Library | Throughput | Performance | Verdict | Key Features |
|---|---|---|---|---|
| 🥇 Thread System (Typed) | 1.24M/s | 🟢 107% | ✅ Excellent | Priority scheduling, adaptive queues, C++20 |
| 🥈 Intel TBB | ~1.24M/s | 🟢 107% | ✅ Excellent | Industry standard, work stealing |
| 🏆 Thread System (Standard) | 1.16M/s | 🟢 100% | ✅ Baseline | Adaptive queues, proven performance |
| 📦 Boost.Thread Pool | ~1.09M/s | 🟡 94% | ✅ Good | Header-only, portable |
| 📦 OpenMP | ~1.06M/s | 🟡 92% | ✅ Good | Compiler directives, easy to use |
| 📦 Microsoft PPL | ~1.02M/s | 🟡 88% | ✅ Good | Windows-specific |
| 📚 std::async | ~267K/s | 🔴 23% | ⚠️ Limited | Standard library, basic functionality |

Logger Performance Comparison (High-contention scenario):

| Logger Type | Single Thread | 4 Threads | 8 Threads | 16 Threads | Best Use Case |
|---|---|---|---|---|---|
| 🏆 Thread System Logger | 4.41M/s | 1.07M/s | 0.41M/s | 0.39M/s | All scenarios (adaptive) |
| 🥈 Standard Mode | 4.41M/s | 0.86M/s | 0.23M/s | 0.18M/s | Low concurrency |
| 📊 Adaptive Benefit | 0% | +24% | +78% | +117% | Auto-optimization |

Logger vs Industry Standards (spdlog comparison included):

| System | Single-thread | 4 Threads | 8 Threads | Latency | vs Console |
|---|---|---|---|---|---|
| 🐌 Console | 583K/s | - | - | 1,716 ns | Baseline |
| 🏆 TS Logger | 4.34M/s | 1.07M/s | 412K/s | 148 ns | 🚀 7.4x |
| 📦 spdlog | 515K/s | 210K/s | 52K/s | 2,333 ns | 🔴 0.88x |
| ⚡ spdlog async | 5.35M/s | 785K/s | 240K/s | - | 🚀 9.2x |

Key Insights:

  • πŸƒ Single-thread: spdlog async wins (5.35M/s) but TS Logger close behind (4.34M/s)
  • πŸ‹οΈ Multi-thread: TS Logger with adaptive queues shows consistent performance
  • ⏱️ Latency: TS Logger wins with 148ns (15.7x lower than spdlog)
  • πŸ“ˆ Scalability: Adaptive mode provides automatic optimization

Type-based Thread Pool Performance Comparison:

Mutex-based Implementation:

| Complexity | vs Basic Pool | Type Accuracy | Performance | Best For |
|---|---|---|---|---|
| Single Type | 💚 -3% | 💯 100% | 525K/s | Specialized workloads |
| 3 Types | 💛 -9% | 💯 99.6% | 495K/s | Standard prioritization |
| Real Workload | 💚 +6.9% | 💯 100% | 1.24M/s | Actual measurement |

With Adaptive Queues:

| Scenario | Performance | vs Standard | Type Accuracy | Notes |
|---|---|---|---|---|
| Low contention | 1.24M/s | Same | 💯 100% | Mutex strategy selected |
| High contention | Dynamic | Up to +71% | 💯 99%+ | Lock-free mode engaged |
| Mixed workload | Optimized | Automatic | 💯 99.5% | Strategy switches as needed |
| Real measurement | 1.24M/s | +6.9% | 💯 100% | Production workload |

Memory Usage & Creation Performance:

| Workers | Creation Time | Memory Usage | Efficiency | Resource Rating |
|---|---|---|---|---|
| 1 | 🟢 162 ns | 💚 1.2 MB | 💯 100% | ⚡ Ultra-light |
| 4 | 🟢 347 ns | 💚 1.8 MB | 💚 98% | ⚡ Very light |
| 8 | 🟡 578 ns | 💛 2.6 MB | 💚 96% | 🔋 Light |
| 16 | 🟡 1.0 μs | 🟡 4.2 MB | 💛 94% | 🔋 Moderate |
| 32 | 🟠 2.0 μs | 🟠 7.4 MB | 🟡 88% | 📊 Heavy |

For comprehensive performance analysis and optimization techniques, see the Performance Guide.

Technology Stack & Architecture

πŸ—οΈ Modern C++ Foundation

  • C++20 features: std::jthread, std::format, concepts, and ranges
  • Template metaprogramming: Type-safe, compile-time optimizations
  • Memory management: Smart pointers and RAII for automatic resource cleanup
  • Exception safety: Strong exception safety guarantees throughout
  • Adaptive algorithms: MPMC queues, automatic strategy selection, and atomic operations
  • SIMD optimization: Vectorized operations where applicable

🔄 Design Patterns Implementation

  • Command Pattern: Job encapsulation for flexible task execution
  • Observer Pattern: Event-driven logging and monitoring
  • Factory Pattern: Configurable thread pool creation
  • Singleton Pattern: Global logger access with thread safety
  • Template Method Pattern: Customizable thread behavior
  • Strategy Pattern: Configurable backoff strategies and scheduling policies
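The Command pattern listed above, on which job encapsulation rests, can be sketched in a few lines of standard C++ (the `job`/`callback_job` names mirror the framework's, but this is not its actual API):

```cpp
#include <functional>
#include <memory>
#include <queue>

// Abstract command: the worker only knows how to execute() it.
struct job {
    virtual ~job() = default;
    virtual void execute() = 0;
};

// Concrete command wrapping an arbitrary callable.
struct callback_job : job {
    explicit callback_job(std::function<void()> fn) : fn_(std::move(fn)) {}
    void execute() override { fn_(); }
private:
    std::function<void()> fn_;
};

// A worker drains the queue in FIFO order, unaware of what each job does.
int demo() {
    std::queue<std::unique_ptr<job>> q;
    int sum = 0;
    for (int i = 1; i <= 3; ++i)
        q.push(std::make_unique<callback_job>([&sum, i] { sum += i; }));
    while (!q.empty()) { q.front()->execute(); q.pop(); }
    return sum;  // 1 + 2 + 3
}
```

Decoupling "what to run" from "who runs it" is what lets the same worker loop serve every job type.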

Project Structure

πŸ“ Directory Organization

thread_system/
├── 📁 sources/                     # Core source code
│   ├── 📁 thread_base/             # Base threading functionality
│   │   ├── core/                   # Core classes (thread_base, thread_conditions)
│   │   ├── jobs/                   # Job system (job, callback_job, job_queue)
│   │   ├── lockfree/               # Lock-free queue implementations (for adaptive mode)
│   │   │   ├── memory/             # Hazard pointers, node pools, memory reclamation
│   │   │   └── queues/             # MPMC queue, adaptive queue, strategy selection
│   │   └── sync/                   # Synchronization primitives, atomic operations
│   ├── 📁 thread_pool/             # Thread pool implementations
│   │   ├── core/                   # Pool classes
│   │   │   └── thread_pool.h/cpp   # Standard pool with adaptive queue support
│   │   ├── workers/                # Worker implementations
│   │   │   └── thread_worker.h/cpp # Standard worker
│   │   └── async/                  # Future-based tasks
│   ├── 📁 typed_thread_pool/       # Type-based thread pool with adaptive queues
│   │   ├── core/                   # Job types and interfaces (job_types.h, typed_job_interface.h)
│   │   ├── jobs/                   # Typed job implementations
│   │   │   ├── typed_job.h/tpp     # Base typed job template
│   │   │   └── callback_typed_job.h/tpp # Lambda-based typed jobs
│   │   ├── pool/                   # Thread pool implementations
│   │   │   └── typed_thread_pool.h/tpp # Adaptive pool with automatic optimization
│   │   └── scheduling/             # Job queues and workers
│   │       ├── adaptive_typed_job_queue.h/tpp/cpp # Adaptive priority queue
│   │       ├── typed_lockfree_job_queue.h/tpp/cpp # Lock-free queue (for adaptive mode)
│   │       └── typed_thread_worker.h/tpp # Adaptive worker
│   ├── 📁 logger/                  # Asynchronous logging system
│   │   ├── core/                   # Logger implementation
│   │   │   ├── logger_implementation.h/cpp # Standard mutex-based logger
│   │   │   └── log_collector.h/cpp # Adaptive log collector
│   │   ├── types/                  # Log types and formatters
│   │   ├── writers/                # Console, file, callback writers
│   │   └── jobs/                   # Log job processing
│   ├── 📁 utilities/               # Utility functions
│   │   ├── core/                   # formatter, span
│   │   ├── conversion/             # String conversions
│   │   ├── time/                   # Date/time utilities
│   │   └── io/                     # File handling
│   └── 📁 monitoring/              # Real-time monitoring system
│       ├── core/                   # Metrics collector, monitoring types
│       └── storage/                # Ring buffer for time-series data
├── 📁 samples/                     # Example applications
│   ├── thread_pool_sample/         # Basic thread pool usage
│   ├── typed_thread_pool_sample/   # Mutex-based priority scheduling
│   ├── typed_thread_pool_sample_2/ # Advanced typed pool usage with custom job types
│   ├── logger_sample/              # Logging examples
│   ├── monitoring_sample/          # Real-time metrics collection
│   ├── mpmc_queue_sample/          # Adaptive MPMC queue usage
│   ├── hazard_pointer_sample/      # Memory reclamation demo
│   ├── node_pool_sample/           # Memory pool operations
│   └── adaptive_queue_sample/      # Adaptive queue selection
├── 📁 unittest/                    # Unit tests (Google Test)
│   ├── thread_base_test/           # Base thread functionality tests
│   ├── thread_pool_test/           # Thread pool tests
│   ├── typed_thread_pool_test/     # Typed pool tests
│   ├── logger_test/                # Logger tests
│   └── utilities_test/             # Utility function tests
├── 📁 benchmarks/                  # Performance benchmarks
│   ├── thread_base_benchmarks/     # Core threading benchmarks
│   ├── thread_pool_benchmarks/     # Pool performance tests
│   │   ├── thread_pool_benchmark.cpp      # Core pool metrics
│   │   ├── adaptive_comparison_benchmark.cpp # 🆕 Standard vs adaptive
│   │   ├── memory_benchmark.cpp           # Memory usage patterns
│   │   ├── real_world_benchmark.cpp       # Realistic workloads
│   │   ├── stress_test_benchmark.cpp      # Extreme load testing
│   │   ├── scalability_benchmark.cpp      # Multi-core scaling
│   │   └── contention_benchmark.cpp       # Contention scenarios
│   ├── typed_thread_pool_benchmarks/ # Typed pool benchmarks
│   │   ├── typed_scheduling_benchmark.cpp # Priority scheduling
│   │   ├── typed_lockfree_benchmark.cpp   # 🆕 Lock-free vs mutex
│   │   └── queue_comparison_benchmark.cpp # 🆕 Queue performance
│   ├── logger_benchmarks/          # Logging performance
│   └── monitoring_benchmarks/      # Monitoring overhead
├── 📁 docs/                        # Documentation
├── 📁 cmake/                       # CMake modules
├── 📄 CMakeLists.txt               # Main build configuration
├── 📄 vcpkg.json                   # Dependencies
└── 🔧 build.sh/.bat                # Build scripts

📖 Key Files and Their Purpose

Core Module Files

  • thread_base.h/cpp: Abstract base class for all worker threads
  • job.h/cpp: Abstract interface for work units
  • job_queue.h/cpp: Thread-safe FIFO queue implementation
  • callback_job.h/cpp: Lambda-based job implementation

Thread Pool Files

  • thread_pool.h/cpp: Main thread pool class managing workers
  • thread_worker.h/cpp: Worker thread that processes jobs
  • future_extensions.h: Future-based task extensions for async results

Typed Thread Pool Files (Mutex-based)

  • typed_thread_pool.h/tpp: Template-based priority thread pool
  • typed_job_queue.h/tpp: Priority queue for typed jobs
  • typed_thread_worker.h/tpp: Workers with type responsibility lists
  • job_types.h: Default priority enumeration (RealTime, Batch, Background)

Typed Thread Pool Files (Adaptive) 🆕

  • typed_thread_pool.h/tpp: Adaptive priority thread pool implementation
  • adaptive_typed_job_queue.h/tpp/cpp: Per-type adaptive MPMC queues
  • typed_thread_worker.h/tpp: Adaptive worker with priority handling
  • typed_queue_statistics_t: Performance monitoring and metrics collection

Logger Files

  • logger.h: Public API with free functions
  • log_collector.h/cpp: Central log message router
  • console_writer.h/cpp: Colored console output
  • file_writer.h/cpp: Rotating file logger

🔗 Module Dependencies

utilities (no dependencies)
    │
    ├──> thread_base
    │        │
    │        ├──> thread_pool
    │        │
    │        └──> typed_thread_pool
    │                   │
    │                   └──> typed_thread_pool (adaptive)
    │
    ├──> logger
    │
    └──> monitoring

πŸ› οΈ Build Output Structure

build/
├── bin/                    # Executable files
│   ├── thread_pool_sample
│   ├── typed_thread_pool_sample          # Mutex-based
│   ├── typed_thread_pool_sample_2        # Advanced usage
│   ├── logger_sample
│   ├── monitoring_sample
│   ├── adaptive_benchmark                # 🆕 Performance comparison
│   ├── queue_comparison_benchmark        # 🆕 Queue benchmarks
│   └── ...
├── lib/                    # Static libraries
│   ├── libthread_base.a
│   ├── libthread_pool.a
│   ├── libtyped_thread_pool.a  # Includes both mutex & lock-free
│   ├── liblogger.a
│   ├── libutilities.a
│   └── libmonitoring.a
└── include/                # Public headers (for installation)

Key Components

  • thread_base class: The foundational abstract class for all thread operations
    • Supports both std::jthread (C++20) and std::thread through conditional compilation
    • Provides lifecycle management (start/stop) and customizable hooks
  • job class: Abstract base class for units of work
  • callback_job class: Concrete job implementation using std::function
  • job_queue class: Thread-safe queue for job management
  • Adaptive components:
    • adaptive_job_queue: Dual-mode queue supporting both mutex and lock-free strategies
    • lockfree_job_queue: Lock-free MPMC queue (utilized by adaptive mode)
    • hazard_pointer: Safe memory reclamation for lock-free data structures
    • node_pool: Memory pool for queue operations
  • Namespace-level logging functions: write_information(), write_error(), write_debug(), etc.
  • log_types enum: Bitwise-enabled log levels (Exception, Error, Information, Debug, Sequence, Parameter)
  • Multiple output targets:
    • console_writer: Asynchronous console output with color support
    • file_writer: Rotating file output with backup support
    • callback_writer: Custom callback for log processing
  • log_collector class: Central hub for log message routing and processing
  • Configuration functions: set_title(), console_target(), file_target(), etc.
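The bitwise-enabled log_types filtering described above can be sketched as follows (the enumerator values are assumed for illustration; the library's actual definitions may differ):

```cpp
#include <cstdint>

// Hypothetical bitmask enum modeled on log_types.
enum class log_types : std::uint8_t {
    None        = 0,
    Exception   = 1 << 0,
    Error       = 1 << 1,
    Information = 1 << 2,
    Debug       = 1 << 3,
};

// Combine levels into a filter mask.
constexpr log_types operator|(log_types a, log_types b) {
    return static_cast<log_types>(static_cast<std::uint8_t>(a) |
                                  static_cast<std::uint8_t>(b));
}

// A writer emits a message only if its level is present in the mask.
constexpr bool enabled(log_types mask, log_types level) {
    return (static_cast<std::uint8_t>(mask) &
            static_cast<std::uint8_t>(level)) != 0;
}
```

This is how a single integer can drive calls like `console_target(Information | Error)`.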

Standard Thread Pool

  • thread_pool class: Thread pool with adaptive queue support
    • Dynamic worker addition/removal
    • Dual-mode job queue architecture (mutex and lock-free)
    • Proven reliability for general workloads
  • thread_worker class: Worker thread implementation supporting adaptive queues

Adaptive Queue Features

  • Adaptive job queues: Dual-mode queue implementation with automatic optimization
    • Dynamic strategy selection between mutex and lock-free modes
    • MPMC queue with hazard pointers when needed
    • Intelligent backoff for contention handling
    • Batch processing support for improved throughput
    • Per-worker statistics tracking
    • Optional batch processing mode
    • Configurable backoff strategies

Common Features

  • task<T> template: Future-based task wrapper for async results
  • Builder pattern support: Fluent API for pool configuration
  • Drop-in compatibility: Same API for easy migration
  • metrics_collector class: Real-time performance metrics collection engine
  • Cross-platform system metrics: Memory usage, CPU utilization, active threads
  • Thread pool monitoring: Job completion rates, queue depths, worker utilization
  • Lock-free storage: Memory-efficient ring buffer for time-series data
  • Easy integration: Global singleton collector with simple API
  • Key features:
    • Real-time data collection (100ms-1s intervals)
    • Thread-safe metric registration and updates
    • Configurable buffer sizes and collection intervals
    • Zero-overhead when disabled
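The ring-buffer storage idea behind the monitoring module can be sketched as a minimal fixed-capacity buffer (not the library's actual class, which is lock-free):

```cpp
#include <cstddef>
#include <vector>

// Fixed-capacity ring buffer for time-series samples: constant memory,
// newest samples overwrite the oldest once full.
class ring_buffer {
public:
    explicit ring_buffer(std::size_t capacity)
        : data_(capacity), capacity_(capacity) {}

    void push(double sample) {
        data_[head_] = sample;                 // overwrite oldest when full
        head_ = (head_ + 1) % capacity_;
        if (size_ < capacity_) ++size_;
    }

    std::size_t size() const { return size_; }

    // Oldest-to-newest snapshot of the retained samples.
    std::vector<double> snapshot() const {
        std::vector<double> out;
        std::size_t start = (head_ + capacity_ - size_) % capacity_;
        for (std::size_t i = 0; i < size_; ++i)
            out.push_back(data_[(start + i) % capacity_]);
        return out;
    }

private:
    std::vector<double> data_;
    std::size_t capacity_;
    std::size_t head_ = 0;
    std::size_t size_ = 0;
};
```

Bounded storage is what keeps long-running metric collection at a fixed memory cost regardless of uptime.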

The framework provides two distinct typed thread pool implementations optimized for different scenarios:

Typed Thread Pool Implementation

  • typed_thread_pool class: Priority thread pool with adaptive queue support
  • Best for: Type-based job scheduling with automatic optimization
  • Performance: Adaptive queues provide optimal performance for varying workloads
  • Features:
    • Per-type adaptive queues: Each job type can use optimized queue strategy
    • Priority-based routing: RealTime > Batch > Background ordering
    • Adaptive queue support: Uses dual-mode queues for optimal performance
    • Dynamic queue creation: Automatic type queue lifecycle management
    • Advanced statistics: Per-type metrics and performance monitoring

Common Components

  • job_types enum: Default priority levels (RealTime, Batch, Background)
  • Type-aware components:
    • typed_job_t<T>: Jobs with associated type/priority
    • adaptive_typed_job_queue_t<T>: Adaptive priority queue implementation
    • typed_lockfree_job_queue_t<T>: Lock-free priority queue (utilized by adaptive mode)
    • typed_thread_worker_t<T>: Workers with adaptive queue handling
  • callback_typed_job<T>: Lambda-based typed job implementation
  • Custom type support: Use your own enums or types for job prioritization
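The RealTime > Batch > Background ordering can be illustrated with a plain std::priority_queue (a sketch only; the framework's typed queues additionally preserve FIFO order within a type level, which std::priority_queue does not guarantee):

```cpp
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Default priority levels; lower value = higher priority.
enum class job_types { RealTime = 0, Batch = 1, Background = 2 };

struct typed_entry {
    job_types type;
    std::string name;
};

struct by_priority {
    // "Less important" compares greater, so RealTime surfaces first.
    bool operator()(const typed_entry& a, const typed_entry& b) const {
        return static_cast<int>(a.type) > static_cast<int>(b.type);
    }
};

std::vector<std::string> drain_in_priority_order(std::vector<typed_entry> jobs) {
    std::priority_queue<typed_entry, std::vector<typed_entry>, by_priority> pq;
    for (auto& j : jobs) pq.push(std::move(j));
    std::vector<std::string> order;
    while (!pq.empty()) { order.push_back(pq.top().name); pq.pop(); }
    return order;
}
```

Replacing `job_types` with your own enum is all the customization this scheme needs.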

Usage Guidelines

  • Use Adaptive Implementation: Automatic optimization for all scenarios
  • Benefits: Simplified deployment with automatic performance tuning

Advanced Features & Capabilities

πŸŽ›οΈ Intelligent Task Scheduling

  • Adaptive implementation strategy: Automatic optimization based on runtime conditions
  • Type-aware job distribution: Workers can handle multiple type levels with configurable responsibility lists
  • Priority-based scheduling: Adaptive implementation provides optimal priority ordering (RealTime > Batch > Background)
  • Dynamic type adaptation: Runtime adjustment of worker responsibilities based on workload patterns
  • FIFO guarantee: Strict first-in-first-out ordering within same type levels
  • Per-type queue optimization: Adaptive implementation uses optimized queues for each job type
  • Advanced contention handling: Automatic strategy selection with hazard pointers for safe memory reclamation
  • Scalable architecture: Dynamic scaling optimization based on contention patterns

🔬 Advanced Threading Features

  • Hierarchical design: Clean thread_base foundation with specialized derived classes
  • C++20 compatibility: Full support for std::jthread with graceful fallback to std::thread
  • Cancellation support: Cooperative task cancellation using std::stop_token
  • Custom thread naming: Enhanced debugging with meaningful thread identification
  • Wake interval support: Periodic task execution without busy waiting
  • Result type: Modern error handling with monadic operations

📊 Production Monitoring & Diagnostics

  • Real-time metrics: Job processing rates, queue depths, and worker utilization
  • Performance profiling: Built-in timing and bottleneck identification
  • Health checks: Automatic detection of thread failures and recovery
  • Comprehensive logging: Multi-level, multi-target logging with asynchronous processing

βš™οΈ Configuration & Customization

  • Template-based flexibility: Custom job types and job implementations
  • Runtime configuration: JSON-based configuration for deployment flexibility
  • Compile-time optimization: Conditional feature compilation for minimal overhead
  • Builder pattern: Fluent API for easy thread pool construction

🔒 Safety & Reliability

  • Exception safety: Strong exception safety guarantees throughout the framework
  • Resource leak prevention: Automatic cleanup using RAII principles
  • Deadlock prevention: Careful lock ordering and timeout mechanisms
  • Memory corruption protection: Smart pointer usage and bounds checking
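Deadlock prevention through careful lock ordering can be delegated to std::scoped_lock, as in this generic C++17 sketch (not framework code):

```cpp
#include <mutex>

struct account {
    std::mutex m;
    int balance = 0;
};

// std::scoped_lock acquires both mutexes with a deadlock-avoidance
// algorithm, so two threads running transfer(a, b) and transfer(b, a)
// concurrently cannot deadlock.
void transfer(account& from, account& to, int amount) {
    std::scoped_lock lock(from.m, to.m);
    from.balance -= amount;
    to.balance   += amount;
}
```

Taking the locks in one call, rather than one after the other, removes the classic lock-ordering deadlock by construction.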

Quick Start & Usage Examples

🚀 Getting Started in 5 Minutes

Adaptive High-Performance Example

#include "thread_pool/core/thread_pool.h"
#include "thread_base/jobs/callback_job.h"
#include "logger/core/logger.h"

using namespace thread_pool_module;
using namespace thread_module;

int main() {
    // 1. Start the logger
    log_module::start();
    
    // 2. Create a high-performance adaptive thread pool
    auto pool = std::make_shared<thread_pool>();
    
    // 3. Add workers with adaptive queue optimization
    std::vector<std::unique_ptr<thread_worker>> workers;
    for (int i = 0; i < std::thread::hardware_concurrency(); ++i) {
        auto worker = std::make_unique<thread_worker>();
        worker->set_batch_processing(true, 32);  // Process up to 32 jobs at once
        workers.push_back(std::move(worker));
    }
    pool->enqueue_batch(std::move(workers));
    
    // 4. Start processing
    pool->start();
    
    // 5. Submit jobs - adaptive pool handles varying contention efficiently
    std::atomic<int> counter{0};
    const int total_jobs = 100000;
    
    for (int i = 0; i < total_jobs; ++i) {
        pool->enqueue(std::make_unique<callback_job>(
            [&counter, i]() -> result_void {
                counter.fetch_add(1);
                if (i % 10000 == 0) {
                    log_module::write_information("Processed {} jobs", i);
                }
                return {};
            }
        ));
    }
    
    // 6. Wait for completion with progress monitoring
    auto start_time = std::chrono::high_resolution_clock::now();
    while (counter.load() < total_jobs) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    auto end_time = std::chrono::high_resolution_clock::now();
    
    // 7. Get comprehensive performance statistics
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
    auto throughput = static_cast<double>(total_jobs) / duration.count() * 1000.0;
    
    log_module::write_information("Performance Results:");
    log_module::write_information("- Total jobs: {}", total_jobs);
    log_module::write_information("- Execution time: {} ms", duration.count());
    log_module::write_information("- Throughput: {:.2f} jobs/second", throughput);
    
    auto workers_list = pool->get_workers();
    for (size_t i = 0; i < workers_list.size(); ++i) {
        auto stats = static_cast<thread_worker*>(workers_list[i].get())->get_statistics();
        log_module::write_information("Worker {}: {} jobs, avg time: {} ns, {} batch ops",
                                     i, stats.jobs_processed, 
                                     stats.avg_processing_time_ns,
                                     stats.batch_operations);
    }
    
    // 8. Clean shutdown
    pool->stop();
    log_module::stop();
    
    return 0;
}

Performance Tip: The adaptive queues automatically optimize for your workload. They provide both mutex-based reliability and lock-free performance when beneficial.

🔄 More Usage Examples

Standard Thread Pool (Low Contention)

#include "thread_pool/core/thread_pool.h"
#include "thread_base/jobs/callback_job.h"

using namespace thread_pool_module;
using namespace thread_module;

// Create a simple thread pool for low-contention workloads
auto pool = std::make_shared<thread_pool>("StandardPool");

// Add workers
std::vector<std::unique_ptr<thread_worker>> workers;
for (int i = 0; i < 4; ++i) {  // Few workers for simple tasks
    workers.push_back(std::make_unique<thread_worker>());
}
pool->enqueue_batch(std::move(workers));
pool->start();

// Submit jobs
for (int i = 0; i < 100; ++i) {
    pool->enqueue(std::make_unique<callback_job>(
        [i]() -> result_void {
            // Process data
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            log_module::write_debug("Processed item {}", i);
            return {};
        }
    ));
}

Adaptive Thread Pool (High Contention)

#include "thread_pool/core/thread_pool.h"
#include "thread_base/jobs/callback_job.h"

using namespace thread_pool_module;
using namespace thread_module;

// Create adaptive pool for high-contention scenarios
auto pool = std::make_shared<thread_pool>("AdaptivePool");

// Configure workers for maximum throughput
std::vector<std::unique_ptr<thread_worker>> workers;
for (unsigned int i = 0; i < std::thread::hardware_concurrency(); ++i) {
    auto worker = std::make_unique<thread_worker>();
    
    // Enable batch processing for better throughput
    worker->set_batch_processing(true, 64);
    
    workers.push_back(std::move(worker));
}
pool->enqueue_batch(std::move(workers));
pool->start();

// Submit jobs from multiple threads (high contention)
// Adaptive queues will automatically switch to lock-free mode when beneficial
std::vector<std::thread> producers;
for (int t = 0; t < 8; ++t) {
    producers.emplace_back([&pool, t]() {
        for (int i = 0; i < 10000; ++i) {
            pool->enqueue(std::make_unique<callback_job>(
                [t, i]() -> result_void {
                [t, i]() -> result_void {
                    // Simulate a small burst of CPU work
                    volatile int sum = 0;
                    for (int j = 0; j < 100; ++j) {
                        sum = sum + j;
                    }
                    return {};
                }
            ));
        }
    });
}

// Wait for all producers
for (auto& t : producers) {
    t.join();
}

// Get detailed statistics
auto workers_vec = pool->get_workers();
for (size_t i = 0; i < workers_vec.size(); ++i) {
    auto stats = static_cast<thread_worker*>(
        workers_vec[i].get())->get_statistics();
    log_module::write_information(
        "Worker {}: {} jobs, {} ΞΌs avg, {} batch ops",
        i, stats.jobs_processed,
        stats.avg_processing_time_ns / 1000,
        stats.batch_operations
    );
}

Asynchronous Logging

#include "logger/core/logger.h"

// Configure logger
log_module::set_title("MyApplication");
log_module::console_target(log_module::log_types::Information | 
                          log_module::log_types::Error);
log_module::file_target(log_module::log_types::All);

// Start logger
log_module::start();

// Use various log levels
log_module::write_information("Application started");
log_module::write_debug("Debug mode enabled");
log_module::write_error("Example error: {}", error_code);
log_module::write_sequence("Processing step {}/{}", current, total);

// Custom callback for critical errors
log_module::callback_target(log_module::log_types::Exception);
log_module::message_callback(
    [](const log_module::log_types& type, 
       const std::string& datetime, 
       const std::string& message) {
        if (type == log_module::log_types::Exception) {
            send_alert_email(message);
        }
    }
);

High-Performance Adaptive Logging

#include "logger/core/logger.h"

using namespace log_module;

// Configure the logger for high-performance scenarios
log_module::set_title("HighPerformanceApp");
log_module::console_target(log_types::Information);
log_module::file_target(log_types::Information);

// Start the logger
log_module::start();

// High-frequency logging from multiple threads
// The logger automatically adapts to contention patterns
std::vector<std::thread> log_threads;
for (int t = 0; t < 16; ++t) {
    log_threads.emplace_back([t]() {
        for (int i = 0; i < 10000; ++i) {
            log_module::write_information(
                "Thread {} - High-frequency log message {}", t, i);
        }
    });
}

// Wait for all threads
for (auto& t : log_threads) {
    t.join();
}

// Adaptive logger provides excellent performance:
// - Automatic optimization based on contention
// - Efficient multi-threaded operation
// - Up to 238% better throughput at 16 threads
// - Ideal for high-concurrency logging scenarios

log_module::stop();

Real-time Performance Monitoring

#include "monitoring/core/metrics_collector.h"
#include "thread_pool/core/thread_pool.h"

using namespace monitoring_module;
using namespace thread_pool_module;

// Start monitoring system
monitoring_config config;
config.collection_interval = std::chrono::milliseconds(100);  // 100ms intervals
metrics::start_global_monitoring(config);

// Create and monitor a thread pool
auto pool = std::make_shared<thread_pool>();
pool->start();

// Register thread pool metrics
auto collector = global_metrics_collector::instance().get_collector();
auto pool_metrics = std::make_shared<thread_pool_metrics>();
collector->register_thread_pool_metrics(pool_metrics);

// Submit jobs and monitor in real-time
for (int i = 0; i < 1000; ++i) {
    pool->enqueue(std::make_unique<callback_job>([&pool_metrics]() -> result_void {
        // Update metrics
        pool_metrics->jobs_completed.fetch_add(1);
        return {};
    }));
}

// Get real-time metrics
auto snapshot = metrics::get_current_metrics();
std::cout << "Jobs completed: " << snapshot.thread_pool.jobs_completed.load() << "\n";
std::cout << "Memory usage: " << snapshot.system.memory_usage_bytes.load() << " bytes\n";

// Stop monitoring
metrics::stop_global_monitoring();

📚 Comprehensive Sample Collection

Our samples demonstrate real-world usage patterns and best practices:

  • Performance & Concurrency
  • Thread Pool Fundamentals
  • Monitoring & Diagnostics

πŸ› οΈ Build & Integration

Prerequisites

  • CMake 3.16 or later
  • C++20 capable compiler (GCC 9+, Clang 10+, MSVC 2019+)
  • vcpkg package manager (automatically installed by dependency scripts)

Build Steps

# Clone the repository
git clone https://github.com/kcenon/thread_system.git
cd thread_system

# Install dependencies via vcpkg
./dependency.sh  # Linux/macOS
./dependency.bat # Windows

# Build the project
./build.sh       # Linux/macOS
./build.bat      # Windows

# Run samples
./build/bin/thread_pool_sample
./build/bin/typed_thread_pool_sample
./build/bin/logger_sample

# Run tests (Linux/Windows only, disabled on macOS)
cd build && ctest --verbose

CMake Integration

# Using as a subdirectory
add_subdirectory(thread_system)
target_link_libraries(your_target PRIVATE 
    thread_base 
    thread_pool 
    typed_thread_pool 
    logger
)

# Using with FetchContent
include(FetchContent)
FetchContent_Declare(
    thread_system
    GIT_REPOSITORY https://github.com/kcenon/thread_system.git
    GIT_TAG main
)
FetchContent_MakeAvailable(thread_system)

API Documentation

Core API Reference

Quick API Overview

// Thread Pool API
namespace thread_pool_module {
    // Thread pool with adaptive queue support
    class thread_pool {
        auto start() -> std::optional<std::string>;
        auto stop(bool immediately = false) -> void;
        auto enqueue(std::unique_ptr<job>&& job) -> std::optional<std::string>;
        auto enqueue_batch(std::vector<std::unique_ptr<job>>&& jobs) -> std::optional<std::string>;
        auto get_workers() const -> const std::vector<std::shared_ptr<thread_worker>>&;
        auto get_queue_statistics() const -> queue_statistics;
    };
    
    // Thread worker with adaptive capabilities
    class thread_worker : public thread_base {
        struct worker_statistics {
            uint64_t jobs_processed;
            uint64_t total_processing_time_ns;
            uint64_t batch_operations;
            uint64_t avg_processing_time_ns;
        };
        
        auto set_batch_processing(bool enabled, size_t batch_size = 32) -> void;
        auto get_statistics() const -> worker_statistics;
    };
}

// Typed Thread Pool API (Mutex-based)
namespace typed_thread_pool_module {
    template<typename T>
    class typed_thread_pool_t {
        auto start() -> result_void;
        auto stop(bool clear_queue = false) -> result_void;
        auto enqueue(std::unique_ptr<typed_job_t<T>>&& job) -> result_void;
        auto enqueue_batch(std::vector<std::unique_ptr<typed_job_t<T>>>&& jobs) -> result_void;
    };
    
    // Adaptive Typed Queue API (supports both mutex and lock-free modes)
    template<typename T>
    class adaptive_typed_job_queue_t {
        auto enqueue(std::unique_ptr<typed_job_t<T>>&& job) -> result_void;
        auto dequeue() -> result<std::unique_ptr<job>>;
        auto dequeue(const T& type) -> result<std::unique_ptr<typed_job_t<T>>>;
        auto size() const -> std::size_t;
        auto empty() const -> bool;
        auto get_typed_statistics() const -> typed_queue_statistics_t<T>;
    };
    
    // Lock-free Queue (utilized by adaptive mode when beneficial)
    template<typename T>
    class typed_lockfree_job_queue_t {
        auto enqueue(std::unique_ptr<typed_job_t<T>>&& job) -> result_void;
        auto dequeue() -> result<std::unique_ptr<job>>;
        auto dequeue(const T& type) -> result<std::unique_ptr<typed_job_t<T>>>;
        auto size() const -> std::size_t;
        auto empty() const -> bool;
        auto get_typed_statistics() const -> typed_queue_statistics_t<T>;
    };
}

// Monitoring API
namespace monitoring_module {
    class metrics_collector {
        auto start() -> thread_module::result_void;
        auto stop() -> void;
        auto get_current_snapshot() const -> metrics_snapshot;
        auto register_system_metrics(std::shared_ptr<system_metrics> metrics) -> void;
        auto register_thread_pool_metrics(std::shared_ptr<thread_pool_metrics> metrics) -> void;
    };
    
    // Convenience functions
    namespace metrics {
        auto start_global_monitoring(monitoring_config config = {}) -> thread_module::result_void;
        auto stop_global_monitoring() -> void;
        auto get_current_metrics() -> metrics_snapshot;
        auto is_monitoring_active() -> bool;
    }
}

// Logger API
namespace log_module {
    auto start() -> std::optional<std::string>;
    auto stop() -> void;
    
    template<typename... Args>
    auto write_information(const char* format, const Args&... args) -> void;
    
    template<typename... Args>
    auto write_error(const char* format, const Args&... args) -> void;
    
    template<typename... Args>
    auto write_debug(const char* format, const Args&... args) -> void;
}

Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Code Style

  • Follow modern C++ best practices
  • Use RAII and smart pointers
  • Maintain consistent formatting (clang-format configuration provided)
  • Write comprehensive unit tests for new features

Support

Configuration & Tuning

  • Queue Strategy

    • Standard pool uses an adaptive queue by default: thread_module::create_job_queue(adaptive_job_queue::queue_strategy::ADAPTIVE).
    • Typed pool uses an adaptive typed queue by default: typed_thread_pool_module::create_typed_job_queue(...).
    • Strategies: AUTO_DETECT, FORCE_LEGACY (mutex-based), FORCE_LOCKFREE, ADAPTIVE.
    • Thresholds: evaluation interval and contention/latency thresholds are compile-time constants today. Expose as configuration if your deployment requires runtime tuning.
  • Worker Sizing

    • CPU-bound: start with workers ~= hardware threads; benchmark and adjust.
    • IO-bound: allow more workers than cores to cover blocking time.
    • Mixed workloads: prefer typed thread pool to protect latency for critical types.
  • Periodic Wake Interval

    • Use thread_base::set_wake_interval(std::optional<std::chrono::milliseconds>) for periodic housekeeping.
    • Access is guarded by a dedicated mutex for correctness (~5% overhead under contention); use only when you need periodic work.
  • Monitoring & Logging

    • Enable monitoring_module::metrics_collector to record pool/worker/system metrics at a configured interval.
    • Use logger writers (console/file/callback) suitable to production environments; keep file rotation enabled for long-running services.
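The worker-sizing guidance above can be sketched as a small helper built only on the standard library. The name `recommended_workers` and the 2x oversubscription factor for IO-bound work are illustrative assumptions to benchmark against, not library defaults:

```cpp
#include <thread>

// Hypothetical sizing helper following the guidance above.
// CPU-bound: match the hardware thread count. IO-bound: oversubscribe
// to cover time spent blocked (the 2x factor is an assumption; tune it
// via benchmarking for your workload).
inline unsigned recommended_workers(bool io_bound) {
    unsigned hw = std::thread::hardware_concurrency();
    if (hw == 0) hw = 4;  // hardware_concurrency() may return 0; pick a fallback
    return io_bound ? hw * 2 : hw;
}
```

A typical call site would use the returned count as the number of thread_worker instances to create before calling pool->start().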

Error Handling & Cancellation

  • Result Pattern

    • Operations return result_void or result<T> (see thread_base/sync/error_handling.h).
    • On success: result_void{} or result<T>{value}; on failure: error{error_code::..., "message"}.
    • Example:
      auto r = pool->enqueue(std::move(job));
      if (r) { /* success */ } else { /* r.get_error().to_string() */ }
  • Cancellation

    • Jobs can accept a cancellation_token; long-running jobs should periodically check token.is_cancelled() and return error{error_code::operation_canceled, ...} when appropriate.
    • Cancellation is cooperative; use for graceful shutdown and bounded latency guarantees.

Build & Platform Policy

  • C++20 Features (autodetected in CMake)

    • std::format vs {fmt}: Windows builds default to {fmt} for maximum compatibility. Non-Windows uses std::format only if compile tests pass. Override on Windows with -DWINDOWS_ALLOW_STD_FORMAT=ON if you accept the risks.
    • std::jthread: enabled via USE_STD_JTHREAD when supported; otherwise falls back to std::thread.
    • std::span, std::chrono::current_zone, std::filesystem: enabled when compile tests succeed; otherwise fallbacks are used where available.
  • Dependencies (via vcpkg): {fmt}, gtest, benchmark, spdlog, libiconv.

  • Quick Build

    • Unix/macOS: ./dependency.sh && ./build.sh
    • Windows: dependency.bat && build.bat

Testing

  • Unit Tests: GoogleTest-based suites under unittest/ (thread base/pool, typed pool, logger, utilities, platform behavior).
  • Run: configure with CMake and execute ctest -C <Debug|Release> --output-on-failure from the build directory. The root scripts trigger tests on CI matrices.
  • Coverage: see TEST_COVERAGE_IMPROVEMENT_REPORT.md and CI configuration for thresholds and guidance.

Known Limitations & Notes

  • Adaptive Typed Queue Migration

    • Current adaptive_typed_job_queue switches implementation without migrating already enqueued items between strategies. Plan graceful switches: drain or stop before switching, or select FORCE_* strategies when migration safety is required.
  • Lock-free Queue Backoff Limits

    • lockfree_job_queue uses retry and yield/backoff. Under extreme contention, enqueue/dequeue may fail after MAX_TOTAL_RETRIES with a typed error. Handle by backoff-and-retry or routing to an alternate path.
  • Size Semantics in Typed Queues

    • Legacy typed queues report total size, not per-type; lock-free typed queues expose per-type sizes. Use typed APIs for accurate type-aware visibility.
  • Performance vs. Safety Trade-offs

    • Mutex protections (e.g., wake interval) and cancellation checks have documented overhead (+3–5%) that improve correctness. Disable only with full understanding of the risks.
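The backoff-and-retry handling suggested above for lockfree_job_queue failures can be sketched generically with the standard library; the attempt limit, base delay, and `try_enqueue` name are illustrative, not thread_system constants:

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Retries a transiently failing operation with exponential backoff.
// Returns true once the operation succeeds, false after max_attempts,
// at which point the caller should route to an alternate path
// (e.g. a mutex-based queue).
inline bool retry_with_backoff(const std::function<bool()>& op,
                               int max_attempts = 5,
                               std::chrono::microseconds base_delay =
                                   std::chrono::microseconds(50)) {
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        if (op()) return true;
        // Exponential backoff: 50µs, 100µs, 200µs, ... eases contention
        std::this_thread::sleep_for(base_delay * (1 << attempt));
    }
    return false;
}
```

An enqueue that fails with the typed retry error would be wrapped as `retry_with_backoff([&]{ return try_enqueue(); })`, falling back to a FORCE_LEGACY queue when false is returned (`try_enqueue` is a hypothetical adapter over your queue's enqueue call).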

Upgrade & Migration Tips

  • Moving from single-queue to typed scheduling: start by routing only your most latency-sensitive types; keep background work on the standard pool to minimize overhead.
  • For bursty traffic with variable contention, keep the default adaptive queue; for highly predictable loads, consider forcing FORCE_LOCKFREE/FORCE_LEGACY and tune for your steady-state.

License

This project is licensed under the BSD 3-Clause License - see the LICENSE file for details.

Acknowledgments

  • Thanks to all contributors who have helped improve this project
  • Special thanks to the C++ community for continuous feedback and support
  • Inspired by modern concurrent programming patterns and best practices

Made with ❤️ by 🍀☀️🌕🌥️ 🌊
