3
3
import io .github .lvyahui8 .spring .aggregate .config .RuntimeSettings ;
4
4
import io .github .lvyahui8 .spring .aggregate .facade .DataBeanAggregateQueryFacade ;
5
5
import io .github .lvyahui8 .spring .aggregate .facade .impl .DataBeanAggregateQueryFacadeImpl ;
6
+ import io .github .lvyahui8 .spring .aggregate .interceptor .AggregateQueryInterceptor ;
7
+ import io .github .lvyahui8 .spring .aggregate .interceptor .AggregateQueryInterceptorChain ;
8
+ import io .github .lvyahui8 .spring .aggregate .interceptor .impl .AggregateQueryInterceptorChainImpl ;
9
+ import io .github .lvyahui8 .spring .aggregate .model .DataConsumeDefinition ;
6
10
import io .github .lvyahui8 .spring .aggregate .model .DataProvideDefinition ;
7
11
import io .github .lvyahui8 .spring .aggregate .repository .DataProviderRepository ;
8
12
import io .github .lvyahui8 .spring .aggregate .repository .impl .DataProviderRepositoryImpl ;
9
- import io .github .lvyahui8 .spring .aggregate .service .DataBeanAggregateQueryService ;
10
- import io .github .lvyahui8 .spring .aggregate .service .impl .DataBeanAggregateQueryServiceImpl ;
13
+ import io .github .lvyahui8 .spring .aggregate .service .DataBeanAggregateService ;
14
+ import io .github .lvyahui8 .spring .aggregate .service .impl .DataBeanAggregateServiceImpl ;
11
15
import io .github .lvyahui8 .spring .aggregate .util .DefinitionUtils ;
12
16
import io .github .lvyahui8 .spring .annotation .DataProvider ;
13
17
import lombok .extern .slf4j .Slf4j ;
18
+ import org .apache .commons .lang3 .StringUtils ;
14
19
import org .reflections .Reflections ;
15
20
import org .reflections .scanners .MethodAnnotationsScanner ;
16
21
import org .springframework .beans .BeansException ;
17
22
import org .springframework .beans .factory .annotation .Autowired ;
23
+ import org .springframework .beans .factory .annotation .Qualifier ;
18
24
import org .springframework .boot .autoconfigure .condition .ConditionalOnMissingBean ;
19
25
import org .springframework .boot .context .properties .EnableConfigurationProperties ;
20
26
import org .springframework .context .ApplicationContext ;
21
27
import org .springframework .context .ApplicationContextAware ;
22
28
import org .springframework .context .annotation .Bean ;
23
29
import org .springframework .context .annotation .Configuration ;
30
+ import org .springframework .core .Ordered ;
24
31
import org .springframework .core .annotation .AnnotationUtils ;
32
+ import org .springframework .core .annotation .Order ;
25
33
import org .springframework .scheduling .concurrent .CustomizableThreadFactory ;
26
34
import org .springframework .util .Assert ;
27
- import org .springframework .util .StringUtils ;
28
35
29
36
import java .lang .reflect .Method ;
30
37
import java .lang .reflect .Modifier ;
31
- import java .util .Set ;
38
+ import java .util .* ;
32
39
import java .util .concurrent .ExecutorService ;
33
40
import java .util .concurrent .LinkedBlockingDeque ;
34
41
import java .util .concurrent .ThreadPoolExecutor ;
35
42
import java .util .concurrent .TimeUnit ;
43
+ import java .util .stream .Collectors ;
36
44
37
45
/**
38
46
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
@@ -54,24 +62,90 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
54
62
55
63
@ Bean
56
64
@ ConditionalOnMissingBean
57
- public DataBeanAggregateQueryFacade dataBeanAggregateQueryFacade () {
58
- return new DataBeanAggregateQueryFacadeImpl (dataBeanAggregateQueryService ());
65
+ public DataBeanAggregateQueryFacade dataBeanAggregateQueryFacade (
66
+ @ Qualifier ("dataProviderRepository" ) DataProviderRepository dataProviderRepository ) {
67
+ return new DataBeanAggregateQueryFacadeImpl (dataBeanAggregateQueryService (dataProviderRepository ));
68
+ }
69
+
70
+ private void checkCycle (Map <String ,Set <String >> graphAdjMap ) {
71
+ Map <String ,Integer > visitStatusMap = new HashMap <>(graphAdjMap .size () * 2 );
72
+ for (Map .Entry <String , Set <String >> item : graphAdjMap .entrySet ()) {
73
+ if (visitStatusMap .containsKey (item .getKey ())) {
74
+ continue ;
75
+ }
76
+ dfs (graphAdjMap ,visitStatusMap ,item .getKey ());
77
+ }
78
+ }
79
+
80
+ private void dfs (Map <String ,Set <String >> graphAdjMap ,Map <String ,Integer > visitStatusMap , String node ) {
81
+ if (visitStatusMap .containsKey (node )) {
82
+ if (visitStatusMap .get (node ) == 1 ) {
83
+ List <String > relatedNodes = new ArrayList <>();
84
+ for (Map .Entry <String ,Integer > item : visitStatusMap .entrySet ()) {
85
+ if (item .getValue () == 1 ) {
86
+ relatedNodes .add (item .getKey ());
87
+ }
88
+ }
89
+ throw new IllegalStateException ("There are loops in the dependency graph. Related nodes:" + StringUtils .join (relatedNodes ));
90
+ }
91
+ return ;
92
+ }
93
+ visitStatusMap .put (node ,1 );
94
+ log .info ("visited:{}" , node );
95
+ for (String relateNode : graphAdjMap .get (node )) {
96
+ dfs (graphAdjMap ,visitStatusMap ,relateNode );
97
+ }
98
+ visitStatusMap .put (node ,2 );
59
99
}
60
100
61
101
@ Bean
62
102
@ ConditionalOnMissingBean
63
- public DataBeanAggregateQueryService dataBeanAggregateQueryService () {
64
- DataProviderRepository repository = new DataProviderRepositoryImpl ();
65
- scanProviders (repository );
66
- DataBeanAggregateQueryServiceImpl service = new DataBeanAggregateQueryServiceImpl (repository );
67
- service .setRuntimeSettings (createRuntimeSettings ());
103
+ public DataBeanAggregateService dataBeanAggregateQueryService (
104
+ @ Qualifier ("dataProviderRepository" ) DataProviderRepository dataProviderRepository ) {
105
+ if (properties .getBasePackages () != null ) {
106
+ Map <String ,Set <String >> provideDependMap = new HashMap <>(64 );
107
+ for (String basePackage : properties .getBasePackages ()) {
108
+ Reflections reflections = new Reflections (basePackage , new MethodAnnotationsScanner ());
109
+ Set <Method > providerMethods = reflections .getMethodsAnnotatedWith (DataProvider .class );
110
+ for (Method method : providerMethods ) {
111
+ DataProvider beanProvider = AnnotationUtils .findAnnotation (method , DataProvider .class );
112
+ @ SuppressWarnings ("ConstantConditions" )
113
+ String dataId = beanProvider .id ();
114
+ Assert .isTrue (Modifier .isPublic (method .getModifiers ()),"data provider method must be public" );
115
+ Assert .isTrue (! StringUtils .isEmpty (dataId ),"data id must be not null!" );
116
+ DataProvideDefinition provider = DefinitionUtils .getProvideDefinition (method );
117
+
118
+ provider .setId (dataId );
119
+ provider .setIdempotent (beanProvider .idempotent ());
120
+ provider .setTimeout (beanProvider .timeout () > 0 ? beanProvider .timeout () : properties .getDefaultTimeout ());
121
+ Assert .isTrue (! dataProviderRepository .contains (dataId ), "Data providers with the same name are not allowed. dataId: " + dataId );
122
+ provideDependMap .put (dataId ,provider .getDepends ().stream ().map (DataConsumeDefinition ::getId ).collect (Collectors .toSet ()));
123
+ dataProviderRepository .put (provider );
124
+ }
125
+ }
126
+ checkCycle (provideDependMap );
127
+ }
128
+
129
+ DataBeanAggregateServiceImpl service = new DataBeanAggregateServiceImpl ();
130
+ RuntimeSettings runtimeSettings = new RuntimeSettings ();
131
+ runtimeSettings .setIgnoreException (properties .isIgnoreException ());
132
+ runtimeSettings .setTimeout (properties .getDefaultTimeout ());
133
+ service .setRepository (dataProviderRepository );
134
+ service .setRuntimeSettings (runtimeSettings );
68
135
service .setExecutorService (aggregateExecutorService ());
136
+ service .setInterceptorChain (aggregateQueryInterceptorChain ());
137
+ service .setTaskWrapperClazz (properties .getTaskWrapperClass ());
69
138
service .setApplicationContext (applicationContext );
70
139
return service ;
71
140
}
72
141
142
+ /**
143
+ * 允许用户自定义线程池
144
+ *
145
+ * @return 线程池服务
146
+ */
73
147
@ Bean (name = "aggregateExecutorService" )
74
- @ ConditionalOnMissingBean (name = "aggregateExecutorService" )
148
+ @ ConditionalOnMissingBean (name = "aggregateExecutorService" , value = ExecutorService . class )
75
149
public ExecutorService aggregateExecutorService () {
76
150
return new ThreadPoolExecutor (
77
151
properties .getThreadNumber (),
@@ -81,39 +155,39 @@ public ExecutorService aggregateExecutorService() {
81
155
new CustomizableThreadFactory (properties .getThreadPrefix ()));
82
156
}
83
157
84
- private void scanProviders (DataProviderRepository repository ) {
85
- if (properties .getBasePackages () != null ) {
86
- for (String basePackage : properties .getBasePackages ()) {
87
- Reflections reflections = new Reflections (basePackage , new MethodAnnotationsScanner ());
88
- Set <Method > providerMethods = reflections .getMethodsAnnotatedWith (DataProvider .class );
89
- for (Method method : providerMethods ) {
90
- dealProviderMethod (repository , method );
91
- }
92
- }
93
- }
94
- }
95
-
96
- private void dealProviderMethod (DataProviderRepository repository , Method method ) {
97
- DataProvider beanProvider = AnnotationUtils .findAnnotation (method , DataProvider .class );
98
- @ SuppressWarnings ("ConstantConditions" )
99
- String dataId = beanProvider .id ();
100
- Assert .isTrue (Modifier .isPublic (method .getModifiers ()),"data provider method must be public" );
101
- Assert .isTrue (! StringUtils .isEmpty (dataId ),"data id must be not null!" );
102
- DataProvideDefinition provider = DefinitionUtils .getProvideDefinition (method );
103
- provider .setId (dataId );
104
- provider .setIdempotent (beanProvider .idempotent ());
105
- provider .setTimeout (beanProvider .timeout () > 0 ? beanProvider .timeout () : properties .getDefaultTimeout ());
106
- repository .put (dataId ,provider );
158
+ /**
159
+ * 允许用户自定义provider存储
160
+ *
161
+ * @return provider数据仓库
162
+ */
163
+ @ Bean (name = "dataProviderRepository" )
164
+ @ ConditionalOnMissingBean (DataProviderRepository .class )
165
+ public DataProviderRepository dataProviderRepository () {
166
+ return new DataProviderRepositoryImpl ();
107
167
}
108
168
109
169
110
- private RuntimeSettings createRuntimeSettings () {
111
- RuntimeSettings runtimeSettings = new RuntimeSettings ();
112
- runtimeSettings .setEnableLogging (properties .getEnableLogging () != null
113
- ? properties .getEnableLogging () : false );
114
- runtimeSettings .setIgnoreException (properties .isIgnoreException ());
115
- runtimeSettings .setTimeout (properties .getDefaultTimeout ());
116
- return runtimeSettings ;
170
+ @ Bean (name = "aggregateQueryInterceptorChain" )
171
+ @ ConditionalOnMissingBean (AggregateQueryInterceptorChain .class )
172
+ public AggregateQueryInterceptorChain aggregateQueryInterceptorChain () {
173
+ Map <String , AggregateQueryInterceptor > interceptorMap = applicationContext .getBeansOfType (AggregateQueryInterceptor .class );
174
+ AggregateQueryInterceptorChainImpl interceptorChain = new AggregateQueryInterceptorChainImpl ();
175
+ if (interceptorMap != null && ! interceptorMap .isEmpty ()) {
176
+ List <AggregateQueryInterceptor > interceptors = new ArrayList <>(interceptorMap .values ());
177
+ interceptors .sort (new Comparator <AggregateQueryInterceptor >() {
178
+ @ Override
179
+ public int compare (AggregateQueryInterceptor o1 , AggregateQueryInterceptor o2 ) {
180
+ Order order1 = o1 .getClass ().getAnnotation (Order .class );
181
+ Order order2 = o2 .getClass ().getAnnotation (Order .class );
182
+ int oi1 = order1 == null ? Ordered .LOWEST_PRECEDENCE : order1 .value ();
183
+ int oi2 = order2 == null ? Ordered .LOWEST_PRECEDENCE : order2 .value ();
184
+ return oi1 - oi2 ;
185
+ }
186
+ });
187
+ for (AggregateQueryInterceptor interceptor : interceptors ) {
188
+ interceptorChain .addInterceptor (interceptor );
189
+ }
190
+ }
191
+ return interceptorChain ;
117
192
}
118
-
119
193
}
0 commit comments