Skip to content

从零开始写动态线程池框架,以及动态线程池的一切(线程池源码、第三方框架线程池、并发)

License

PansonPanson/moirai

Repository files navigation

Moirai

Moirai 是一个参考 Hippo4j 写的动态线程池框架,核心逻辑与 Hippo4j 一致,主要用于个人学习,在亲手造轮子中学习 Hippo4j 源码,以及学习动态线程池的设计原理与编码实践。

除了框架本身以外,本仓库还较为系统性整理了动态线程池相关的知识,欢迎交流。

如何设计一个动态线程池框架?

一、模块依赖关系

moirai-server-bootstrap maven 依赖关系(project module)

moirai-server-bootstrap 
└── moirai-server-console 
    ├── moirai-server-config
    │   ├── moirai-common 
    │   └── moirai-adapter 
    │       └── moirai-core 
    │           └── moirai-server-monitor-base 
    ├── moirai-server-discovery 
    └── moirai-server-auth 
  1. 根节点是 moirai-server-bootstrap
  2. 直接依赖是 moirai-server-console(编译作用域)
  3. 二级依赖包含三个模块:
    • 配置模块 moirai-server-config
    • 服务发现模块 moirai-server-discovery
    • 认证模块 moirai-server-auth
  4. 配置模块延伸出基础技术栈:
    • 公共模块 → 适配器 → 核心模块 → 监控基础模块
graph TD
    A[moirai-server-bootstrap] --> B[moirai-server-console]
    B --> C[moirai-server-config]
    B --> D[moirai-server-discovery]
    B --> E[moirai-server-auth]
    C --> F[moirai-common]
    C --> G[moirai-adapter]
    G --> H[moirai-core]
    H --> I[moirai-server-monitor-base]
Loading

moirai-example maven 依赖关系(project module)

依赖关系图:

graph TD
    A[moirai-example] --> B[moirai-spring-boot-starter]
    B --> C[moirai-core]
    C --> D[moirai-common]
    B --> E[moirai-server-monitor-base]
    B --> F[moirai-adapter]
    B --> G[moirai-message]
Loading

层级结构: - Level 0: 根模块 moirai-example - Level 1: 核心依赖 moirai-spring-boot-starter - Level 2: Starter 的四个直接子模块 - Level 3: moirai-core 的唯一子模块 moirai-common

二、插件体系

classDiagram
direction BT
class AbstractTaskTimerPlugin {
  + AbstractTaskTimerPlugin() 
  + beforeExecute(Thread, Runnable) void
  # currentTime() long
  # processTaskTime(long) void
  + afterExecute(Runnable, Throwable) void
}
class ExecuteAwarePlugin {
<<Interface>>
  + beforeExecute(Thread, Runnable) void
  + afterExecute(Runnable, Throwable) void
}
class RejectedAwarePlugin {
<<Interface>>
  + beforeRejectedExecution(Runnable, ThreadPoolExecutor) void
}
class ShutdownAwarePlugin {
<<Interface>>
  + afterShutdown(ThreadPoolExecutor, List~Runnable~) void
  + afterTerminated(ExtensibleThreadPoolExecutor) void
  + beforeShutdown(ThreadPoolExecutor) void
}
class Summary {
  + Summary(long, long, long, long) 
  - long taskCount
  - long minTaskTimeMillis
  - long totalTaskTimeMillis
  - long maxTaskTimeMillis
   long maxTaskTimeMillis
   long taskCount
   long minTaskTimeMillis
   long totalTaskTimeMillis
   long avgTaskTimeMillis
}
class TaskAwarePlugin {
<<Interface>>
  + beforeTaskCreate(ThreadPoolExecutor, Runnable, V) Runnable
  + beforeTaskCreate(ThreadPoolExecutor, Callable~V~) Callable~V~
  + beforeTaskExecute(Runnable) Runnable
}
class TaskDecoratorPlugin {
  + TaskDecoratorPlugin() 
  - List~TaskDecorator~ decorators
  + removeDecorator(TaskDecorator) void
  + clearDecorators() void
  + beforeTaskExecute(Runnable) Runnable
  + addDecorator(TaskDecorator) void
   PluginRuntime pluginRuntime
   String id
   List~TaskDecorator~ decorators
}
class TaskRejectCountRecordPlugin {
  + TaskRejectCountRecordPlugin() 
  - AtomicLong rejectCount
  + beforeRejectedExecution(Runnable, ThreadPoolExecutor) void
   PluginRuntime pluginRuntime
   Long rejectCountNum
   AtomicLong rejectCount
   String id
}
class TaskRejectNotifyAlarmPlugin {
  + TaskRejectNotifyAlarmPlugin() 
  + beforeRejectedExecution(Runnable, ThreadPoolExecutor) void
   String id
}
class TaskTimeRecordPlugin {
  + TaskTimeRecordPlugin() 
  + summarize() Summary
  # processTaskTime(long) void
   PluginRuntime pluginRuntime
   String id
}
class TaskTimeoutNotifyAlarmPlugin {
  + TaskTimeoutNotifyAlarmPlugin(String, Long, ThreadPoolExecutor) 
  - Long executeTimeOut
  # processTaskTime(long) void
   String id
   Long executeTimeOut
}
class ThreadPoolExecutorShutdownPlugin {
  + ThreadPoolExecutorShutdownPlugin(long) 
  + long awaitTerminationMillis
  - awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor) void
  + beforeShutdown(ThreadPoolExecutor) void
  # cancelRemainingTask(Runnable) void
  + afterShutdown(ThreadPoolExecutor, List~Runnable~) void
   PluginRuntime pluginRuntime
   String id
   long awaitTerminationMillis
}
class ThreadPoolPlugin {
<<Interface>>
  + start() void
  + stop() void
   PluginRuntime pluginRuntime
   String id
}

AbstractTaskTimerPlugin  ..>  ExecuteAwarePlugin 
ExecuteAwarePlugin  -->  ThreadPoolPlugin 
RejectedAwarePlugin  -->  ThreadPoolPlugin 
ShutdownAwarePlugin  -->  ThreadPoolPlugin 
TaskTimeRecordPlugin  -->  Summary 
TaskAwarePlugin  -->  ThreadPoolPlugin 
TaskDecoratorPlugin  ..>  TaskAwarePlugin 
TaskRejectCountRecordPlugin  ..>  RejectedAwarePlugin 
TaskRejectNotifyAlarmPlugin  ..>  RejectedAwarePlugin 
TaskTimeRecordPlugin  -->  AbstractTaskTimerPlugin 
TaskTimeoutNotifyAlarmPlugin  -->  AbstractTaskTimerPlugin 
ThreadPoolExecutorShutdownPlugin  ..>  ShutdownAwarePlugin 

Loading

三、线程池配置信息是如何注册到服务端的?

graph TD
A[开始: postProcessAfterInitialization] --> B{Bean是DynamicThreadPoolExecutor<br/>或适配器匹配?}
B -->|是| C{检查DynamicThreadPool注解}
B -->|否| D{Bean是DynamicThreadPoolWrapper?}
D -->|是| E[调用registerAndSubscribe]
D -->|否| Z[返回bean]

    C -->|找到注解| F[提取DynamicThreadPoolExecutor实例]
    C -->|未找到注解| Z
    
    F --> G[创建DynamicThreadPoolWrapper]
    G --> H[调用fillPoolAndRegister]
    
    subgraph fillPoolAndRegister流程
        H --> I[构建查询参数]
        I --> J[HTTP GET查询服务端配置]
        J --> K{服务端存在配置?}
        K -->|是| L[更新本地线程池参数]
        K -->|否| M[构建DynamicThreadPoolRegisterParameter]
        L --> N[注册到GlobalThreadPoolManage]
        M --> O[调用GlobalThreadPoolManage.dynamicRegister]
        O --> P[触发DynamicThreadPoolConfigService注册]
        P --> Q[HTTP POST注册到服务端]
        Q --> R{注册成功?}
        R -->|是| S[缓存告警配置]
        R -->|否| T[记录错误日志]
        S --> N
    end
    
    H --> U[替换第三方线程池执行器]
    U --> V[订阅配置更新]
    V --> W[返回处理后的bean]
    
    E --> X[调用registerAndSubscribe]
    X --> Y[同fillPoolAndRegister流程]
    
    subgraph GlobalThreadPoolManage
        N --> AA[存入EXECUTOR_MAP]
        N --> AB[存入POOL_PARAMETER]
    end
    
    subgraph DynamicThreadPoolConfigService
        P --> BA[参数校验]
        BA --> BB[填充租户/项目信息]
        BB --> BC[发送HTTP请求]
        BC --> BD[处理告警配置]
    end
    
    style A fill:#f9f,stroke:#333
    style Z fill:#f9f,stroke:#333
    style H fill:#bbf,stroke:#666
    style N fill:#bfb,stroke:#666
    style P fill:#fbb,stroke:#666
    
Loading

四、目录

线程池相关

Spring 相关

框架涉及到的设计模式

About

从零开始写动态线程池框架,以及动态线程池的一切(线程池源码、第三方框架线程池、并发)

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published