Skip to content

Commit 58139e1

Browse files
Support user-defined response executor
1 parent 6add493 commit 58139e1

File tree

2 files changed

+80
-9
lines changed

2 files changed

+80
-9
lines changed

src/lsp4clj/server.clj

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
[lsp4clj.protocols.endpoint :as protocols.endpoint]
1010
[lsp4clj.trace :as trace]
1111
[promesa.core :as p]
12+
[promesa.exec :as p.exec]
1213
[promesa.protocols :as p.protocols])
1314
(:import
1415
(java.util.concurrent CancellationException)))
@@ -196,6 +197,7 @@
196197
trace-ch
197198
tracer*
198199
^java.time.Clock clock
200+
response-executor
199201
on-close
200202
request-id*
201203
pending-sent-requests*
@@ -357,13 +359,13 @@
357359
;; thread, whatever that thread is. Since the callbacks are not
358360
;; under our control, they are under our users' control, they could
359361
;; block. Therefore, we do not want the completing thread to be our
360-
;; thread. This is very easy for users to
361-
;; miss, therefore we complete the promise on the default executor.
362-
(p/thread-call :default
363-
(fn []
364-
(if error
365-
(p/reject! p (ex-info "Received error response" resp))
366-
(p/resolve! p result)))))
362+
;; thread. This is very easy for users to miss, therefore we
363+
;; complete the promise using an explicit executor.
364+
(p.exec/submit! response-executor
365+
(fn []
366+
(if error
367+
(p/reject! p (ex-info "Received error response" resp))
368+
(p/resolve! p result)))))
367369
(trace this trace/received-unmatched-response resp now)))
368370
(catch Throwable e
369371
(log-error-receiving this e resp))))
@@ -420,9 +422,10 @@
420422
(update server :tracer* reset! (trace/tracer-for-level trace-level)))
421423

422424
(defn chan-server
423-
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close]
425+
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close response-executor]
424426
:or {clock (java.time.Clock/systemDefaultZone)
425-
on-close (constantly nil)}}]
427+
on-close (constantly nil)
428+
response-executor :default}}]
426429
(let [;; before defaulting trace-ch, so that default is "off"
427430
tracer (trace/tracer-for-level (or trace-level
428431
(when (or trace? trace-ch) "verbose")
@@ -437,6 +440,7 @@
437440
:tracer* (atom tracer)
438441
:clock clock
439442
:on-close on-close
443+
:response-executor response-executor
440444
:request-id* (atom 0)
441445
:pending-sent-requests* (atom {})
442446
:pending-received-requests* (atom {})

test/lsp4clj/server_test.clj

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,73 @@
481481
(h/assert-take output-ch)))
482482
(server/shutdown server))))
483483

484+
(defn- core-async-dispatch-thread? [^Thread thread]
485+
(re-matches #"async-dispatch-\d+" (.getName thread)))
486+
487+
(deftest can-determine-core-async-dispatch-thread
488+
(testing "current thread"
489+
(is (not (core-async-dispatch-thread? (Thread/currentThread)))))
490+
(testing "thread running go blocks"
491+
(let [ch (async/chan)
492+
_ (async/go (async/>! ch (Thread/currentThread)))
493+
thread (async/<!! ch)]
494+
(is (core-async-dispatch-thread? thread))))
495+
(testing "thread running core.async thread macro"
496+
(let [ch (async/chan)
497+
_ (async/thread (async/>!! ch (Thread/currentThread)))
498+
thread (async/<!! ch)]
499+
(is (not (core-async-dispatch-thread? thread))))))
500+
501+
(deftest request-should-complete-on-a-suitable-executor
502+
(testing "successful completion"
503+
(let [input-ch (async/chan 3)
504+
output-ch (async/chan 3)
505+
server (server/chan-server {:output-ch output-ch
506+
:input-ch input-ch})
507+
_ (server/start server nil)
508+
thread-p (-> (server/send-request server "req" {:body "foo"})
509+
(p/then (fn [_] (Thread/currentThread))))
510+
client-rcvd-msg (h/assert-take output-ch)
511+
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
512+
thread (deref thread-p 100 nil)]
513+
(is (not (core-async-dispatch-thread? thread)))
514+
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
515+
"completes on default ForkJoinPool executor")
516+
(server/shutdown server)))
517+
(testing "exceptional completion"
518+
(let [input-ch (async/chan 3)
519+
output-ch (async/chan 3)
520+
server (server/chan-server {:output-ch output-ch
521+
:input-ch input-ch})
522+
_ (server/start server nil)
523+
thread-p (-> (server/send-request server "req" {:body "foo"})
524+
(p/catch (fn [_] (Thread/currentThread))))
525+
client-rcvd-msg (h/assert-take output-ch)
526+
_ (async/put! input-ch
527+
(-> (lsp.responses/response (:id client-rcvd-msg))
528+
(lsp.responses/error {:code 1234
529+
:message "Something bad"
530+
:data {:body "foo"}})))
531+
thread (deref thread-p 100 nil)]
532+
(is (not (core-async-dispatch-thread? thread)))
533+
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
534+
"completes on default ForkJoinPool executor")
535+
(server/shutdown server)))
536+
(testing "completion with :current-thread executor for legacy behavior"
537+
(let [input-ch (async/chan 3)
538+
output-ch (async/chan 3)
539+
server (server/chan-server {:output-ch output-ch
540+
:input-ch input-ch
541+
:response-executor :current-thread})
542+
_ (server/start server nil)
543+
thread-p (-> (server/send-request server "req" {:body "foo"})
544+
(p/then (fn [_] (Thread/currentThread))))
545+
client-rcvd-msg (h/assert-take output-ch)
546+
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
547+
thread (deref thread-p 100 nil)]
548+
(is (core-async-dispatch-thread? thread) "completes on core.async dispatch thread")
549+
(server/shutdown server))))
550+
484551
(def fixed-clock
485552
(-> (java.time.LocalDateTime/of 2022 03 05 13 35 23 0)
486553
(.toInstant java.time.ZoneOffset/UTC)

0 commit comments

Comments
 (0)