20
20
import java .util .stream .Stream ;
21
21
import java .util .stream .StreamSupport ;
22
22
23
- import static graphql .execution .ExecutionParameters .newParameters ;
24
23
import static java .util .stream .Collectors .toList ;
25
24
26
25
public class ReactiveExecutionStrategy extends ExecutionStrategy {
@@ -33,17 +32,11 @@ public ExecutionResult execute(ExecutionContext context, ExecutionParameters par
33
32
context ,
34
33
source ,
35
34
__ -> fields .entrySet ().stream (),
36
- (entry , sourceValue ) -> {
37
- ReactiveContext subContext = new ReactiveContext (context , entry .getKey ());
38
-
39
- ExecutionParameters newParameters = newParameters ()
40
- .typeInfo (parameters .typeInfo ())
41
- .fields (parameters .fields ())
42
- .arguments (parameters .arguments ())
43
- .source (source == null ? null : sourceValue )
44
- .build ();
45
- return resolveField (subContext , newParameters , entry .getValue ());
46
- },
35
+ (entry , sourceValue ) -> resolveField (
36
+ new ReactiveContext (context , entry .getKey ()),
37
+ transformParametersSource (parameters , source == null ? null : sourceValue ),
38
+ entry .getValue ()
39
+ ),
47
40
results -> {
48
41
Map <Object , Object > result = new HashMap <>();
49
42
for (Object entry : results ) {
@@ -77,15 +70,7 @@ protected ExecutionResult completeValue(ExecutionContext context, ExecutionParam
77
70
78
71
if ((fieldType instanceof GraphQLScalarType || fieldType instanceof GraphQLEnumType ) && result instanceof Publisher ) {
79
72
Flowable <Change > changesFlow = Flowable .fromPublisher ((Publisher <?>) result )
80
- .map (it -> {
81
- ExecutionParameters newParameters = newParameters ()
82
- .source (it )
83
- .fields (parameters .fields ())
84
- .typeInfo (parameters .typeInfo ())
85
- .build ();
86
-
87
- return new Change (context , completeValue (context , newParameters , fields ).getData ());
88
- });
73
+ .map (it -> new Change (context , completeValue (context , transformParametersSource (parameters , it ), fields ).getData ()));
89
74
90
75
return new ExecutionResultImpl (changesFlow , null );
91
76
}
@@ -107,15 +92,11 @@ protected ExecutionResult completeValue(ExecutionContext context, ExecutionParam
107
92
AtomicInteger i = new AtomicInteger ();
108
93
return stream .map (it -> new SimpleEntry <>(i .getAndIncrement (), adapt (it )));
109
94
},
110
- (entry , __ ) -> {
111
- ExecutionParameters newParameters = ExecutionParameters .newParameters ()
112
- .source (entry .getValue ())
113
- .fields (parameters .fields ())
114
- .arguments (parameters .arguments ())
115
- .typeInfo (parameters .typeInfo ().asType (wrappedType ))
116
- .build ();
117
- return completeValue (new ReactiveContext (context , entry .getKey ()), newParameters , fields );
118
- },
95
+ (entry , __ ) -> completeValue (
96
+ new ReactiveContext (context , entry .getKey ()),
97
+ transformParametersSource (parameters , entry .getValue (), parameters .typeInfo ().asType (wrappedType )),
98
+ fields
99
+ ),
119
100
results -> Stream .of (results )
120
101
.map (SimpleEntry .class ::cast )
121
102
.sorted (Comparator .comparingInt (it -> (Integer ) it .getKey ()))
@@ -127,6 +108,19 @@ protected ExecutionResult completeValue(ExecutionContext context, ExecutionParam
127
108
return super .completeValue (context , parameters , fields );
128
109
}
129
110
111
+ protected ExecutionParameters transformParametersSource (ExecutionParameters parameters , Object newSource ) {
112
+ return transformParametersSource (parameters , newSource , parameters .typeInfo ());
113
+ }
114
+
115
+ protected ExecutionParameters transformParametersSource (ExecutionParameters parameters , Object newSource , TypeInfo typeInfo ) {
116
+ return ExecutionParameters .newParameters ()
117
+ .source (newSource )
118
+ .fields (parameters .fields ())
119
+ .arguments (parameters .arguments ())
120
+ .typeInfo (typeInfo )
121
+ .build ();
122
+ }
123
+
130
124
protected <K , V > ExecutionResultImpl complexChangesFlow (
131
125
ExecutionContext context ,
132
126
Object source ,
0 commit comments