Skip to content

Commit f50dfc8

Browse files
tknsnail
authored andcommitted
refactor: ♻️ 计划作业模块
1 parent e82a172 commit f50dfc8

File tree

7 files changed

+109
-43
lines changed

7 files changed

+109
-43
lines changed

src/backend/NetAdmin/NetAdmin.Infrastructure/EventBus/DefaultEventPublisher.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ public DefaultEventPublisher()
1818
_ = new TaskFactory<Task>().StartNew( //
1919
async state => {
2020
var subscribers = (List<MethodInfo>)state;
21-
await foreach (var msg in _eventChannel.Reader.ReadAllAsync()) {
22-
_ = Parallel.ForEach( //
23-
subscribers.Where(x => x.GetParameters().FirstOrDefault()?.ParameterType == msg.GetType())
24-
, (x, _) => x.Invoke(App.GetService(x.DeclaringType), [msg]));
25-
}
21+
await Parallel.ForEachAsync(_eventChannel.Reader.ReadAllAsync(), (msg, __) => {
22+
_ = Parallel.ForEach( //
23+
subscribers.Where(x => x.GetParameters().FirstOrDefault()?.ParameterType == msg.GetType())
24+
, (x, _) => x.Invoke(App.GetService(x.DeclaringType), [msg]));
25+
return ValueTask.CompletedTask;
26+
})
27+
.ConfigureAwait(false);
2628
}, App.EffectiveTypes.Where(x => typeof(IEventSubscriber).IsAssignableFrom(x) && x.IsClass && !x.IsAbstract).SelectMany(x => x.GetMethods(BindingFlags.Instance | BindingFlags.Public).Where(y => y.IsDefined(typeof(EventSubscribeAttribute)))).ToList());
2729
}
2830

src/backend/NetAdmin/NetAdmin.Infrastructure/NetAdmin.Infrastructure.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<ItemGroup>
66
<PackageReference Include="NetAdmin.FreeSql.DbContext" Version="1.1.1" Label="refs"/>
77
<PackageReference Include="NetAdmin.FreeSql.Provider.Sqlite" Version="1.1.1" Label="refs"/>
8-
<PackageReference Include="Gurion" Version="1.2.9" Label="refs"/>
8+
<PackageReference Include="Gurion" Version="1.2.10" Label="refs"/>
99
<PackageReference Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="9.0.0"/>
1010
<PackageReference Include="Minio" Version="6.0.4"/>
1111
<PackageReference Include="NSExt" Version="2.3.3"/>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace NetAdmin.Infrastructure.Schedule;
2+
3+
/// <summary>
4+
/// 作业处理程序
5+
/// </summary>
6+
public interface IJob
7+
{
8+
/// <summary>
9+
/// 具体处理逻辑
10+
/// </summary>
11+
Task ExecuteAsync(CancellationToken cancelToken);
12+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace NetAdmin.Infrastructure.Schedule;
2+
3+
/// <summary>
4+
/// 作业配置
5+
/// </summary>
6+
[AttributeUsage(AttributeTargets.Class)]
7+
public sealed class JobConfigAttribute : Attribute
8+
{
9+
/// <summary>
10+
/// 上一次执行时间
11+
/// </summary>
12+
public DateTime? LastExecutionTime { get; set; }
13+
14+
/// <summary>
15+
/// 启动时运行
16+
/// </summary>
17+
public bool RunOnStart { get; init; }
18+
19+
/// <summary>
20+
/// 触发器表达式
21+
/// </summary>
22+
public string TriggerCron { get; init; }
23+
}

src/backend/NetAdmin/NetAdmin.SysComponent.Host/Extensions/ServiceCollectionExtensions.cs

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
using Gurion.Schedule;
1+
using Cronos;
22
using NetAdmin.Domain.Contexts;
33
using NetAdmin.Domain.Events;
44
using NetAdmin.Host.Filters;
5-
using NetAdmin.SysComponent.Host.Jobs;
5+
using NetAdmin.Host.Middlewares;
6+
using NetAdmin.Infrastructure.Schedule;
67
using NetAdmin.SysComponent.Host.Utils;
78
using FreeSqlBuilder = NetAdmin.Infrastructure.Utils.FreeSqlBuilder;
89

@@ -62,17 +63,58 @@ public static IServiceCollection AddFreeSql( //
6263
/// <summary>
6364
/// 添加定时任务
6465
/// </summary>
65-
public static IServiceCollection AddSchedules(this IServiceCollection me, bool force = false, Action<ScheduleOptionsBuilder> optionsAction = null)
66+
public static IServiceCollection AddSchedules(this IServiceCollection me, bool force = false)
6667
{
67-
return App.WebHostEnvironment.IsProduction() || force
68-
? me.AddSchedule( //
69-
builder => {
70-
_ = builder //
71-
.AddJob<ScheduledJob>(true, Triggers.PeriodSeconds(1).SetRunOnStart(true))
72-
.AddJob<FreeScheduledJob>(true, Triggers.PeriodMinutes(1).SetRunOnStart(true));
73-
74-
optionsAction?.Invoke(builder);
75-
})
76-
: me;
68+
if (!App.WebHostEnvironment.IsProduction() && !force) {
69+
return me;
70+
}
71+
72+
var jobTypes = App.EffectiveTypes
73+
.Where(x => typeof(IJob).IsAssignableFrom(x) && x.IsClass && !x.IsAbstract && x.IsDefined(typeof(JobConfigAttribute)))
74+
.ToDictionary(x => x, x => x.GetCustomAttribute<JobConfigAttribute>());
75+
var runOnStartJobTypes = jobTypes.Where(x => //
76+
x.Value.RunOnStart);
77+
RunJob(runOnStartJobTypes);
78+
_ = Task.Run(LoopTaskAsync);
79+
return me;
80+
81+
#pragma warning disable S2190
82+
async Task LoopTaskAsync()
83+
#pragma warning restore S2190
84+
{
85+
while (true) {
86+
await Task.Delay(1000).ConfigureAwait(false);
87+
if (SafetyShopHostMiddleware.IsShutdown) {
88+
Console.WriteLine(Ln.此节点已下线);
89+
}
90+
else {
91+
RunJob(jobTypes.Where(Filter));
92+
}
93+
}
94+
95+
bool Filter(KeyValuePair<Type, JobConfigAttribute> x)
96+
{
97+
return !x.Value.TriggerCron.NullOrEmpty() &&
98+
CronExpression.Parse(x.Value.TriggerCron, CronFormat.IncludeSeconds)
99+
.GetNextOccurrence(x.Value.LastExecutionTime ?? DateTime.UtcNow.AddDays(-1), TimeZoneInfo.Local)
100+
?.ToLocalTime() <= DateTime.Now;
101+
}
102+
103+
// ReSharper disable once FunctionNeverReturns
104+
}
105+
}
106+
107+
private static void RunJob(IEnumerable<KeyValuePair<Type, JobConfigAttribute>> jobTypes)
108+
{
109+
foreach (var job in jobTypes) {
110+
try {
111+
_ = typeof(IJob).GetMethod(nameof(IJob.ExecuteAsync))!.Invoke( //
112+
Activator.CreateInstance(job.Key), [CancellationToken.None]);
113+
job.Value.LastExecutionTime = DateTime.UtcNow;
114+
}
115+
catch (Exception ex) {
116+
LogHelper.Get<IServiceCollection>().Error(ex);
117+
}
118+
}
77119
}
78120
}

src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/FreeScheduledJob.cs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
using Gurion.Schedule;
21
using NetAdmin.Host.BackgroundRunning;
3-
using NetAdmin.Host.Middlewares;
2+
using NetAdmin.Infrastructure.Schedule;
43

54
namespace NetAdmin.SysComponent.Host.Jobs;
65

76
/// <summary>
87
/// 释放计划作业
98
/// </summary>
9+
[JobConfig(TriggerCron = "0 * * * * *")]
1010
public sealed class FreeScheduledJob : WorkBase<FreeScheduledJob>, IJob
1111
{
1212
private readonly IJobService _jobService;
@@ -22,17 +22,11 @@ public FreeScheduledJob()
2222
/// <summary>
2323
/// 具体处理逻辑
2424
/// </summary>
25-
/// <param name="context">作业执行前上下文</param>
26-
/// <param name="stoppingToken">取消任务 Token</param>
25+
/// <param name="cancelToken">取消任务 Token</param>
2726
/// <exception cref="NetAdminGetLockerException">加锁失败异常</exception>
28-
public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken)
27+
public async Task ExecuteAsync(CancellationToken cancelToken)
2928
{
30-
if (SafetyShopHostMiddleware.IsShutdown) {
31-
Console.WriteLine(Ln.此节点已下线);
32-
return;
33-
}
34-
35-
await WorkflowAsync(true, stoppingToken).ConfigureAwait(false);
29+
await WorkflowAsync(true, cancelToken).ConfigureAwait(false);
3630
}
3731

3832
/// <summary>

src/backend/NetAdmin/NetAdmin.SysComponent.Host/Jobs/ScheduledJob.cs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
using FreeSql.Internal;
22
using Gurion.RemoteRequest;
33
using Gurion.RemoteRequest.Extensions;
4-
using Gurion.Schedule;
54
using NetAdmin.Application.Extensions;
65
using NetAdmin.Domain.Dto.Sys.Job;
76
using NetAdmin.Domain.Dto.Sys.JobRecord;
87
using NetAdmin.Host.BackgroundRunning;
9-
using NetAdmin.Host.Middlewares;
8+
using NetAdmin.Infrastructure.Schedule;
109

1110
namespace NetAdmin.SysComponent.Host.Jobs;
1211

1312
/// <summary>
1413
/// 计划作业
1514
/// </summary>
15+
[JobConfig(TriggerCron = "* * * * * *")]
1616
public sealed class ScheduledJob : WorkBase<ScheduledJob>, IJob
1717
{
1818
private static string _accessToken;
@@ -30,19 +30,12 @@ public ScheduledJob()
3030
/// <summary>
3131
/// 具体处理逻辑
3232
/// </summary>
33-
/// <param name="context">作业执行前上下文</param>
34-
/// <param name="stoppingToken">取消任务 Token</param>
33+
/// <param name="cancelToken">取消任务 Token</param>
3534
/// <exception cref="NetAdminGetLockerException">加锁失败异常</exception>
36-
public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken)
35+
public Task ExecuteAsync(CancellationToken cancelToken)
3736
{
38-
if (SafetyShopHostMiddleware.IsShutdown) {
39-
Console.WriteLine(Ln.此节点已下线);
40-
return;
41-
}
42-
43-
// ReSharper disable once MethodSupportsCancellation
44-
await Parallel.ForAsync(0, Numbers.SCHEDULED_JOB_PARALLEL_NUM, async (_, _) => await WorkflowAsync(stoppingToken).ConfigureAwait(false))
45-
.ConfigureAwait(false);
37+
return Parallel.ForAsync(0, Numbers.SCHEDULED_JOB_PARALLEL_NUM, cancelToken
38+
, async (_, _) => await WorkflowAsync(cancelToken).ConfigureAwait(false));
4639
}
4740

4841
/// <summary>

0 commit comments

Comments
 (0)