1
+ //<snippet1>
2
+ open System
3
+ open System.Collections .Concurrent
4
+ open System.Threading
5
+ open System.Threading .Tasks
6
+
7
+ module AddTakeDemo =
8
+ // Demonstrates:
9
+ // BlockingCollection<T>.Add()
10
+ // BlockingCollection<T>.Take()
11
+ // BlockingCollection<T>.CompleteAdding()
12
+ let blockingCollectionAddTakeCompleteAdding () =
13
+ task {
14
+ use bc = new BlockingCollection< int>()
15
+ // Spin up a Task to populate the BlockingCollection
16
+ let t1 =
17
+ task {
18
+ bc.Add 1
19
+ bc.Add 2
20
+ bc.Add 3
21
+ bc.CompleteAdding()
22
+ }
23
+
24
+ // Spin up a Task to consume the BlockingCollection
25
+ let t2 =
26
+ task {
27
+ try
28
+ // Consume consume the BlockingCollection
29
+ while true do
30
+ printfn $" {bc.Take()}"
31
+ with : ? InvalidOperationException ->
32
+ // An InvalidOperationException means that Take() was called on a completed collection
33
+ printfn " That's All!"
34
+ }
35
+ let! _ = Task.WhenAll( t1, t2)
36
+ ()
37
+ }
38
+
39
+ //<snippet2>
40
+ module TryTakeDemo =
41
+ // Demonstrates:
42
+ // BlockingCollection<T>.Add()
43
+ // BlockingCollection<T>.CompleteAdding()
44
+ // BlockingCollection<T>.TryTake()
45
+ // BlockingCollection<T>.IsCompleted
46
+ let blockingCollectionTryTake () =
47
+ // Construct and fill our BlockingCollection
48
+ use bc = new BlockingCollection< int>()
49
+ let NUMITEMS = 10000 ;
50
+ for i = 0 to NUMITEMS - 1 do
51
+ bc.Add i
52
+ bc.CompleteAdding()
53
+ let mutable outerSum = 0
54
+
55
+ // Delegate for consuming the BlockingCollection and adding up all items
56
+ let action =
57
+ Action( fun () ->
58
+ let mutable localItem = 0
59
+ let mutable localSum = 0
60
+
61
+ while bc.TryTake & localItem do
62
+ localSum <- localSum + localItem
63
+ Interlocked.Add(& outerSum, localSum)
64
+ |> ignore)
65
+
66
+ // Launch three parallel actions to consume the BlockingCollection
67
+ Parallel.Invoke( action, action, action)
68
+
69
+ printfn $" Sum[0..{NUMITEMS}) = {outerSum}, should be {((NUMITEMS * (NUMITEMS - 1)) / 2)}"
70
+ printfn $" bc.IsCompleted = {bc.IsCompleted} (should be true)"
71
+ //</snippet2>
72
+
73
+ //<snippet3>
74
+ module FromToAnyDemo =
75
+ // Demonstrates:
76
+ // Bounded BlockingCollection<T>
77
+ // BlockingCollection<T>.TryAddToAny()
78
+ // BlockingCollection<T>.TryTakeFromAny()
79
+ let blockingCollectionFromToAny () =
80
+ let bcs =
81
+ [|
82
+ new BlockingCollection< int>( 5 ) // collection bounded to 5 items
83
+ new BlockingCollection< int>( 5 ) // collection bounded to 5 items
84
+ |]
85
+ // Should be able to add 10 items w/o blocking
86
+ let mutable numFailures = 0 ;
87
+ for i = 0 to 9 do
88
+ if BlockingCollection< int>. TryAddToAny( bcs, i) = - 1 then
89
+ numFailures <- numFailures + 1
90
+ printfn $" TryAddToAny: {numFailures} failures (should be 0)"
91
+
92
+ // Should be able to retrieve 10 items
93
+ let mutable numItems = 0
94
+ let mutable item = 0
95
+ while BlockingCollection< int>. TryTakeFromAny( bcs, & item) <> - 1 do
96
+ numItems <- numItems + 1
97
+ printfn $" TryTakeFromAny: retrieved {numItems} items (should be 10)"
98
+ //</snippet3>
99
+
100
+ //<snippet4>
101
+ module ConsumingEnumerableDemo =
102
+ // Demonstrates:
103
+ // BlockingCollection<T>.Add()
104
+ // BlockingCollection<T>.CompleteAdding()
105
+ // BlockingCollection<T>.GetConsumingEnumerable()
106
+ let blockingCollectionGetConsumingEnumerable () =
107
+ task {
108
+ use bc = new BlockingCollection< int>()
109
+ // Kick off a producer task
110
+ let producerTask =
111
+ task {
112
+ for i = 0 to 9 do
113
+ bc.Add i
114
+ printfn $" Producing: {i}"
115
+
116
+ do ! Task.Delay 100 // sleep 100 ms between adds
117
+ // Need to do this to keep foreach below from hanging
118
+ bc.CompleteAdding()
119
+ }
120
+
121
+ // Now consume the blocking collection with foreach.
122
+ // Use bc.GetConsumingEnumerable() instead of just bc because the
123
+ // former will block waiting for completion and the latter will
124
+ // simply take a snapshot of the current state of the underlying collection.
125
+ for item in bc.GetConsumingEnumerable() do
126
+ printfn $" Consuming: {item}"
127
+ do ! producerTask // Allow task to complete cleanup
128
+ }
129
+ //</snippet4>
130
+
131
+ let main =
132
+ task {
133
+ do ! AddTakeDemo.blockingCollectionAddTakeCompleteAdding ()
134
+ TryTakeDemo.blockingCollectionTryTake ()
135
+ FromToAnyDemo.blockingCollectionFromToAny ()
136
+ do ! ConsumingEnumerableDemo.blockingCollectionGetConsumingEnumerable ()
137
+ printfn " Press any key to exit."
138
+ Console.ReadKey( true ) |> ignore
139
+ }
140
+ main.Wait()
141
+ //</snippet1>
0 commit comments