From de44109ba01bb5fc280c043c713c9df95f889fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 2 Jul 2025 10:08:40 +0200 Subject: [PATCH] CQ: Retry opening write file when writing messages Followup to ff8ecf1cf7cfd22981668cbed374a5572560dd80 only this time it's for the index. (cherry picked from commit af8c0af4086ac9e02852d0b3d06cf5dcdb8b999f) --- .../src/rabbit_classic_queue_index_v2.erl | 4 ++- .../src/rabbit_classic_queue_store_v2.erl | 23 ++----------- deps/rabbit/src/rabbit_file.erl | 32 +++++++++++++++++++ 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 3dc4d2f9bcc1..c0c812abf99c 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -591,7 +591,9 @@ publish(MsgId, SeqId, Location, Props, IsPersistent, ShouldConfirm, TargetRamCou new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) -> #qi{ fds = OpenFds } = reduce_fd_usage(Segment, State), false = maps:is_key(Segment, OpenFds), %% assert - {ok, Fd} = file:open(segment_file(Segment, State), [read, write, raw, binary]), + {ok, Fd} = rabbit_file:open_eventually( + segment_file(Segment, State), + [read, write, raw, binary]), %% We then write the segment file header. It contains %% some useful info and some reserved bytes for future use. %% We currently do not make use of this information. It is diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 8e8d0de92d8e..d3286da45532 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -194,25 +194,6 @@ maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) -> false -> State end. -open_eventually(File, Modes) -> - open_eventually(File, Modes, 3). - -open_eventually(_, _, 0) -> - {error, eacces}; -open_eventually(File, Modes, N) -> - case file:open(File, Modes) of - OK = {ok, _} -> - OK; - %% When the current write file was recently deleted it - %% is possible on Windows to get an {error,eacces}. - %% Sometimes Windows sets the files to "DELETE PENDING" - %% state and delays deletion a bit. So we wait 10ms and - %% try again up to 3 times. - {error, eacces} -> - timer:sleep(10), - open_eventually(File, Modes, N - 1) - end. - flush_buffer(State = #qs{ write_buffer_size = 0 }, _) -> State; flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) -> @@ -223,7 +204,9 @@ flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) -> Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount), %% Then we do the writes for each segment. State = lists:foldl(fun({Segment, LocBytes}, FoldState) -> - {ok, Fd} = open_eventually(segment_file(Segment, FoldState), [read, write, raw, binary]), + {ok, Fd} = rabbit_file:open_eventually( + segment_file(Segment, FoldState), + [read, write, raw, binary]), case file:position(Fd, eof) of {ok, 0} -> %% We write the file header if it does not exist. diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl index 8fbd663bbe7b..a054e8748763 100644 --- a/deps/rabbit/src/rabbit_file.erl +++ b/deps/rabbit/src/rabbit_file.erl @@ -17,6 +17,7 @@ -export([read_file_info/1]). -export([filename_as_a_directory/1]). -export([filename_to_binary/1, binary_to_filename/1]). +-export([open_eventually/2]). -define(TMP_EXT, ".tmp"). @@ -338,3 +339,34 @@ binary_to_filename(Bin) when is_binary(Bin) -> Other -> erlang:error(Other) end. + +%% On Windows the file may be in "DELETE PENDING" state following +%% its deletion (when the last message was acked). A subsequent +%% open may fail with an {error,eacces}. In that case we wait 10ms +%% and retry up to 3 times. + +-spec open_eventually(File, Modes) -> {ok, IoDevice} | {error, Reason} when + File :: Filename | iodata(), + Filename :: file:name_all(), + Modes :: [file:mode() | ram | directory], + IoDevice :: file:io_device(), + Reason :: file:posix() | badarg | system_limit. + +open_eventually(File, Modes) -> + open_eventually(File, Modes, 3). + +open_eventually(_, _, 0) -> + {error, eacces}; +open_eventually(File, Modes, N) -> + case file:open(File, Modes) of + OK = {ok, _} -> + OK; + %% When the current write file was recently deleted it + %% is possible on Windows to get an {error,eacces}. + %% Sometimes Windows sets the files to "DELETE PENDING" + %% state and delays deletion a bit. So we wait 10ms and + %% try again up to 3 times. + {error, eacces} -> + timer:sleep(10), + open_eventually(File, Modes, N - 1) + end.