@@ -208,12 +208,10 @@ function initialize_streaming!(self_streams, spec, task)
208
208
end
209
209
output_buffer = get (spec. options, :stream_output_buffer , ProcessRingBuffer)
210
210
stream = Stream {T,output_buffer} (output_buffer_amount)
211
- spec. options = NamedTuple (filter (opt -> opt[1 ] != :stream_output_buffer &&
212
- opt[1 ] != :stream_output_buffer_amount ,
213
- Base. pairs (spec. options)))
214
211
self_streams[task. uid] = stream
215
212
216
- spec. f = StreamingFunction (spec. f, stream)
213
+ max_evals = get (spec. options, :stream_max_evals , - 1 )
214
+ spec. f = StreamingFunction (spec. f, stream, max_evals)
217
215
spec. options = merge (spec. options, (;occupancy= Dict (Any=> 0 )))
218
216
219
217
# Register Stream globally
@@ -256,6 +254,7 @@ const STREAM_THUNK_ID = TaskLocalValue{Int}(()->0)
256
254
struct StreamingFunction{F, S}
257
255
f:: F
258
256
stream:: S
257
+ max_evals:: Int
259
258
end
260
259
chunktype (sf:: StreamingFunction{F} ) where F = F
261
260
function (sf:: StreamingFunction )(args... ; kwargs... )
@@ -319,14 +318,17 @@ end
319
318
function stream! (sf:: StreamingFunction , uid,
320
319
args:: Tuple , kwarg_names:: Tuple , kwarg_values:: Tuple )
321
320
f = move (thunk_processor (), sf. f)
322
- while true
321
+ counter = 0
322
+
323
+ while sf. max_evals < 0 || counter < sf. max_evals
323
324
# Get values from Stream args/kwargs
324
325
stream_args = _stream_take_values! (args, uid)
325
326
stream_kwarg_values = _stream_take_values! (kwarg_values, uid)
326
327
stream_kwargs = _stream_namedtuple (kwarg_names, stream_kwarg_values)
327
328
328
329
# Run a single cycle of f
329
330
stream_result = f (stream_args... ; stream_kwargs... )
331
+ counter += 1
330
332
331
333
# Exit streaming on graceful request
332
334
if stream_result isa FinishStream
@@ -412,7 +414,8 @@ function finalize_streaming!(tasks::Vector{Pair{DTaskSpec,DTask}}, self_streams)
412
414
413
415
# Filter out all streaming options
414
416
to_filter = (:stream_input_buffer , :stream_input_buffer_amount ,
415
- :stream_output_buffer , :stream_output_buffer_amount )
417
+ :stream_output_buffer , :stream_output_buffer_amount ,
418
+ :stream_max_evals )
416
419
spec. options = NamedTuple (filter (opt -> ! (opt[1 ] in to_filter),
417
420
Base. pairs (spec. options)))
418
421
if haskey (spec. options, :propagates )
0 commit comments