Skip to content

Commit ae7b0ad

Browse files
authored
Merge pull request #10 from lvyahui8/develop
Develop merge into master
2 parents f3a9ae2 + 90195b3 commit ae7b0ad

File tree

23 files changed

+326
-27
lines changed

23 files changed

+326
-27
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ DataBeanAggregateQueryFacade dataBeanAggregateQueryFacade;
114114

115115
#### 方式一: 函数式调用
116116

117+
注意这里不能将函数式调用改为Lambda表达式, 两者的实际行为是不一致的.
118+
117119
```java
118120
User user = dataBeanAggregateQueryFacade.get(
119121
Collections.singletonMap("userId", 1L),

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ limitations under the License.
2020

2121
<groupId>io.github.lvyahui8</groupId>
2222
<artifactId>spring-boot-data-aggregator</artifactId>
23-
<version>1.1.0</version>
23+
<version>1.1.1</version>
2424
<modules>
2525
<module>spring-boot-data-aggregator-core</module>
2626
<module>spring-boot-data-aggregator-autoconfigure</module>

spring-boot-data-aggregator-autoconfigure/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>spring-boot-data-aggregator</artifactId>
77
<groupId>io.github.lvyahui8</groupId>
8-
<version>1.1.0</version>
8+
<version>1.1.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

spring-boot-data-aggregator-autoconfigure/src/main/java/io/github/lvyahui8/spring/autoconfigure/BeanAggregateAutoConfiguration.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.github.lvyahui8.spring.aggregate.interceptor.AggregateQueryInterceptor;
77
import io.github.lvyahui8.spring.aggregate.interceptor.AggregateQueryInterceptorChain;
88
import io.github.lvyahui8.spring.aggregate.interceptor.impl.AggregateQueryInterceptorChainImpl;
9+
import io.github.lvyahui8.spring.aggregate.model.DataConsumeDefinition;
910
import io.github.lvyahui8.spring.aggregate.model.DataProvideDefinition;
1011
import io.github.lvyahui8.spring.aggregate.repository.DataProviderRepository;
1112
import io.github.lvyahui8.spring.aggregate.repository.impl.DataProviderRepositoryImpl;
@@ -14,6 +15,7 @@
1415
import io.github.lvyahui8.spring.aggregate.util.DefinitionUtils;
1516
import io.github.lvyahui8.spring.annotation.DataProvider;
1617
import lombok.extern.slf4j.Slf4j;
18+
import org.apache.commons.lang3.StringUtils;
1719
import org.reflections.Reflections;
1820
import org.reflections.scanners.MethodAnnotationsScanner;
1921
import org.springframework.beans.BeansException;
@@ -30,7 +32,6 @@
3032
import org.springframework.core.annotation.Order;
3133
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
3234
import org.springframework.util.Assert;
33-
import org.springframework.util.StringUtils;
3435

3536
import java.lang.reflect.Method;
3637
import java.lang.reflect.Modifier;
@@ -39,6 +40,7 @@
3940
import java.util.concurrent.LinkedBlockingDeque;
4041
import java.util.concurrent.ThreadPoolExecutor;
4142
import java.util.concurrent.TimeUnit;
43+
import java.util.stream.Collectors;
4244

4345
/**
4446
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
@@ -65,11 +67,43 @@ public DataBeanAggregateQueryFacade dataBeanAggregateQueryFacade(
6567
return new DataBeanAggregateQueryFacadeImpl(dataBeanAggregateQueryService(dataProviderRepository));
6668
}
6769

70+
private void checkCycle(Map<String,Set<String>> graphAdjMap) {
71+
Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
72+
for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
73+
if (visitStatusMap.containsKey(item.getKey())) {
74+
continue;
75+
}
76+
dfs(graphAdjMap,visitStatusMap,item.getKey());
77+
}
78+
}
79+
80+
private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) {
81+
if (visitStatusMap.containsKey(node)) {
82+
if(visitStatusMap.get(node) == 1) {
83+
List<String> relatedNodes = new ArrayList<>();
84+
for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) {
85+
if (item.getValue() == 1) {
86+
relatedNodes.add(item.getKey());
87+
}
88+
}
89+
throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
90+
}
91+
return ;
92+
}
93+
visitStatusMap.put(node,1);
94+
log.info("visited:{}", node);
95+
for (String relateNode : graphAdjMap.get(node)) {
96+
dfs(graphAdjMap,visitStatusMap,relateNode);
97+
}
98+
visitStatusMap.put(node,2);
99+
}
100+
68101
@Bean
69102
@ConditionalOnMissingBean
70103
public DataBeanAggregateQueryService dataBeanAggregateQueryService (
71104
@Qualifier("dataProviderRepository") DataProviderRepository dataProviderRepository) {
72105
if(properties.getBasePackages() != null) {
106+
Map<String,Set<String>> provideDependMap = new HashMap<>();
73107
for (String basePackage : properties.getBasePackages()) {
74108
Reflections reflections = new Reflections(basePackage, new MethodAnnotationsScanner());
75109
Set<Method> providerMethods = reflections.getMethodsAnnotatedWith(DataProvider.class);
@@ -80,13 +114,18 @@ public DataBeanAggregateQueryService dataBeanAggregateQueryService (
80114
Assert.isTrue(Modifier.isPublic(method.getModifiers()),"data provider method must be public");
81115
Assert.isTrue(! StringUtils.isEmpty(dataId),"data id must be not null!");
82116
DataProvideDefinition provider = DefinitionUtils.getProvideDefinition(method);
117+
83118
provider.setId(dataId);
84119
provider.setIdempotent(beanProvider.idempotent());
85120
provider.setTimeout(beanProvider.timeout() > 0 ? beanProvider.timeout() : properties.getDefaultTimeout());
121+
Assert.isTrue(! dataProviderRepository.contains(dataId), "Data providers with the same name are not allowed. dataId: " + dataId);
122+
provideDependMap.put(dataId,provider.getDepends().stream().map(DataConsumeDefinition::getId).collect(Collectors.toSet()));
86123
dataProviderRepository.put(provider);
87124
}
88125
}
126+
checkCycle(provideDependMap);
89127
}
128+
90129
DataBeanAggregateQueryServiceImpl service = new DataBeanAggregateQueryServiceImpl();
91130
RuntimeSettings runtimeSettings = new RuntimeSettings();
92131
runtimeSettings.setEnableLogging(properties.getEnableLogging() != null
@@ -97,6 +136,7 @@ public DataBeanAggregateQueryService dataBeanAggregateQueryService (
97136
service.setRuntimeSettings(runtimeSettings);
98137
service.setExecutorService(aggregateExecutorService());
99138
service.setInterceptorChain(aggregateQueryInterceptorChain());
139+
service.setTaskWrapperClazz(properties.getTaskWrapperClass());
100140
service.setApplicationContext(applicationContext);
101141
return service;
102142
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.github.lvyahui8.spring.autoconfigure;
22

3+
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskWrapper;
4+
import io.github.lvyahui8.spring.aggregate.service.AsyncQueryTaskWrapperAdapter;
35
import lombok.Data;
46
import org.springframework.boot.context.properties.ConfigurationProperties;
57

@@ -13,29 +15,33 @@ public class BeanAggregateProperties {
1315
/**
1416
* Packages that need to scan for aggregated annotations
1517
*/
16-
private String[] basePackages;
18+
private String[] basePackages;
1719
/**
1820
* Thread name prefix for asynchronous threads
1921
*/
20-
private String threadPrefix = "aggregateTask-";
22+
private String threadPrefix = "aggregateTask-";
2123
/**
2224
* Thread size of the asynchronous thread pool
2325
*/
2426
private int threadNumber = Runtime.getRuntime().availableProcessors() * 3;
2527
/**
2628
* The size of the queue that holds the task to be executed
2729
*/
28-
private int queueSize = 1000;
30+
private int queueSize = 1000;
2931
/**
3032
* Set a default timeout for the method of providing data
3133
*/
32-
private Long defaultTimeout = 3000L;
34+
private Long defaultTimeout = 3000L;
3335
/**
3436
* Allow output log
3537
*/
36-
private Boolean enableLogging = true;
38+
private Boolean enableLogging = true;
3739
/**
3840
* Ignore exception thrown by asynchronous execution, method returns null value
3941
*/
40-
private boolean ignoreException = false;
42+
private boolean ignoreException = false;
43+
/**
44+
* Async task implement
45+
*/
46+
private Class<? extends AsyncQueryTaskWrapper> taskWrapperClass = AsyncQueryTaskWrapperAdapter.class;
4147
}

spring-boot-data-aggregator-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>spring-boot-data-aggregator</artifactId>
77
<groupId>io.github.lvyahui8</groupId>
8-
<version>1.1.0</version>
8+
<version>1.1.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

spring-boot-data-aggregator-core/src/main/java/io/github/lvyahui8/spring/aggregate/model/DataConsumeDefinition.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22

33
import lombok.Data;
44

5+
import java.util.Map;
6+
57
/**
68
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
79
* @since 2019/6/2 22:13
810
*/
911
@Data
1012
public class DataConsumeDefinition {
11-
private String id;
12-
private Class<?> clazz;
13-
private Boolean ignoreException;
13+
private String id;
14+
private Class<?> clazz;
15+
private Boolean ignoreException;
16+
private Map<String,String> dynamicParameterKeyMap;
17+
private String originalParameterName;
1418
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.github.lvyahui8.spring.aggregate.service;
2+
3+
import java.util.concurrent.Callable;
4+
5+
/**
6+
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
7+
* @since 2019/12/25 22:40
8+
*/
9+
public abstract class AsyncQueryTask<T> implements Callable<T> {
10+
Thread taskFromThread;
11+
AsyncQueryTaskWrapper asyncQueryTaskWrapper;
12+
13+
public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
14+
this.taskFromThread = taskFromThread;
15+
this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
16+
}
17+
18+
@Override
19+
public T call() throws Exception {
20+
try {
21+
if(asyncQueryTaskWrapper != null) {
22+
asyncQueryTaskWrapper.beforeExecute(taskFromThread);
23+
}
24+
return execute();
25+
} finally {
26+
if (asyncQueryTaskWrapper != null) {
27+
asyncQueryTaskWrapper.afterExecute(taskFromThread);
28+
}
29+
}
30+
}
31+
32+
/**
33+
*
34+
* @return
35+
* @throws Exception
36+
*/
37+
public abstract T execute() throws Exception;
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.github.lvyahui8.spring.aggregate.service;
2+
3+
/**
4+
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
5+
* @since 2019/12/24 23:07
6+
*/
7+
public interface AsyncQueryTaskWrapper {
8+
/**
9+
* 任务提交之前执行. 此方法在提交任务的那个线程中执行
10+
*/
11+
void beforeSubmit();
12+
13+
/**
14+
* 任务开始执行前执行. 此方法在异步线程中执行
15+
* @param taskFrom 提交任务的那个线程
16+
*/
17+
void beforeExecute(Thread taskFrom);
18+
19+
/**
20+
* 任务执行结束后执行. 此方法在异步线程中执行
21+
* 注意, 不管用户的方法抛出何种异常, 此方法都会执行.
22+
* @param taskFrom 提交任务的那个线程
23+
*/
24+
void afterExecute(Thread taskFrom);
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.github.lvyahui8.spring.aggregate.service;
2+
3+
/**
4+
* @author lvyahui (lvyahui8@gmail.com,lvyahui8@126.com)
5+
* @since 2019/12/25 22:57
6+
*/
7+
public class AsyncQueryTaskWrapperAdapter implements AsyncQueryTaskWrapper{
8+
9+
@Override
10+
public void beforeSubmit() {
11+
12+
}
13+
14+
@Override
15+
public void beforeExecute(Thread taskFrom) {
16+
17+
}
18+
19+
@Override
20+
public void afterExecute(Thread taskFrom) {
21+
22+
}
23+
}

0 commit comments

Comments
 (0)