22
22
doc_count ,
23
23
w ,
24
24
grouped_docs ,
25
- reply
25
+ reply ,
26
+ update_options ,
27
+ leaders = [],
28
+ started = []
26
29
}).
27
30
28
31
go (_ , [], _ ) ->
@@ -33,25 +36,25 @@ go(DbName, AllDocs0, Opts) ->
33
36
validate_atomic_update (DbName , AllDocs , lists :member (all_or_nothing , Opts )),
34
37
Options = lists :delete (all_or_nothing , Opts ),
35
38
GroupedDocs = lists :map (
36
- fun ({# shard {name = Name , node = Node } = Shard , Docs }) ->
37
- Docs1 = untag_docs (Docs ),
38
- Ref = rexi :cast (Node , {fabric_rpc , update_docs , [Name , Docs1 , Options ]}),
39
- {Shard # shard {ref = Ref }, Docs }
39
+ fun ({# shard {} = Shard , Docs }) ->
40
+ {Shard # shard {ref = make_ref ()}, Docs }
40
41
end ,
41
42
group_docs_by_shard (DbName , AllDocs )
42
43
),
43
44
{Workers , _ } = lists :unzip (GroupedDocs ),
44
45
RexiMon = fabric_util :create_monitors (Workers ),
45
46
W = couch_util :get_value (w , Options , integer_to_list (mem3 :quorum (DbName ))),
46
47
Acc0 = # acc {
48
+ update_options = Options ,
47
49
waiting_count = length (Workers ),
48
50
doc_count = length (AllDocs ),
49
51
w = list_to_integer (W ),
50
52
grouped_docs = GroupedDocs ,
51
53
reply = dict :new ()
52
54
},
53
55
Timeout = fabric_util :request_timeout (),
54
- try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc0 , infinity , Timeout ) of
56
+ Acc1 = start_leaders (Acc0 ),
57
+ try rexi_utils :recv (Workers , # shard .ref , fun handle_message /3 , Acc1 , infinity , Timeout ) of
55
58
{ok , {Health , Results }} when
56
59
Health =:= ok ; Health =:= accepted ; Health =:= error
57
60
->
@@ -72,61 +75,78 @@ go(DbName, AllDocs0, Opts) ->
72
75
rexi_monitor :stop (RexiMon )
73
76
end .
74
77
75
- handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, _Worker , # acc {} = Acc0 ) ->
78
+ handle_message ({rexi_DOWN , _ , {_ , NodeRef }, _ }, Worker , # acc {} = Acc0 ) ->
76
79
# acc {grouped_docs = GroupedDocs } = Acc0 ,
77
80
NewGrpDocs = [X || {# shard {node = N }, _ } = X <- GroupedDocs , N =/= NodeRef ],
78
- skip_message (Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs });
81
+ Acc1 = Acc0 # acc {waiting_count = length (NewGrpDocs ), grouped_docs = NewGrpDocs },
82
+ Acc2 = start_followers (Worker , Acc1 ),
83
+ skip_message (Acc2 );
79
84
handle_message ({rexi_EXIT , _ }, Worker , # acc {} = Acc0 ) ->
80
85
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
81
86
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
82
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
87
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
88
+ Acc2 = start_followers (Worker , Acc1 ),
89
+ skip_message (Acc2 );
83
90
handle_message ({error , all_dbs_active }, Worker , # acc {} = Acc0 ) ->
84
91
% treat it like rexi_EXIT, the hope at least one copy will return successfully
85
92
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
86
93
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
87
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
94
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
95
+ Acc2 = start_followers (Worker , Acc1 ),
96
+ skip_message (Acc2 );
88
97
handle_message (internal_server_error , Worker , # acc {} = Acc0 ) ->
89
98
% happens when we fail to load validation functions in an RPC worker
90
99
# acc {waiting_count = WC , grouped_docs = GrpDocs } = Acc0 ,
91
100
NewGrpDocs = lists :keydelete (Worker , 1 , GrpDocs ),
92
- skip_message (Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs });
101
+ Acc1 = Acc0 # acc {waiting_count = WC - 1 , grouped_docs = NewGrpDocs },
102
+ Acc2 = start_followers (Worker , Acc1 ),
103
+ skip_message (Acc2 );
93
104
handle_message (attachment_chunk_received , _Worker , # acc {} = Acc0 ) ->
94
105
{ok , Acc0 };
95
106
handle_message ({ok , Replies }, Worker , # acc {} = Acc0 ) ->
96
107
# acc {
97
- waiting_count = WaitingCount ,
98
108
doc_count = DocCount ,
99
109
w = W ,
100
110
grouped_docs = GroupedDocs ,
101
111
reply = DocReplyDict0
102
112
} = Acc0 ,
103
- {value , {_ , Docs }, NewGrpDocs } = lists :keytake (Worker , 1 , GroupedDocs ),
104
- DocReplyDict = append_update_replies (Docs , Replies , DocReplyDict0 ),
105
- case {WaitingCount , dict :size (DocReplyDict )} of
106
- {1 , _ } ->
107
- % last message has arrived, we need to conclude things
108
- {Health , W , Reply } = dict :fold (
109
- fun force_reply /3 ,
110
- {ok , W , []},
111
- DocReplyDict
112
- ),
113
- {stop , {Health , Reply }};
114
- {_ , DocCount } ->
115
- % we've got at least one reply for each document, let's take a look
116
- case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
117
- continue ->
118
- {ok , Acc0 # acc {
119
- waiting_count = WaitingCount - 1 ,
120
- grouped_docs = NewGrpDocs ,
121
- reply = DocReplyDict
122
- }};
123
- {stop , W , FinalReplies } ->
124
- {stop , {ok , FinalReplies }}
125
- end ;
126
- _ ->
127
- {ok , Acc0 # acc {
128
- waiting_count = WaitingCount - 1 , grouped_docs = NewGrpDocs , reply = DocReplyDict
129
- }}
113
+ {value , {_ , Docs }, NewGrpDocs0 } = lists :keytake (Worker , 1 , GroupedDocs ),
114
+ IsLeader = lists :member (Worker # shard .ref , Acc0 # acc .leaders ),
115
+ DocReplyDict = append_update_replies (Docs , Replies , W , IsLeader , DocReplyDict0 ),
116
+ Acc1 = Acc0 # acc {grouped_docs = NewGrpDocs0 , reply = DocReplyDict },
117
+ Acc2 = remove_conflicts (Docs , Replies , Acc1 ),
118
+ NewGrpDocs = Acc2 # acc .grouped_docs ,
119
+ case skip_message (Acc2 ) of
120
+ {stop , Msg } ->
121
+ {stop , Msg };
122
+ {ok , Acc3 } ->
123
+ Acc4 = start_followers (Worker , Acc3 ),
124
+ case {Acc4 # acc .waiting_count , dict :size (DocReplyDict )} of
125
+ {1 , _ } ->
126
+ % last message has arrived, we need to conclude things
127
+ {Health , W , Reply } = dict :fold (
128
+ fun force_reply /3 ,
129
+ {ok , W , []},
130
+ DocReplyDict
131
+ ),
132
+ {stop , {Health , Reply }};
133
+ {_ , DocCount } ->
134
+ % we've got at least one reply for each document, let's take a look
135
+ case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
136
+ continue ->
137
+ {ok , Acc4 # acc {
138
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
139
+ grouped_docs = NewGrpDocs
140
+ }};
141
+ {stop , W , FinalReplies } ->
142
+ {stop , {ok , FinalReplies }}
143
+ end ;
144
+ _ ->
145
+ {ok , Acc4 # acc {
146
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
147
+ grouped_docs = NewGrpDocs
148
+ }}
149
+ end
130
150
end ;
131
151
handle_message ({missing_stub , Stub }, _ , _ ) ->
132
152
throw ({missing_stub , Stub });
@@ -318,13 +338,91 @@ group_docs_by_shard(DbName, Docs) ->
318
338
)
319
339
).
320
340
321
- append_update_replies ([], [], DocReplyDict ) ->
341
+ % % use 'lowest' node that hosts this shard range as leader
342
+ is_leader (Worker , Workers ) ->
343
+ Worker # shard .node ==
344
+ lists :min ([W # shard .node || W <- Workers , W # shard .range == Worker # shard .range ]).
345
+
346
+ start_leaders (# acc {} = Acc0 ) ->
347
+ # acc {grouped_docs = GroupedDocs } = Acc0 ,
348
+ {Workers , _ } = lists :unzip (GroupedDocs ),
349
+ LeaderRefs = lists :foldl (
350
+ fun ({Worker , Docs }, RefAcc ) ->
351
+ case is_leader (Worker , Workers ) of
352
+ true ->
353
+ start_worker (Worker , Docs , Acc0 ),
354
+ [Worker # shard .ref | RefAcc ];
355
+ false ->
356
+ RefAcc
357
+ end
358
+ end ,
359
+ [],
360
+ GroupedDocs
361
+ ),
362
+ Acc0 # acc {leaders = LeaderRefs , started = LeaderRefs }.
363
+
364
+ start_followers (# shard {} = Leader , # acc {} = Acc0 ) ->
365
+ Followers = [
366
+ {Worker , Docs }
367
+ || {Worker , Docs } <- Acc0 # acc .grouped_docs ,
368
+ Worker # shard .range == Leader # shard .range ,
369
+ not lists :member (Worker # shard .ref , Acc0 # acc .started )
370
+ ],
371
+ lists :foreach (
372
+ fun ({Worker , Docs }) ->
373
+ start_worker (Worker , Docs , Acc0 )
374
+ end ,
375
+ Followers
376
+ ),
377
+ Started = [Ref || {# shard {ref = Ref }, _Docs } <- Followers ],
378
+ Acc0 # acc {started = lists :append ([Started , Acc0 # acc .started ])}.
379
+
380
+ start_worker (# shard {ref = Ref } = Worker , Docs , # acc {} = Acc0 ) when is_reference (Ref ) ->
381
+ # shard {name = Name , node = Node } = Worker ,
382
+ # acc {update_options = UpdateOptions } = Acc0 ,
383
+ rexi :cast_ref (Ref , Node , {fabric_rpc , update_docs , [Name , untag_docs (Docs ), UpdateOptions ]}),
384
+ ok ;
385
+ start_worker (# shard {ref = undefined }, _Docs , # acc {}) ->
386
+ % for unit tests below.
387
+ ok .
388
+
389
+ append_update_replies ([], [], _W , _IsLeader , DocReplyDict ) ->
322
390
DocReplyDict ;
323
- append_update_replies ([Doc | Rest ], [], Dict0 ) ->
391
+ append_update_replies ([Doc | Rest ], [], W , IsLeader , Dict0 ) ->
324
392
% icky, if replicated_changes only errors show up in result
325
- append_update_replies (Rest , [], dict :append (Doc , noreply , Dict0 ));
326
- append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], Dict0 ) ->
327
- append_update_replies (Rest1 , Rest2 , dict :append (Doc , Reply , Dict0 )).
393
+ append_update_replies (Rest , [], W , IsLeader , dict :append (Doc , noreply , Dict0 ));
394
+ append_update_replies ([Doc | Rest1 ], [conflict | Rest2 ], W , true , Dict0 ) ->
395
+ % % fake conflict replies from followers as we won't ask them
396
+ append_update_replies (
397
+ Rest1 , Rest2 , W , true , dict :append_list (Doc , lists :duplicate (W , conflict ), Dict0 )
398
+ );
399
+ append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], W , IsLeader , Dict0 ) ->
400
+ append_update_replies (Rest1 , Rest2 , W , IsLeader , dict :append (Doc , Reply , Dict0 )).
401
+
402
+ % % leader found a conflict, remove that doc from the other (follower) workers,
403
+ % % removing the worker entirely if no docs remain.
404
+ remove_conflicts ([], [], # acc {} = Acc0 ) ->
405
+ Acc0 ;
406
+ remove_conflicts ([Doc | DocRest ], [conflict | ReplyRest ], # acc {} = Acc0 ) ->
407
+ # acc {grouped_docs = GroupedDocs0 } = Acc0 ,
408
+ GroupedDocs1 = lists :foldl (
409
+ fun ({Worker , Docs }, FoldAcc ) ->
410
+ case lists :delete (Doc , Docs ) of
411
+ [] ->
412
+ FoldAcc # acc {waiting_count = FoldAcc # acc .waiting_count - 1 };
413
+ Rest ->
414
+ [{Worker , Rest } | FoldAcc ]
415
+ end
416
+ end ,
417
+ [],
418
+ GroupedDocs0
419
+ ),
420
+ Acc1 = Acc0 # acc {grouped_docs = GroupedDocs1 },
421
+ remove_conflicts (DocRest , ReplyRest , Acc1 );
422
+ remove_conflicts ([_Doc | DocRest ], [_Reply | ReplyRest ], # acc {} = Acc0 ) ->
423
+ remove_conflicts (DocRest , ReplyRest , Acc0 );
424
+ remove_conflicts ([_Doc | DocRest ], [], # acc {} = Acc0 ) ->
425
+ remove_conflicts (DocRest , [], Acc0 ).
328
426
329
427
skip_message (# acc {waiting_count = 0 , w = W , reply = DocReplyDict }) ->
330
428
{Health , W , Reply } = dict :fold (fun force_reply /3 , {ok , W , []}, DocReplyDict ),
0 commit comments