Skip to content

Commit 60d89a1

Browse files
committed
Merge pull request #5 from TheClimateCorporation/stop-tidy
Stop tasks when one throws an exception
2 parents fe4d627 + 0ce4438 commit 60d89a1

File tree

5 files changed

+171
-109
lines changed

5 files changed

+171
-109
lines changed

CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
CHANGES
22

3+
0.2.2
4+
5+
- Made pmap behave like map does when a function throws an exception
6+
37
0.2.1
48

59
- Made it work with java 1.6

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ The claypoole library provides threadpool-based parallel versions of Clojure
44
functions such as `pmap`, `future`, and `for`.
55

66
Claypoole is available in the Clojars repository. Just use this leiningen
7-
dependency: `[com.climate/claypoole "0.2.1"]`.
7+
dependency: `[com.climate/claypoole "0.2.2"]`.
88

99
## Why do you use claypoole?
1010

src/clj/com/climate/claypoole.clj

Lines changed: 99 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
PriorityThreadpoolImpl]
2828
[java.util.concurrent
2929
Callable
30+
CancellationException
3031
ExecutorService
3132
Future
3233
LinkedBlockingQueue
@@ -263,7 +264,91 @@
263264
serial. This may be helpful during profiling, for example.
264265
"
265266
[pool & body]
266-
`(future-call ~pool (^{:once true} fn [] ~@body)))
267+
`(future-call ~pool (^{:once true} fn future-body [] ~@body)))
268+
269+
(defn- make-canceller
270+
"Creates a function to cancel a bunch of futures."
271+
[future-reader future-seq]
272+
(let [first-already-cancelled (atom Long/MAX_VALUE)]
273+
(fn [i]
274+
(let [cancel-end @first-already-cancelled]
275+
;; Don't re-kill futures we've already zapped to prevent an O(n^2)
276+
;; explosion.
277+
(when (< i cancel-end)
278+
(swap! first-already-cancelled min i)
279+
;; Kill the future reader.
280+
(future-cancel future-reader)
281+
;; Stop the tasks above i before cancel-end.
282+
(doseq [f (->> future-seq (take cancel-end) (drop (inc i)))]
283+
(future-cancel f)))))))
284+
285+
(defn- pmap-core
286+
"Given functions to customize for pmap or upmap, create a function that does
287+
the hard work of pmap."
288+
[send-result read-result]
289+
(fn [pool f arg-seqs]
290+
(let [[shutdown? pool] (impl/->threadpool pool)
291+
;; Use map to handle the argument sequences.
292+
args (apply map vector (map impl/unchunk arg-seqs))
293+
;; Pre-declare the canceller because it needs the tasks but the tasks
294+
;; need it too.
295+
canceller (promise)
296+
start-task (fn [i a]
297+
;; We can't directly make a future add itself to a
298+
;; queue. Instead, we use a promise for indirection.
299+
(let [p (promise)]
300+
(deliver p (future-call
301+
pool
302+
(with-meta
303+
;; Try to run the task, but
304+
;; definitely add the future to
305+
;; the queue.
306+
#(try
307+
(let [result (apply f a)]
308+
(send-result @p)
309+
result)
310+
;; Even if we had an exception
311+
;; running the task, make sure the
312+
;; future shows up in the queue.
313+
(catch Exception e
314+
;; We've still got to send that
315+
;; result, even if it was an
316+
;; exception, and we have to do it
317+
;; before we start the canceller.
318+
(send-result @p)
319+
;; If we've had an exception, kill
320+
;; future and ongoing processes.
321+
(@canceller i)
322+
(throw e)))
323+
;; Add the args to the function's
324+
;; metadata for prioritization.
325+
{:args a})))
326+
@p))
327+
futures (map-indexed start-task args)
328+
;; Start all the tasks in a real future, so we don't block.
329+
read-future (core/future
330+
(try
331+
;; Force all those futures to start.
332+
(dorun futures)
333+
;; If we created a temporary pool, shut it down.
334+
(finally (when shutdown? (shutdown pool)))))]
335+
(deliver canceller (make-canceller read-future futures))
336+
;; Read results as available.
337+
(concat (map read-result futures)
338+
;; Deref the read-future to get its exceptions, if it has any.
339+
(lazy-seq (try @read-future
340+
;; But if it was cancelled, the user doesn't care.
341+
(catch CancellationException e)))))))
342+
343+
(defn- pmap-boilerplate
344+
"Do boilerplate pmap checks, then call the real pmap function."
345+
[pool f arg-seqs pmap-fn]
346+
(when (empty? arg-seqs)
347+
(throw (IllegalArgumentException.
348+
"pmap requires at least one sequence to map over")))
349+
(if (serial? pool)
350+
(doall (apply map f arg-seqs))
351+
(pmap-fn pool f arg-seqs)))
267352

268353
(defn pmap
269354
"Like clojure.core.pmap, except:
@@ -286,78 +371,23 @@
286371
serial via (doall map). This may be helpful during profiling, for example.
287372
"
288373
[pool f & arg-seqs]
289-
(when (empty? arg-seqs)
290-
(throw (IllegalArgumentException.
291-
"pmap requires at least one sequence to map over")))
292-
(if (serial? pool)
293-
(doall (apply map f arg-seqs))
294-
(let [[shutdown? pool] (impl/->threadpool pool)
295-
;; Use map to handle the argument sequences.
296-
args (apply map vector (map impl/unchunk arg-seqs))
297-
futures (for [a args]
298-
(future-call pool
299-
(with-meta #(apply f a)
300-
{:args a})))
301-
;; Start eagerly parallel processing.
302-
read-future (core/future
303-
(try
304-
(dorun futures)
305-
(finally (when shutdown? (shutdown pool)))))]
306-
;; Read results as available.
307-
(concat (map deref futures)
308-
;; Deref the reading future to get its exceptions, if it had any.
309-
(lazy-seq (deref read-future))))))
374+
(pmap-boilerplate pool f arg-seqs
375+
;; pmap is easy--just deref the futures.
376+
(let [send-result (constantly nil)
377+
read-result deref]
378+
(pmap-core send-result read-result))))
310379

311380
(defn upmap
312-
"Like pmap, except the return value is a sequence of results ordered by
381+
"Like pmap, except that the return value is a sequence of results ordered by
313382
*completion time*, not by input order."
314383
[pool f & arg-seqs]
315-
(when (empty? arg-seqs)
316-
(throw (IllegalArgumentException.
317-
"upmap requires at least one sequence to map over")))
318-
(if (serial? pool)
319-
(doall (apply map f arg-seqs))
320-
(let [[shutdown? pool] (impl/->threadpool pool)
321-
;; Use map to handle the argument sequences.
322-
args (apply map vector (map impl/unchunk arg-seqs))
323-
q (LinkedBlockingQueue.)
324-
;; Start eagerly parallel processing.
325-
read-future (core/future
326-
;; Try to run schedule all the tasks, but definitely
327-
;; shutdown the pool if necessary.
328-
(try
329-
(doseq [a args
330-
:let [p (promise)]]
331-
;; Try to schedule one task, but definitely add
332-
;; something to the queue for the task.
333-
(try
334-
;; We can't directly make a future add itself to
335-
;; a queue. Instead, we use a promise for
336-
;; indirection.
337-
(deliver p (future-call
338-
pool
339-
(with-meta
340-
;; Try to run the task, but
341-
;; definitely add the future to
342-
;; the queue.
343-
#(try
344-
(apply f a)
345-
;; Even if we had an exception
346-
;; running the task, make sure
347-
;; the future shows up in the
348-
;; queue.
349-
(finally (.add q @p)))
350-
{:args a})))
351-
;; If we had an exception scheduling a task,
352-
;; let's plan to re-throw that at queue read
353-
;; time.
354-
(catch Exception e
355-
(.add q (delay (throw e))))))
356-
(finally (when shutdown? (shutdown pool)))))]
357-
;; Read results as available.
358-
(concat (for [_ args] (-> q .take deref))
359-
;; Deref the reading future to get its exceptions, if it had any.
360-
(lazy-seq (deref read-future))))))
384+
(pmap-boilerplate pool f arg-seqs
385+
;; upmap is a little complex; read data out of a queue to
386+
;; get the earliest-available data.
387+
(let [q (LinkedBlockingQueue.)
388+
send-result (fn [result] (.add q result))
389+
read-result (fn [_] (-> q .take deref))]
390+
(pmap-core send-result read-result))))
361391

362392
(defn pcalls
363393
"Like clojure.core.pcalls, except it takes a threadpool. For more detail on

src/clj/com/climate/claypoole/impl.clj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,10 @@
128128
129129
Based on http://stackoverflow.com/questions/3407876/how-do-i-avoid-clojures-chunking-behavior-for-lazy-seqs-that-i-want-to-short-ci"
130130
[s]
131-
(when-let [s (seq s)]
132-
(cons (first s)
133-
(lazy-seq (unchunk (rest s))))))
131+
(lazy-seq
132+
(when-let [s (seq s)]
133+
(cons (first s)
134+
(unchunk (rest s))))))
134135

135136
(defn threadpool
136137
"Make a threadpool. It should be shutdown when no longer needed.

0 commit comments

Comments
 (0)