18
18
using ServerlessWorkflow . Sdk . Models ;
19
19
using ServerlessWorkflow . Sdk . Services . IO ;
20
20
using Synapse . Application . Commands . Workflows ;
21
-
22
- namespace Synapse . Application . Services
21
+ using Synapse . Infrastructure . Plugins ;
22
+
23
+ namespace Synapse . Application . Services ;
24
+
25
+
26
+ /// <summary>
27
+ /// Represents an <see cref="IHostedService"/> used to monitor <see cref="WorkflowDefinition"/> files
28
+ /// </summary>
29
+ public class WorkflowDefinitionFileMonitor
30
+ : BackgroundService
31
+ {
32
+
33
+ /// <summary>
34
+ /// Initializes a new <see cref="WorkflowDefinitionFileMonitor"/>
35
+ /// </summary>
36
+ /// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
37
+ /// <param name="logger">The service used to perform logging</param>
38
+ /// <param name="options">The service used to access the current <see cref="SynapseApplicationOptions"/></param>
39
+ /// <param name="pluginManager">The service used to manage <see cref="IPlugin"/>s</param>
40
+ /// <param name="workflowReader">The service used to read <see cref="WorkflowDefinition"/>s</param>
41
+ public WorkflowDefinitionFileMonitor ( IServiceProvider serviceProvider , ILogger < WorkflowDefinitionFileMonitor > logger , IOptions < SynapseApplicationOptions > options , IPluginManager pluginManager , IWorkflowReader workflowReader )
23
42
{
24
-
25
- /// <summary>
26
- /// Represents an <see cref="IHostedService"/> used to monitor <see cref="WorkflowDefinition"/> files
27
- /// </summary>
28
- public class WorkflowDefinitionFileMonitor
29
- : BackgroundService
43
+ this . ServiceProvider = serviceProvider ;
44
+ this . Logger = logger ;
45
+ this . Options = options . Value ;
46
+ this . PluginManager = pluginManager ;
47
+ this . WorkflowReader = workflowReader ;
48
+ }
49
+
50
+ /// <summary>
51
+ /// Gets the current <see cref="IServiceProvider"/>
52
+ /// </summary>
53
+ protected IServiceProvider ServiceProvider { get ; }
54
+
55
+ /// <summary>
56
+ /// Gets the service used to perform logging
57
+ /// </summary>
58
+ protected ILogger Logger { get ; }
59
+
60
+ /// <summary>
61
+ /// Gets the current <see cref="SynapseApplicationOptions"/>
62
+ /// </summary>
63
+ protected SynapseApplicationOptions Options { get ; }
64
+
65
+ /// <summary>
66
+ /// Gets the service used to manage <see cref="IPlugin"/>s
67
+ /// </summary>
68
+ protected IPluginManager PluginManager { get ; }
69
+
70
+ /// <summary>
71
+ /// Gets the service used to read <see cref="WorkflowDefinition"/>s
72
+ /// </summary>
73
+ protected IWorkflowReader WorkflowReader { get ; }
74
+
75
+ /// <summary>
76
+ /// Gets the <see cref="WorkflowDefinitionFileMonitor"/>'s <see cref="System.Threading.CancellationTokenSource"/>
77
+ /// </summary>
78
+ protected CancellationTokenSource CancellationTokenSource { get ; private set ; } = null ! ;
79
+
80
+ /// <summary>
81
+ /// Gets a service used to watch <see cref="WorkflowDefinition"/> files
82
+ /// </summary>
83
+ protected FileSystemWatcher FileSystemWatcher { get ; private set ; } = null ! ;
84
+
85
+ /// <inheritdoc/>
86
+ protected override async Task ExecuteAsync ( CancellationToken stoppingToken )
87
+ {
88
+ await this . PluginManager . WaitForStartupAsync ( stoppingToken ) ;
89
+ if ( string . IsNullOrWhiteSpace ( this . Options . DefinitionsDirectory ) )
90
+ return ;
91
+ this . CancellationTokenSource = CancellationTokenSource . CreateLinkedTokenSource ( stoppingToken ) ;
92
+ var directory = new DirectoryInfo ( this . Options . DefinitionsDirectory ) ;
93
+ if ( ! directory . Exists )
94
+ directory . Create ( ) ;
95
+ this . FileSystemWatcher = new ( directory . FullName )
96
+ {
97
+ IncludeSubdirectories = true ,
98
+ NotifyFilter = NotifyFilters . LastWrite | NotifyFilters . FileName ,
99
+ EnableRaisingEvents = true
100
+ } ;
101
+ this . FileSystemWatcher . Created += this . OnFileCreatedOrChangedAsync ;
102
+ this . FileSystemWatcher . Changed += this . OnFileCreatedOrChangedAsync ;
103
+ foreach ( var file in directory . GetFiles ( "*.*" , SearchOption . AllDirectories )
104
+ . Where ( f => f . Extension . ToLower ( ) == ".json" || f . Extension . ToLower ( ) == ".yml" || f . Extension . ToLower ( ) == ".yaml" ) )
30
105
{
106
+ await this . ReadAndCreateWorkflowAsync ( file . FullName , true ) ;
107
+ }
108
+ }
109
+
110
+ /// <summary>
111
+ /// Handles the creation of a new file in the definitions directory
112
+ /// </summary>
113
+ /// <param name="sender">The sender of the <see cref="FileSystemEventArgs"/></param>
114
+ /// <param name="e">The <see cref="FileSystemEventArgs"/> to handle</param>
115
+ protected virtual async void OnFileCreatedOrChangedAsync ( object sender , FileSystemEventArgs e )
116
+ {
117
+ if ( e . ChangeType != WatcherChangeTypes . Created
118
+ && e . ChangeType != WatcherChangeTypes . Changed )
119
+ return ;
120
+ switch ( Path . GetExtension ( e . FullPath . ToLower ( ) ) )
121
+ {
122
+ case ".json" :
123
+ case ".yaml" :
124
+ case ".yml" :
125
+ break ;
126
+ default :
127
+ return ;
128
+ }
129
+ await this . ReadAndCreateWorkflowAsync ( e . FullPath , false ) ;
130
+ }
131
+
132
+ /// <summary>
133
+ /// Reads the <see cref="WorkflowDefinition"/> from the specified file and creates a new <see cref="V1Workflow"/>, if it does not already exist
134
+ /// </summary>
135
+ /// <param name="filePath">The path to the <see cref="WorkflowDefinition"/> file to read</param>
136
+ /// <param name="ifNotExists">A boolean indicating to only import read and create a new <see cref="V1Workflow"/> if it already exists</param>
137
+ /// <returns>A new awaitable <see cref="Task"/></returns>
138
+ protected virtual async Task ReadAndCreateWorkflowAsync ( string filePath , bool ifNotExists )
139
+ {
140
+ try
141
+ {
142
+ using var stream = File . OpenRead ( filePath ) ;
143
+ var definition = await this . WorkflowReader . ReadAsync ( stream ) ;
144
+ using var scope = this . ServiceProvider . CreateScope ( ) ;
145
+ await scope . ServiceProvider . GetRequiredService < IMediator > ( ) . ExecuteAndUnwrapAsync ( new V1CreateWorkflowCommand ( definition , ifNotExists ) , this . CancellationTokenSource . Token ) ;
146
+ }
147
+ catch ( IOException ex ) when ( ex . HResult == - 2147024864 ) { }
148
+ catch ( Exception ex )
149
+ {
150
+ this . Logger . LogError ( "An error occured while reading a valid Serverless Workflow definition from the specified file '{filePath}': {ex}" , filePath , ex . ToString ( ) ) ;
151
+ return ;
152
+ }
153
+ }
31
154
32
- /// <summary>
33
- /// Initializes a new <see cref="WorkflowDefinitionFileMonitor"/>
34
- /// </summary>
35
- /// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
36
- /// <param name="logger">The service used to perform logging</param>
37
- /// <param name="options">The service used to access the current <see cref="SynapseApplicationOptions"/></param>
38
- /// <param name="workflowReader">The service used to read <see cref="WorkflowDefinition"/>s</param>
39
- public WorkflowDefinitionFileMonitor ( IServiceProvider serviceProvider , ILogger < WorkflowDefinitionFileMonitor > logger , IOptions < SynapseApplicationOptions > options , IWorkflowReader workflowReader )
40
- {
41
- this . ServiceProvider = serviceProvider ;
42
- this . Logger = logger ;
43
- this . Options = options . Value ;
44
- this . WorkflowReader = workflowReader ;
45
- }
46
-
47
- /// <summary>
48
- /// Gets the current <see cref="IServiceProvider"/>
49
- /// </summary>
50
- protected IServiceProvider ServiceProvider { get ; }
51
-
52
- /// <summary>
53
- /// Gets the service used to perform logging
54
- /// </summary>
55
- protected ILogger Logger { get ; }
56
-
57
- /// <summary>
58
- /// Gets the current <see cref="SynapseApplicationOptions"/>
59
- /// </summary>
60
- protected SynapseApplicationOptions Options { get ; }
61
-
62
- /// <summary>
63
- /// Gets the service used to read <see cref="WorkflowDefinition"/>s
64
- /// </summary>
65
- protected IWorkflowReader WorkflowReader { get ; }
66
-
67
- /// <summary>
68
- /// Gets the <see cref="WorkflowDefinitionFileMonitor"/>'s <see cref="System.Threading.CancellationTokenSource"/>
69
- /// </summary>
70
- protected CancellationTokenSource CancellationTokenSource { get ; private set ; } = null ! ;
71
-
72
- /// <summary>
73
- /// Gets a service used to watch <see cref="WorkflowDefinition"/> files
74
- /// </summary>
75
- protected FileSystemWatcher FileSystemWatcher { get ; private set ; } = null ! ;
76
-
77
- /// <inheritdoc/>
78
- protected override async Task ExecuteAsync ( CancellationToken stoppingToken )
79
- {
80
- if ( string . IsNullOrWhiteSpace ( this . Options . DefinitionsDirectory ) )
81
- return ;
82
- this . CancellationTokenSource = CancellationTokenSource . CreateLinkedTokenSource ( stoppingToken ) ;
83
- var directory = new DirectoryInfo ( this . Options . DefinitionsDirectory ) ;
84
- if ( ! directory . Exists )
85
- directory . Create ( ) ;
86
- this . FileSystemWatcher = new ( directory . FullName )
87
- {
88
- IncludeSubdirectories = true ,
89
- NotifyFilter = NotifyFilters . LastWrite | NotifyFilters . FileName ,
90
- EnableRaisingEvents = true
91
- } ;
92
- this . FileSystemWatcher . Created += this . OnFileCreatedOrChangedAsync ;
93
- this . FileSystemWatcher . Changed += this . OnFileCreatedOrChangedAsync ;
94
- foreach ( var file in directory . GetFiles ( "*.*" , SearchOption . AllDirectories )
95
- . Where ( f => f . Extension . ToLower ( ) == ".json" || f . Extension . ToLower ( ) == ".yml" || f . Extension . ToLower ( ) == ".yaml" ) )
96
- {
97
- await this . ReadAndCreateWorkflowAsync ( file . FullName , true ) ;
98
- }
99
- }
100
-
101
- /// <summary>
102
- /// Handles the creation of a new file in the definitions directory
103
- /// </summary>
104
- /// <param name="sender">The sender of the <see cref="FileSystemEventArgs"/></param>
105
- /// <param name="e">The <see cref="FileSystemEventArgs"/> to handle</param>
106
- protected virtual async void OnFileCreatedOrChangedAsync ( object sender , FileSystemEventArgs e )
107
- {
108
- if ( e . ChangeType != WatcherChangeTypes . Created
109
- && e . ChangeType != WatcherChangeTypes . Changed )
110
- return ;
111
- switch ( Path . GetExtension ( e . FullPath . ToLower ( ) ) )
112
- {
113
- case ".json" :
114
- case ".yaml" :
115
- case ".yml" :
116
- break ;
117
- default :
118
- return ;
119
- }
120
- await this . ReadAndCreateWorkflowAsync ( e . FullPath , false ) ;
121
- }
122
-
123
- /// <summary>
124
- /// Reads the <see cref="WorkflowDefinition"/> from the specified file and creates a new <see cref="V1Workflow"/>, if it does not already exist
125
- /// </summary>
126
- /// <param name="filePath">The path to the <see cref="WorkflowDefinition"/> file to read</param>
127
- /// <param name="ifNotExists">A boolean indicating to only import read and create a new <see cref="V1Workflow"/> if it already exists</param>
128
- /// <returns>A new awaitable <see cref="Task"/></returns>
129
- protected virtual async Task ReadAndCreateWorkflowAsync ( string filePath , bool ifNotExists )
130
- {
131
- try
132
- {
133
- using var stream = File . OpenRead ( filePath ) ;
134
- var definition = await this . WorkflowReader . ReadAsync ( stream ) ;
135
- using var scope = this . ServiceProvider . CreateScope ( ) ;
136
- await scope . ServiceProvider . GetRequiredService < IMediator > ( ) . ExecuteAndUnwrapAsync ( new V1CreateWorkflowCommand ( definition , ifNotExists ) , this . CancellationTokenSource . Token ) ;
137
- }
138
- catch ( IOException ex ) when ( ex . HResult == - 2147024864 ) { }
139
- catch ( Exception ex )
140
- {
141
- this . Logger . LogError ( "An error occured while reading a valid Serverless Workflow definition from the specified file '{filePath}': {ex}" , filePath , ex . ToString ( ) ) ;
142
- return ;
143
- }
144
- }
145
-
146
- /// <inheritdoc/>
147
- public override void Dispose ( )
148
- {
149
- this . CancellationTokenSource ? . Dispose ( ) ;
150
- this . FileSystemWatcher ? . Dispose ( ) ;
151
- base . Dispose ( ) ;
152
- GC . SuppressFinalize ( this ) ;
153
- }
155
+ /// <inheritdoc/>
156
+ public override void Dispose ( )
157
+ {
158
+ this . CancellationTokenSource ? . Dispose ( ) ;
159
+ this . FileSystemWatcher ? . Dispose ( ) ;
160
+ base . Dispose ( ) ;
161
+ GC . SuppressFinalize ( this ) ;
162
+ }
154
163
155
- }
164
+ }
156
165
157
- }
0 commit comments