@@ -96,40 +96,47 @@ handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
96
96
{ok , Acc0 };
97
97
handle_message ({ok , Replies }, Worker , # acc {} = Acc0 ) ->
98
98
# acc {
99
- waiting_count = WaitingCount ,
100
99
len_docs = DocCount ,
101
100
w = W ,
102
101
grouped_docs = GroupedDocs ,
103
102
reply = DocReplyDict0
104
103
} = Acc0 ,
105
- {value , {_ , Docs }, NewGrpDocs } = lists :keytake (Worker , 1 , GroupedDocs ),
106
- Acc1 = start_followers (Worker , Acc0 ),
107
- DocReplyDict = append_update_replies (Docs , Replies , DocReplyDict0 ),
108
- case {WaitingCount , dict :size (DocReplyDict )} of
109
- {1 , _ } ->
110
- % last message has arrived, we need to conclude things
111
- {Health , W , Reply } = dict :fold (
112
- fun force_reply /3 ,
113
- {ok , W , []},
114
- DocReplyDict
115
- ),
116
- {stop , {Health , Reply }};
117
- {_ , DocCount } ->
118
- % we've got at least one reply for each document, let's take a look
119
- case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
120
- continue ->
121
- {ok , Acc1 # acc {
122
- waiting_count = WaitingCount - 1 ,
123
- grouped_docs = NewGrpDocs ,
124
- reply = DocReplyDict
125
- }};
126
- {stop , W , FinalReplies } ->
127
- {stop , {ok , FinalReplies }}
128
- end ;
129
- _ ->
130
- {ok , Acc1 # acc {
131
- waiting_count = WaitingCount - 1 , grouped_docs = NewGrpDocs , reply = DocReplyDict
132
- }}
104
+ {value , {_ , Docs }, NewGrpDocs0 } = lists :keytake (Worker , 1 , GroupedDocs ),
105
+ DocReplyDict = append_update_replies (Docs , Replies , W , DocReplyDict0 ),
106
+ Acc1 = Acc0 # acc {grouped_docs = NewGrpDocs0 , reply = DocReplyDict },
107
+ Acc2 = remove_conflicts (Docs , Replies , Acc1 ),
108
+ NewGrpDocs = Acc2 # acc .grouped_docs ,
109
+ case skip_message (Acc2 ) of
110
+ {stop , Msg } ->
111
+ {stop , Msg };
112
+ {ok , Acc3 } ->
113
+ Acc4 = start_followers (Worker , Acc3 ),
114
+ case {Acc4 # acc .waiting_count , dict :size (DocReplyDict )} of
115
+ {1 , _ } ->
116
+ % last message has arrived, we need to conclude things
117
+ {Health , W , Reply } = dict :fold (
118
+ fun force_reply /3 ,
119
+ {ok , W , []},
120
+ DocReplyDict
121
+ ),
122
+ {stop , {Health , Reply }};
123
+ {_ , DocCount } ->
124
+ % we've got at least one reply for each document, let's take a look
125
+ case dict :fold (fun maybe_reply /3 , {stop , W , []}, DocReplyDict ) of
126
+ continue ->
127
+ {ok , Acc4 # acc {
128
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
129
+ grouped_docs = NewGrpDocs
130
+ }};
131
+ {stop , W , FinalReplies } ->
132
+ {stop , {ok , FinalReplies }}
133
+ end ;
134
+ _ ->
135
+ {ok , Acc4 # acc {
136
+ waiting_count = Acc4 # acc .waiting_count - 1 ,
137
+ grouped_docs = NewGrpDocs
138
+ }}
139
+ end
133
140
end ;
134
141
handle_message ({missing_stub , Stub }, _ , _ ) ->
135
142
throw ({missing_stub , Stub });
@@ -367,13 +374,41 @@ start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(
367
374
start_worker (# shard {ref = undefined }, _Docs , # acc {}) ->
368
375
ok .
369
376
370
- append_update_replies ([], [], DocReplyDict ) ->
377
+ append_update_replies ([], [], _W , DocReplyDict ) ->
371
378
DocReplyDict ;
372
- append_update_replies ([Doc | Rest ], [], Dict0 ) ->
379
+ append_update_replies ([Doc | Rest ], [], W , Dict0 ) ->
373
380
% icky, if replicated_changes only errors show up in result
374
- append_update_replies (Rest , [], dict :append (Doc , noreply , Dict0 ));
375
- append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], Dict0 ) ->
376
- append_update_replies (Rest1 , Rest2 , dict :append (Doc , Reply , Dict0 )).
381
+ append_update_replies (Rest , [], W , dict :append (Doc , noreply , Dict0 ));
382
+ append_update_replies ([Doc | Rest1 ], [conflict | Rest2 ], W , Dict0 ) ->
383
+ % % fake conflict replies from followers as we won't ask them
384
+ append_update_replies (
385
+ Rest1 , Rest2 , W , dict :append_list (Doc , lists :duplicate (W , conflict ), Dict0 )
386
+ );
387
+ append_update_replies ([Doc | Rest1 ], [Reply | Rest2 ], W , Dict0 ) ->
388
+ append_update_replies (Rest1 , Rest2 , W , dict :append (Doc , Reply , Dict0 )).
389
+
390
+ % % leader found a conflict, remove that doc from the other (follower) workers,
391
+ % % removing the worker entirely if no docs remain.
392
+ remove_conflicts ([], [], # acc {} = Acc0 ) ->
393
+ Acc0 ;
394
+ remove_conflicts ([Doc | DocRest ], [conflict | ReplyRest ], # acc {} = Acc0 ) ->
395
+ # acc {grouped_docs = GroupedDocs0 } = Acc0 ,
396
+ GroupedDocs1 = lists :foldl (
397
+ fun ({Worker , Docs }, FoldAcc ) ->
398
+ case lists :delete (Doc , Docs ) of
399
+ [] ->
400
+ FoldAcc ;
401
+ Rest ->
402
+ [{Worker , Rest } | FoldAcc ]
403
+ end
404
+ end ,
405
+ [],
406
+ GroupedDocs0
407
+ ),
408
+ Acc1 = Acc0 # acc {waiting_count = length (GroupedDocs1 ), grouped_docs = GroupedDocs1 },
409
+ remove_conflicts (DocRest , ReplyRest , Acc1 );
410
+ remove_conflicts ([_Doc | DocRest ], [_Reply | ReplyRest ], # acc {} = Acc0 ) ->
411
+ remove_conflicts (DocRest , ReplyRest , Acc0 ).
377
412
378
413
skip_message (# acc {waiting_count = 0 , w = W , reply = DocReplyDict }) ->
379
414
{Health , W , Reply } = dict :fold (fun force_reply /3 , {ok , W , []}, DocReplyDict ),
0 commit comments