@@ -99,7 +99,7 @@ internal async Task<IExecutionResult> ExecuteAsync(
99
99
100
100
if ( scopeDataLoader )
101
101
{
102
- // we ensure that at the begin of each execution there is a fresh batching scope.
102
+ // we ensure that at the beginning of each execution there is a fresh batching scope.
103
103
services . InitializeDataLoaderScope ( ) ;
104
104
}
105
105
@@ -153,7 +153,14 @@ internal async Task<IExecutionResult> ExecuteAsync(
153
153
_contextPool . Return ( context ) ;
154
154
}
155
155
156
- scope ? . Dispose ( ) ;
156
+ if ( scope is IAsyncDisposable asyncScope )
157
+ {
158
+ await asyncScope . DisposeAsync ( ) ;
159
+ }
160
+ else
161
+ {
162
+ scope ? . Dispose ( ) ;
163
+ }
157
164
}
158
165
}
159
166
@@ -174,7 +181,7 @@ public Task<IResponseStream> ExecuteBatchAsync(
174
181
175
182
private async IAsyncEnumerable < IOperationResult > CreateResponseStream (
176
183
OperationRequestBatch requestBatch ,
177
- [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
184
+ [ EnumeratorCancellation ] CancellationToken ct = default )
178
185
{
179
186
IServiceScope ? scope = null ;
180
187
@@ -197,6 +204,31 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(
197
204
// we ensure that at the start of each execution there is a fresh batching scope.
198
205
services . InitializeDataLoaderScope ( ) ;
199
206
207
+ try
208
+ {
209
+ await foreach ( var result in ExecuteBatchStream ( requestBatch , services , ct ) . ConfigureAwait ( false ) )
210
+ {
211
+ yield return result ;
212
+ }
213
+ }
214
+ finally
215
+ {
216
+ if ( scope is IAsyncDisposable asyncScope )
217
+ {
218
+ await asyncScope . DisposeAsync ( ) ;
219
+ }
220
+ else
221
+ {
222
+ scope ? . Dispose ( ) ;
223
+ }
224
+ }
225
+ }
226
+
227
+ private async IAsyncEnumerable < IOperationResult > ExecuteBatchStream (
228
+ OperationRequestBatch requestBatch ,
229
+ IServiceProvider services ,
230
+ [ EnumeratorCancellation ] CancellationToken ct = default )
231
+ {
200
232
var requests = requestBatch . Requests ;
201
233
var requestCount = requests . Count ;
202
234
var tasks = new List < Task > ( requestCount ) ;
@@ -205,7 +237,7 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(
205
237
206
238
for ( var i = 0 ; i < requestCount ; i ++ )
207
239
{
208
- tasks . Add ( ExecuteBatchItemAsync ( requests [ i ] , i , completed , cancellationToken ) ) ;
240
+ tasks . Add ( ExecuteBatchItemAsync ( WithServices ( requests [ i ] , services ) , i , completed , ct ) ) ;
209
241
}
210
242
211
243
var buffer = new IOperationResult [ 8 ] ;
@@ -228,7 +260,7 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(
228
260
229
261
if ( task . Status is not TaskStatus . RanToCompletion )
230
262
{
231
- // we await to throw if its not successful.
263
+ // we await to throw if it's not successful.
232
264
await task ;
233
265
}
234
266
@@ -252,6 +284,21 @@ private async IAsyncEnumerable<IOperationResult> CreateResponseStream(
252
284
while ( tasks . Count > 0 || bufferCount > 0 ) ;
253
285
}
254
286
287
+ private static IOperationRequest WithServices ( IOperationRequest request , IServiceProvider services )
288
+ {
289
+ switch ( request )
290
+ {
291
+ case OperationRequest operationRequest :
292
+ return operationRequest . WithServices ( services ) ;
293
+
294
+ case VariableBatchRequest variableBatchRequest :
295
+ return variableBatchRequest . WithServices ( services ) ;
296
+
297
+ default :
298
+ throw new InvalidOperationException ( "Unexpected request type." ) ;
299
+ }
300
+ }
301
+
255
302
private async Task ExecuteBatchItemAsync (
256
303
IOperationRequest request ,
257
304
int requestIndex ,
0 commit comments