22
22
doc_count ,
23
23
w ,
24
24
grouped_docs ,
25
- reply
25
+ reply ,
26
+ update_options ,
27
+ started = []
26
28
}).
27
29
28
30
go (_ , [], _ ) ->
@@ -33,25 +35,25 @@ go(DbName, AllDocs0, Opts) ->
33
35
validate_atomic_update (DbName , AllDocs , lists :member (all_or_nothing , Opts )),
34
36
Options = lists :delete (all_or_nothing , Opts ),
35
37
GroupedDocs = lists :map (
36
- fun ({# shard {name = Name , node = Node } = Shard , Docs }) ->
37
- Docs1 = untag_docs (Docs ),
38
- Ref = rexi :cast (Node , {fabric_rpc , update_docs , [Name , Docs1 , Options ]}),
39
- {Shard # shard {ref = Ref }, Docs }
38
+ fun ({# shard {} = Shard , Docs }) ->
39
+ {Shard # shard {ref = make_ref ()}, Docs }
40
40
end ,
41
41
group_docs_by_shard (DbName , AllDocs )
42
42
),
43
43
{Workers , _ } = lists :unzip (GroupedDocs ),
44
44
RexiMon = fabric_util :create_monitors (Workers ),
45
45
W = couch_util :get_value (w , Options , integer_to_list (mem3 :quorum (DbName ))),
46
46
Acc0 = # acc {
47
+ update_options = Options ,
47
48
waiting_count = length (Workers ),
48
49
doc_count = length (AllDocs ),
49
50
w = list_to_integer (W ),
50
51
grouped_docs = GroupedDocs ,
51
52
reply = dict :new ()
52
53
},
53
54
Timeout = fabric_util :request_timeout (),
54
- try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc0 , infinity , Timeout ) of
55
+ Acc1 = start_leaders (Acc0 ),
56
+ try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc1 , infinity , Timeout ) of
55
57
{ok , {Health , Results }} when
56
58
Health =:= ok ; Health =:= accepted ; Health =:= error
57
59
->
@@ -72,24 +74,32 @@ go(DbName, AllDocs0, Opts) ->
72
74
rexi_monitor :stop (RexiMon )
73
75
end .
74
76
75
- handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, _Worker , # acc {} = Acc0 ) ->
77
+ handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, Worker , # acc {} = Acc0 ) ->
76
78
# acc {grouped_docs = GroupedDocs } = Acc0 ,
77
79
NewGrpDocs = [X || {# shard {node = N }, _ } = X <- GroupedDocs , N =/= NodeRef ],
78
- skip_message (Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs });
80
+ Acc1 = Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs },
81
+ Acc2 = start_followers (Worker , Acc1 ),
82
+ skip_message (Acc2 );
79
83
handle_message ({rexi_EXIT , _ }, Worker , # acc {} = Acc0 ) ->
80
84
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
81
85
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
82
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
86
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
87
+ Acc2 = start_followers (Worker , Acc1 ),
88
+ skip_message (Acc2 );
83
89
handle_message ({error , all_dbs_active }, Worker , # acc {} = Acc0 ) ->
84
90
% treat it like rexi_EXIT, the hope at least one copy will return successfully
85
91
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
86
92
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
87
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
93
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
94
+ Acc2 = start_followers (Worker , Acc1 ),
95
+ skip_message (Acc2 );
88
96
handle_message (internal_server_error , Worker , # acc {} = Acc0 ) ->
89
97
% happens when we fail to load validation functions in an RPC worker
90
98
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
91
99
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
92
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
100
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
101
+ Acc2 = start_followers (Worker , Acc1 ),
102
+ skip_message (Acc2 );
93
103
handle_message (attachment_chunk_received , _Worker , # acc {} = Acc0 ) ->
94
104
{ok , Acc0 };
95
105
handle_message ({ok , Replies }, Worker , # acc {} = Acc0 ) ->
@@ -100,33 +110,42 @@ handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
100
110
grouped_docs = GroupedDocs ,
101
111
reply = DocReplyDict0
102
112
} = Acc0 ,
103
- {value , {_ , Docs }, NewGrpDocs } = lists :keytake (Worker , 1 , GroupedDocs ),
104
- DocReplyDict = append_update_replies (Docs , Replies , DocReplyDict0 ),
105
- case {WaitingCount , dict :size (DocReplyDict )} of
106
- {1 , _ } ->
107
- % last message has arrived, we need to conclude things
108
- {Health , W , Reply } = dict :fold (
109
- fun force_reply /3 ,
110
- {ok , W , []},
111
- DocReplyDict
112
- ),
113
- {stop , {Health , Reply }};
114
- {_ , DocCount } ->
115
- % we've got at least one reply for each document, let's take a look
116
- case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
117
- continue ->
118
- {ok , Acc0 # acc {
119
- waiting_count = WaitingCount - 1 ,
120
- grouped_docs = NewGrpDocs ,
121
- reply = DocReplyDict
122
- }};
123
- {stop , W , FinalReplies } ->
124
- {stop , {ok , FinalReplies }}
125
- end ;
126
- _ ->
127
- {ok , Acc0 # acc {
128
- waiting_count = WaitingCount - 1 , grouped_docs = NewGrpDocs , reply = DocReplyDict
129
- }}
113
+ {value , {_ , Docs }, NewGrpDocs0 } = lists :keytake (Worker , 1 , GroupedDocs ),
114
+ DocReplyDict = append_update_replies (Docs , Replies , W , DocReplyDict0 ),
115
+ Acc1 = Acc0 # acc {grouped_docs = NewGrpDocs0 , reply = DocReplyDict },
116
+ Acc2 = remove_conflicts (Docs , Replies , Acc1 ),
117
+ NewGrpDocs = Acc2 # acc .grouped_docs ,
118
+ case skip_message (Acc2 ) of
119
+ {stop , Msg } ->
120
+ {stop , Msg };
121
+ {ok , Acc3 } ->
122
+ Acc4 = start_followers (Worker , Acc3 ),
123
+ case {Acc4 # acc .waiting_count , dict :size (DocReplyDict )} of
124
+ {1 , _ } ->
125
+ % last message has arrived, we need to conclude things
126
+ {Health , W , Reply } = dict :fold (
127
+ fun force_reply /3 ,
128
+ {ok , W , []},
129
+ DocReplyDict
130
+ ),
131
+ {stop , {Health , Reply }};
132
+ {_ , DocCount } ->
133
+ % we've got at least one reply for each document, let's take a look
134
+ case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
135
+ continue ->
136
+ {ok , Acc4 # acc {
137
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
138
+ grouped_docs = NewGrpDocs
139
+ }};
140
+ {stop , W , FinalReplies } ->
141
+ {stop , {ok , FinalReplies }}
142
+ end ;
143
+ _ ->
144
+ {ok , Acc4 # acc {
145
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
146
+ grouped_docs = NewGrpDocs
147
+ }}
148
+ end
130
149
end ;
131
150
handle_message ({missing_stub , Stub }, _ , _ ) ->
132
151
throw ({missing_stub , Stub });
@@ -318,13 +337,90 @@ group_docs_by_shard(DbName, Docs) ->
318
337
)
319
338
).
320
339
321
- append_update_replies ([], [], DocReplyDict ) ->
340
+ % % use 'lowest' node that hosts this shard range as leader
341
+ is_leader (Worker , Workers ) ->
342
+ Worker == lists :min ([W || W <- Workers , W # shard .range == Worker # shard .range ]).
343
+
344
+ start_leaders (# acc {} = Acc0 ) ->
345
+ # acc {grouped_docs = GroupedDocs } = Acc0 ,
346
+ {Workers , _ } = lists :unzip (GroupedDocs ),
347
+ Started = lists :foldl (
348
+ fun ({Worker , Docs }, RefAcc ) ->
349
+ case is_leader (Worker , Workers ) of
350
+ true ->
351
+ start_worker (Worker , Docs , Acc0 ),
352
+ [Worker # shard .ref | RefAcc ];
353
+ false ->
354
+ RefAcc
355
+ end
356
+ end ,
357
+ [],
358
+ GroupedDocs
359
+ ),
360
+ Acc0 # acc {started = lists :append ([Started , Acc0 # acc .started ])}.
361
+
362
+ start_followers (# shard {} = Leader , # acc {} = Acc0 ) ->
363
+ Followers = [
364
+ {Worker , Docs }
365
+ || {Worker , Docs } <- Acc0 # acc .grouped_docs ,
366
+ Worker # shard .range == Leader # shard .range ,
367
+ not lists :member (Worker # shard .ref , Acc0 # acc .started )
368
+ ],
369
+ lists :foreach (
370
+ fun ({Worker , Docs }) ->
371
+ start_worker (Worker , Docs , Acc0 )
372
+ end ,
373
+ Followers
374
+ ),
375
+ Started = [Ref || {# shard {ref = Ref }, _Docs } <- Followers ],
376
+ Acc0 # acc {started = lists :append ([Started , Acc0 # acc .started ])}.
377
+
378
+ start_worker (# shard {ref = Ref } = Worker , Docs , # acc {} = Acc0 ) when is_reference (Ref ) ->
379
+ # shard {name = Name , node = Node } = Worker ,
380
+ # acc {update_options = UpdateOptions } = Acc0 ,
381
+ rexi :cast_ref (Ref , Node , {fabric_rpc , update_docs , [Name , untag_docs (Docs ), UpdateOptions ]}),
382
+ ok ;
383
+ start_worker (# shard {ref = undefined }, _Docs , # acc {}) ->
384
+ % for unit tests below.
385
+ ok .
386
+
387
+ append_update_replies ([], [], _W , DocReplyDict ) ->
322
388
DocReplyDict ;
323
- append_update_replies ([Doc | Rest ], [], Dict0 ) ->
389
+ append_update_replies ([Doc | Rest ], [], W , Dict0 ) ->
324
390
% icky, if replicated_changes only errors show up in result
325
- append_update_replies (Rest , [], dict :append (Doc , noreply , Dict0 ));
326
- append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], Dict0 ) ->
327
- append_update_replies (Rest1 , Rest2 , dict :append (Doc , Reply , Dict0 )).
391
+ append_update_replies (Rest , [], W , dict :append (Doc , noreply , Dict0 ));
392
+ append_update_replies ([Doc | Rest1 ], [conflict | Rest2 ], W , Dict0 ) ->
393
+ % % fake conflict replies from followers as we won't ask them
394
+ append_update_replies (
395
+ Rest1 , Rest2 , W , dict :append_list (Doc , lists :duplicate (W , conflict ), Dict0 )
396
+ );
397
+ append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], W , Dict0 ) ->
398
+ append_update_replies (Rest1 , Rest2 , W , dict :append (Doc , Reply , Dict0 )).
399
+
400
+ % % leader found a conflict, remove that doc from the other (follower) workers,
401
+ % % removing the worker entirely if no docs remain.
402
+ remove_conflicts ([], [], # acc {} = Acc0 ) ->
403
+ Acc0 ;
404
+ remove_conflicts ([Doc | DocRest ], [conflict | ReplyRest ], # acc {} = Acc0 ) ->
405
+ # acc {grouped_docs = GroupedDocs0 } = Acc0 ,
406
+ GroupedDocs1 = lists :foldl (
407
+ fun ({Worker , Docs }, FoldAcc ) ->
408
+ case lists :delete (Doc , Docs ) of
409
+ [] ->
410
+ FoldAcc ;
411
+ Rest ->
412
+ [{Worker , Rest } | FoldAcc ]
413
+ end
414
+ end ,
415
+ [],
416
+ GroupedDocs0
417
+ ),
418
+ Acc1 = Acc0 # acc {waiting_count = length (GroupedDocs1 ), grouped_docs = GroupedDocs1 },
419
+ remove_conflicts (DocRest , ReplyRest , Acc1 );
420
+ remove_conflicts ([_Doc | DocRest ], [_Reply | ReplyRest ], # acc {} = Acc0 ) ->
421
+ remove_conflicts (DocRest , ReplyRest , Acc0 );
422
+ remove_conflicts ([_Doc | DocRest ], [], # acc {} = Acc0 ) ->
423
+ remove_conflicts (DocRest , [], Acc0 ).
328
424
329
425
skip_message (# acc {waiting_count = 0 , w = W , reply = DocReplyDict }) ->
330
426
{Health , W , Reply } = dict :fold (fun force_reply /3 , {ok , W , []}, DocReplyDict ),
0 commit comments