@@ -72,7 +72,6 @@ is distributed under the [ISC license](LICENSE.md).
72
72
- [ A three-stack lock-free queue] ( #a-three-stack-lock-free-queue )
73
73
- [ A rehashable lock-free hash table] ( #a-rehashable-lock-free-hash-table )
74
74
- [ Beware of torn reads] ( #beware-of-torn-reads )
75
- - [ Scheduler interop] ( #scheduler-interop )
76
75
- [ Development] ( #development )
77
76
78
77
## A quick tour
@@ -2086,187 +2085,6 @@ Notice that above we only validated the access of `a`, because we know that `a`
2086
2085
and ` b ` are always updated atomically and we read ` b ` after reading ` a ` . In this
2087
2086
case that is enough to ensure that read skew is not possible.
2088
2087
2089
- ## Scheduler interop
2090
-
2091
- The blocking mechanism in ** kcas** is based on a
2092
- [ _ domain local await_ ] ( https://github.com/ocaml-multicore/domain-local-await )
2093
- mechanism that schedulers can choose to implement to allow libraries like
2094
- ** kcas** to work with them.
2095
-
2096
- Implementing schedulers is not really what casual users of ** kcas** are supposed
2097
- to do. Below is an example of a _ toy_ scheduler whose purpose is only to give a
2098
- sketch of how a scheduler can provide the domain local await mechanism.
2099
-
2100
- Let's also demonstrate the use of the
2101
- [ ` Queue ` ] ( https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Queue/index.html ) ,
2102
- [ ` Stack ` ] ( https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Stack/index.html ) ,
2103
- and
2104
- [ ` Promise ` ] ( https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Promise/index.html )
2105
- implementations that are conveniently provided by the
2106
- [ ** kcas_data** ] ( https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/index.html )
2107
- package.
2108
-
2109
- Here is the full toy scheduler module:
2110
-
2111
- ``` ocaml
2112
- module Scheduler : sig
2113
- type t
2114
- val spawn : unit -> t
2115
- val join : t -> unit
2116
- val fiber : t -> (unit -> 'a) -> 'a Promise.t
2117
- end = struct
2118
- open Effect.Deep
2119
-
2120
- type _ Effect.t +=
2121
- | Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t
2122
-
2123
- type t = {
2124
- queue: (unit -> unit) Queue.t;
2125
- domain: unit Domain.t
2126
- }
2127
-
2128
- let spawn () =
2129
- let queue = Queue.create () in
2130
- let rec scheduler work =
2131
- let effc (type a) : a Effect.t -> _ = function
2132
- | Suspend ef -> Some ef
2133
- | _ -> None in
2134
- try_with work () { effc };
2135
- match Queue.take_opt queue with
2136
- | Some work -> scheduler work
2137
- | None -> () in
2138
- let prepare_for_await _ =
2139
- let state = Atomic.make `Init in
2140
- let release () =
2141
- if Atomic.get state != `Released then
2142
- match Atomic.exchange state `Released with
2143
- | `Awaiting k ->
2144
- Queue.add (continue k) queue
2145
- | _ -> () in
2146
- let await () =
2147
- if Atomic.get state != `Released then
2148
- Effect.perform @@ Suspend (fun k ->
2149
- if not (Atomic.compare_and_set state `Init
2150
- (`Awaiting k)) then
2151
- continue k ())
2152
- in
2153
- Domain_local_await.{ release; await } in
2154
- let domain = Domain.spawn @@ fun () ->
2155
- try
2156
- while true do
2157
- let work = Queue.take_blocking queue in
2158
- Domain_local_await.using
2159
- ~prepare_for_await
2160
- ~while_running:(fun () -> scheduler work)
2161
- done
2162
- with Exit -> () in
2163
- { queue; domain }
2164
-
2165
- let join t =
2166
- Queue.add (fun () -> raise Exit) t.queue;
2167
- Domain.join t.domain
2168
-
2169
- let fiber t thunk =
2170
- let (promise, resolver) = Promise.create () in
2171
- Queue.add
2172
- (fun () -> Promise.resolve resolver (thunk ()))
2173
- t.queue;
2174
- promise
2175
- end
2176
- ```
2177
-
2178
- The idea is that one can spawn a scheduler to run on a new domain. Then one can
2179
- run fibers on the scheduler. Because the scheduler provides the domain local
2180
- await mechanism libraries like ** kcas** can use it to block in a scheduler
2181
- independent and friendly manner.
2182
-
2183
- Let's then demonstate the integration. To start we spawn a scheduler:
2184
-
2185
- ``` ocaml
2186
- # let scheduler = Scheduler.spawn ()
2187
- val scheduler : Scheduler.t = <abstr>
2188
- ```
2189
-
2190
- The scheduler is now eagerly awaiting for fibers to run. Let's give it a couple
2191
- of them, but, let's first create a queue and a stack to communicate with the
2192
- fibers:
2193
-
2194
- ``` ocaml
2195
- # let in_queue : int Queue.t = Queue.create ()
2196
- val in_queue : int Kcas_data.Queue.t = <abstr>
2197
-
2198
- # let out_stack : int Stack.t = Stack.create ()
2199
- val out_stack : int Kcas_data.Stack.t = <abstr>
2200
- ```
2201
-
2202
- The first fiber we create just copies elements from the ` in_queue ` to the
2203
- ` out_stack ` :
2204
-
2205
- ``` ocaml
2206
- # ignore @@ Scheduler.fiber scheduler @@ fun () ->
2207
- while true do
2208
- let elem = Queue.take_blocking in_queue in
2209
- Printf.printf "Giving %d...\n%!" elem;
2210
- Stack.push elem out_stack
2211
- done
2212
- - : unit = ()
2213
- ```
2214
-
2215
- The second fiber awaits to take two elements from the ` out_stack ` , updates a
2216
- state in between, and then returns their sum:
2217
-
2218
- ``` ocaml
2219
- # let state = Loc.make 0
2220
- val state : int Loc.t = <abstr>
2221
-
2222
- # let sync_to target =
2223
- state
2224
- |> Loc.get_as @@ fun current ->
2225
- Retry.unless (target <= current)
2226
- val sync_to : int -> unit = <fun>
2227
-
2228
- # let a_promise = Scheduler.fiber scheduler @@ fun () ->
2229
- let x = Stack.pop_blocking out_stack in
2230
- Printf.printf "First you gave me %d.\n%!" x;
2231
- Loc.set state 1;
2232
- let y = Stack.pop_blocking out_stack in
2233
- Printf.printf "Then you gave me %d.\n%!" y;
2234
- Loc.set state 2;
2235
- x + y
2236
- val a_promise : int Promise.t = <abstr>
2237
- ```
2238
-
2239
- To interact with the fibers, we add some elements to the ` in_queue ` :
2240
-
2241
- ``` ocaml
2242
- # Queue.add 14 in_queue; sync_to 1
2243
- Giving 14...
2244
- First you gave me 14.
2245
- - : unit = ()
2246
-
2247
- # Queue.add 28 in_queue; sync_to 2
2248
- Giving 28...
2249
- Then you gave me 28.
2250
- - : unit = ()
2251
-
2252
- # Promise.await a_promise
2253
- - : int = 42
2254
- ```
2255
-
2256
- As can be seen above, the scheduler multiplexes the domain among the fibers.
2257
- Notice that thanks to the domain local await mechanism we could just perform
2258
- blocking operations without thinking about the schedulers. Communication between
2259
- the main domain, the scheduler domain, and the fibers _ just works_ ™.
2260
-
2261
- Time to close the shop.
2262
-
2263
- ``` ocaml
2264
- # Scheduler.join scheduler
2265
- - : unit = ()
2266
- ```
2267
-
2268
- _ That's all Folks!_
2269
-
2270
2088
## Development
2271
2089
2272
2090
### Formatting
0 commit comments