Skip to content

Commit 67c23ea

Browse files
committed
Use vthread pool instead of core.async pipeline-blocking
This is a port of a fix we applied to data-loader to remove head-of-line blocking from the flow. We lose ordering guarantees, but we do not require them anyway. Signed-off-by: Greg Haskins <greg@manetu.com>
1 parent 7d35adf commit 67c23ea

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

src/manetu/sparql_loadtest/core.clj

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[medley.core :as m]
66
[promesa.core :as p]
77
[taoensso.timbre :as log]
8-
[clojure.core.async :refer [<! go go-loop] :as async]
8+
[clojure.core.async :refer [<!! <! >!! go go-loop] :as async]
99
[progrock.core :as pr]
1010
[doric.core :refer [table]]
1111
[ring.util.codec :as ring.codec]
@@ -34,9 +34,18 @@
3434
:duration d)))))))
3535

3636
(defn- pipeline-blocking
37-
[nr xf in]
37+
[nr f in]
3838
(let [out (async/chan nr)]
39-
(async/pipeline-blocking nr out xf in)
39+
(-> (p/all
40+
(map (fn [_]
41+
(p/vthread
42+
(loop []
43+
(when-let [m (<!! in)]
44+
(>!! out (f m))
45+
(recur)))))
46+
(range nr)))
47+
(p/then (fn [_]
48+
(async/close! out))))
4049
out))
4150

4251
(defn async-xform
@@ -57,7 +66,7 @@
5766
(log/trace "launching with concurrency:" concurrency)
5867
(let [query (-> query slurp ring.codec/url-encode)]
5968
(->> (binding-loader/get-bindings bindings nr batch-size)
60-
(pipeline-blocking concurrency (map (partial execute-query ctx query)))
69+
(pipeline-blocking concurrency (partial execute-query ctx query))
6170
(async-xform (mapcat (fn [{:keys [success result] :as x}]
6271
(if (true? success)
6372
(map (fn [r] (assoc x :result r)) result)

0 commit comments

Comments
 (0)