Moirai 是一个参考 Hippo4j 写的动态线程池框架,核心逻辑与 Hippo4j 一致,主要用于个人学习,在亲手造轮子中学习 Hippo4j 源码,以及学习动态线程池的设计原理与编码实践。
除了框架本身以外,本仓库还较为系统性整理了动态线程池相关的知识,欢迎交流。
- 明确监控的线程池信息(直接读取的核心参数、加工后的数据).md
- 明确可以动态更新的线程池信息.md
- 自定义可变容量的阻塞队列.md
- 动态线程池的扩展点以及插件体系.md
- 提供默认插件注册功能.md
- 使用 Builder 模式构建动态线程池.md
- 线程池配置信息注册到服务端.md
- 适配 TTL 和 Spring ThreadPoolTaskExecutor.md
- 实现客户端与服务端之间的服务发现.md
- 实现客户端与服务端之间的心跳检测(续约).md
- 实现过期服务移除功能.md
- 使用长轮询实现配置信息的动态变更.md
- ……
moirai-server-bootstrap
└── moirai-server-console
├── moirai-server-config
│ ├── moirai-common
│ └── moirai-adapter
│ └── moirai-core
│ └── moirai-server-monitor-base
├── moirai-server-discovery
└── moirai-server-auth
- 根节点是
moirai-server-bootstrap - 直接依赖是
moirai-server-console(编译作用域) - 二级依赖包含三个模块:
- 配置模块
moirai-server-config - 服务发现模块
moirai-server-discovery - 认证模块
moirai-server-auth
- 配置模块
- 配置模块延伸出基础技术栈:
- 公共模块 → 适配器 → 核心模块 → 监控基础模块
graph TD
A[moirai-server-bootstrap] --> B[moirai-server-console]
B --> C[moirai-server-config]
B --> D[moirai-server-discovery]
B --> E[moirai-server-auth]
C --> F[moirai-common]
C --> G[moirai-adapter]
G --> H[moirai-core]
H --> I[moirai-server-monitor-base]
依赖关系图:
graph TD
A[moirai-example] --> B[moirai-spring-boot-starter]
B --> C[moirai-core]
C --> D[moirai-common]
B --> E[moirai-server-monitor-base]
B --> F[moirai-adapter]
B --> G[moirai-message]
层级结构:
- Level 0: 根模块 moirai-example
- Level 1: 核心依赖 moirai-spring-boot-starter
- Level 2: Starter 的四个直接子模块
- Level 3: moirai-core 的唯一子模块 moirai-common
classDiagram
direction BT
class AbstractTaskTimerPlugin {
+ AbstractTaskTimerPlugin()
+ beforeExecute(Thread, Runnable) void
# currentTime() long
# processTaskTime(long) void
+ afterExecute(Runnable, Throwable) void
}
class ExecuteAwarePlugin {
<<Interface>>
+ beforeExecute(Thread, Runnable) void
+ afterExecute(Runnable, Throwable) void
}
class RejectedAwarePlugin {
<<Interface>>
+ beforeRejectedExecution(Runnable, ThreadPoolExecutor) void
}
class ShutdownAwarePlugin {
<<Interface>>
+ afterShutdown(ThreadPoolExecutor, List~Runnable~) void
+ afterTerminated(ExtensibleThreadPoolExecutor) void
+ beforeShutdown(ThreadPoolExecutor) void
}
class Summary {
+ Summary(long, long, long, long)
- long taskCount
- long minTaskTimeMillis
- long totalTaskTimeMillis
- long maxTaskTimeMillis
long maxTaskTimeMillis
long taskCount
long minTaskTimeMillis
long totalTaskTimeMillis
long avgTaskTimeMillis
}
class TaskAwarePlugin {
<<Interface>>
+ beforeTaskCreate(ThreadPoolExecutor, Runnable, V) Runnable
+ beforeTaskCreate(ThreadPoolExecutor, Callable~V~) Callable~V~
+ beforeTaskExecute(Runnable) Runnable
}
class TaskDecoratorPlugin {
+ TaskDecoratorPlugin()
- List~TaskDecorator~ decorators
+ removeDecorator(TaskDecorator) void
+ clearDecorators() void
+ beforeTaskExecute(Runnable) Runnable
+ addDecorator(TaskDecorator) void
PluginRuntime pluginRuntime
String id
List~TaskDecorator~ decorators
}
class TaskRejectCountRecordPlugin {
+ TaskRejectCountRecordPlugin()
- AtomicLong rejectCount
+ beforeRejectedExecution(Runnable, ThreadPoolExecutor) void
PluginRuntime pluginRuntime
Long rejectCountNum
AtomicLong rejectCount
String id
}
class TaskRejectNotifyAlarmPlugin {
+ TaskRejectNotifyAlarmPlugin()
+ beforeRejectedExecution(Runnable, ThreadPoolExecutor) void
String id
}
class TaskTimeRecordPlugin {
+ TaskTimeRecordPlugin()
+ summarize() Summary
# processTaskTime(long) void
PluginRuntime pluginRuntime
String id
}
class TaskTimeoutNotifyAlarmPlugin {
+ TaskTimeoutNotifyAlarmPlugin(String, Long, ThreadPoolExecutor)
- Long executeTimeOut
# processTaskTime(long) void
String id
Long executeTimeOut
}
class ThreadPoolExecutorShutdownPlugin {
+ ThreadPoolExecutorShutdownPlugin(long)
+ long awaitTerminationMillis
- awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor) void
+ beforeShutdown(ThreadPoolExecutor) void
# cancelRemainingTask(Runnable) void
+ afterShutdown(ThreadPoolExecutor, List~Runnable~) void
PluginRuntime pluginRuntime
String id
long awaitTerminationMillis
}
class ThreadPoolPlugin {
<<Interface>>
+ start() void
+ stop() void
PluginRuntime pluginRuntime
String id
}
AbstractTaskTimerPlugin ..> ExecuteAwarePlugin
ExecuteAwarePlugin --> ThreadPoolPlugin
RejectedAwarePlugin --> ThreadPoolPlugin
ShutdownAwarePlugin --> ThreadPoolPlugin
TaskTimeRecordPlugin --> Summary
TaskAwarePlugin --> ThreadPoolPlugin
TaskDecoratorPlugin ..> TaskAwarePlugin
TaskRejectCountRecordPlugin ..> RejectedAwarePlugin
TaskRejectNotifyAlarmPlugin ..> RejectedAwarePlugin
TaskTimeRecordPlugin --> AbstractTaskTimerPlugin
TaskTimeoutNotifyAlarmPlugin --> AbstractTaskTimerPlugin
ThreadPoolExecutorShutdownPlugin ..> ShutdownAwarePlugin
graph TD
A[开始: postProcessAfterInitialization] --> B{Bean是DynamicThreadPoolExecutor<br/>或适配器匹配?}
B -->|是| C{检查DynamicThreadPool注解}
B -->|否| D{Bean是DynamicThreadPoolWrapper?}
D -->|是| E[调用registerAndSubscribe]
D -->|否| Z[返回bean]
C -->|找到注解| F[提取DynamicThreadPoolExecutor实例]
C -->|未找到注解| Z
F --> G[创建DynamicThreadPoolWrapper]
G --> H[调用fillPoolAndRegister]
subgraph fillPoolAndRegister流程
H --> I[构建查询参数]
I --> J[HTTP GET查询服务端配置]
J --> K{服务端存在配置?}
K -->|是| L[更新本地线程池参数]
K -->|否| M[构建DynamicThreadPoolRegisterParameter]
L --> N[注册到GlobalThreadPoolManage]
M --> O[调用GlobalThreadPoolManage.dynamicRegister]
O --> P[触发DynamicThreadPoolConfigService注册]
P --> Q[HTTP POST注册到服务端]
Q --> R{注册成功?}
R -->|是| S[缓存告警配置]
R -->|否| T[记录错误日志]
S --> N
end
H --> U[替换第三方线程池执行器]
U --> V[订阅配置更新]
V --> W[返回处理后的bean]
E --> X[调用registerAndSubscribe]
X --> Y[同fillPoolAndRegister流程]
subgraph GlobalThreadPoolManage
N --> AA[存入EXECUTOR_MAP]
N --> AB[存入POOL_PARAMETER]
end
subgraph DynamicThreadPoolConfigService
P --> BA[参数校验]
BA --> BB[填充租户/项目信息]
BB --> BC[发送HTTP请求]
BC --> BD[处理告警配置]
end
style A fill:#f9f,stroke:#333
style Z fill:#f9f,stroke:#333
style H fill:#bbf,stroke:#666
style N fill:#bfb,stroke:#666
style P fill:#fbb,stroke:#666
- 为什么Java要设计一个线程池?
- 万字长文之线程池源码深入分析
- 线程池源码涉及到的位运算以及相应算法练习
- 线程池源码涉及到的链表结构以及相应算法练习
- LinkedBlockingQueue 原理与源码深度分析
- 核心线程数为0时,线程池如何执行?
- 线程池中那些信息是可以被监控的?
- 线程池异常后:销毁还是复用
- 线程池的核心线程会被回收吗?
- 线程池提交一个任务占多大内存?
- 如何保证服务器宕机后线程池不丢失数据?
- 如何设计一个线程池
- FutureTask源码深入分析?.md
- 虚拟线程原理及性能分析.md
- ForkJoinPool 源码深入分析?.md
- Spring 的线程池设计
- Spring中Async注解底层异步线程池原理
- Dubbo 的线程池设计
- RocketMQ 的线程池设计
- 时间轮
- Tomcat 对 jdk 线程池进行了哪些修改?.md