Skip to content

Commit c886693

Browse files
Merge pull request #13574 from rabbitmq/amqp-client-incoming-window
Auto widen session incoming-window in AMQP 1.0 client
2 parents e93afc5 + 32854e8 commit c886693

File tree

4 files changed

+223
-57
lines changed

4 files changed

+223
-57
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ flow_link_credit(#link_ref{role = receiver, session = Session,
339339
RenewWhenBelow =< Credit) ->
340340
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
341341
drain = Drain},
342-
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
342+
ok = amqp10_client_session:flow_link(Session, Handle, Flow, RenewWhenBelow).
343343

344344
%% @doc Stop a receiving link.
345345
%% See AMQP 1.0 spec §2.6.10.
@@ -348,7 +348,7 @@ stop_receiver_link(#link_ref{role = receiver,
348348
link_handle = Handle}) ->
349349
Flow = #'v1_0.flow'{link_credit = {uint, 0},
350350
echo = true},
351-
ok = amqp10_client_session:flow(Session, Handle, Flow, never).
351+
ok = amqp10_client_session:flow_link(Session, Handle, Flow, never).
352352

353353
%%% messages
354354

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 90 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
attach/2,
2121
detach/2,
2222
transfer/3,
23-
flow/4,
24-
disposition/5
23+
disposition/5,
24+
flow_link/4
2525
]).
2626

27+
%% Manual session flow control is currently only used in tests.
28+
-export([flow/3]).
29+
2730
%% Private API
2831
-export([start_link/4,
2932
socket_ready/2
@@ -51,7 +54,8 @@
5154
[add/2,
5255
diff/2]).
5356

54-
-define(MAX_SESSION_WINDOW_SIZE, 65535).
57+
%% By default, we want to keep the server's remote-incoming-window large at all times.
58+
-define(DEFAULT_MAX_INCOMING_WINDOW, 100_000).
5559
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
5660
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
5761
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
@@ -129,7 +133,8 @@
129133
available = 0 :: non_neg_integer(),
130134
drain = false :: boolean(),
131135
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
132-
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
136+
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
137+
Credit :: pos_integer()},
133138
incoming_unsettled = #{} :: #{delivery_number() => ok},
134139
footer_opt :: footer_opt() | undefined
135140
}).
@@ -140,7 +145,10 @@
140145

141146
%% session flow control, see section 2.5.6
142147
next_incoming_id :: transfer_number() | undefined,
143-
incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(),
148+
%% Can become negative if the peer overshoots our window.
149+
incoming_window :: integer(),
150+
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
151+
NewWindowSize :: pos_integer()},
144152
next_outgoing_id = ?INITIAL_OUTGOING_TRANSFER_ID :: transfer_number(),
145153
remote_incoming_window = 0 :: non_neg_integer(),
146154
remote_outgoing_window = 0 :: non_neg_integer(),
@@ -200,7 +208,17 @@ transfer(Session, Amqp10Msg, Timeout) ->
200208
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
201209
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
202210

203-
flow(Session, Handle, Flow, RenewWhenBelow) ->
211+
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
212+
flow(Session, IncomingWindow, RenewWhenBelow) when
213+
%% Check that the RenewWhenBelow value make sense.
214+
RenewWhenBelow =:= never orelse
215+
is_integer(RenewWhenBelow) andalso
216+
RenewWhenBelow > 0 andalso
217+
RenewWhenBelow =< IncomingWindow ->
218+
gen_statem:cast(Session, {flow_session, IncomingWindow, RenewWhenBelow}).
219+
220+
-spec flow_link(pid(), link_handle(), #'v1_0.flow'{}, never | pos_integer()) -> ok.
221+
flow_link(Session, Handle, Flow, RenewWhenBelow) ->
204222
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).
205223

206224
%% Sending a disposition on a sender link (with receiver-settle-mode = second)
@@ -239,6 +257,9 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
239257
channel = Channel,
240258
reader = Reader,
241259
connection_config = ConnConfig,
260+
incoming_window = ?DEFAULT_MAX_INCOMING_WINDOW,
261+
auto_flow = {?DEFAULT_MAX_INCOMING_WINDOW div 2,
262+
?DEFAULT_MAX_INCOMING_WINDOW},
242263
early_attach_requests = []},
243264
{ok, unmapped, State}.
244265

@@ -282,15 +303,15 @@ mapped(cast, 'end', State) ->
282303
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
283304
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
284305
{keep_state, State};
285-
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
286-
#state{next_incoming_id = NII,
287-
next_outgoing_id = NOI} = State) ->
288-
Flow = Flow0#'v1_0.flow'{
289-
next_incoming_id = maybe_uint(NII),
290-
next_outgoing_id = uint(NOI),
291-
outgoing_window = ?UINT_OUTGOING_WINDOW},
292-
ok = send(Flow, State),
293-
{keep_state, State#state{incoming_window = IncomingWindow}};
306+
mapped(cast, {flow_session, IncomingWindow, RenewWhenBelow}, State0) ->
307+
AutoFlow = case RenewWhenBelow of
308+
never -> never;
309+
_ -> {RenewWhenBelow, IncomingWindow}
310+
end,
311+
State = State0#state{incoming_window = IncomingWindow,
312+
auto_flow = AutoFlow},
313+
send_flow_session(State),
314+
{keep_state, State};
294315
mapped(cast, #'v1_0.end'{} = End, State) ->
295316
%% We receive the first end frame, reply and terminate.
296317
_ = send_end(State),
@@ -656,35 +677,44 @@ is_bare_message_section(_Section) ->
656677

657678
send_flow_link(OutHandle,
658679
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
659-
#state{links = Links,
660-
next_incoming_id = NII,
661-
next_outgoing_id = NOI,
662-
incoming_window = InWin} = State) ->
680+
#state{links = Links} = State) ->
663681
AutoFlow = case RenewWhenBelow of
664682
never -> never;
665-
Limit -> {auto, Limit, Credit}
683+
_ -> {RenewWhenBelow, Credit}
666684
end,
667685
#{OutHandle := #link{output_handle = H,
668686
role = receiver,
669687
delivery_count = DeliveryCount,
670688
available = Available} = Link} = Links,
671-
Flow = Flow0#'v1_0.flow'{
672-
handle = uint(H),
673-
%% "This value MUST be set if the peer has received the begin
674-
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
675-
next_incoming_id = maybe_uint(NII),
676-
next_outgoing_id = uint(NOI),
677-
outgoing_window = ?UINT_OUTGOING_WINDOW,
678-
incoming_window = uint(InWin),
679-
%% "In the event that the receiving link endpoint has not yet seen the
680-
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
681-
delivery_count = maybe_uint(DeliveryCount),
682-
available = uint(Available)},
689+
Flow1 = Flow0#'v1_0.flow'{
690+
handle = uint(H),
691+
%% "In the event that the receiving link endpoint has not yet seen the
692+
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
693+
delivery_count = maybe_uint(DeliveryCount),
694+
available = uint(Available)},
695+
Flow = set_flow_session_fields(Flow1, State),
683696
ok = send(Flow, State),
684697
State#state{links = Links#{OutHandle =>
685698
Link#link{link_credit = Credit,
686699
auto_flow = AutoFlow}}}.
687700

701+
send_flow_session(State) ->
702+
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),
703+
ok = send(Flow, State).
704+
705+
set_flow_session_fields(Flow, #state{next_incoming_id = NID,
706+
incoming_window = IW,
707+
next_outgoing_id = NOI}) ->
708+
Flow#'v1_0.flow'{
709+
%% "This value MUST be set if the peer has received the begin
710+
%% frame for the session, and MUST NOT be set if it has not." [2.7.4]
711+
next_incoming_id = maybe_uint(NID),
712+
%% IncomingWindow0 can be negative when the sending server overshoots our window.
713+
%% We must set a floor of 0 in the FLOW frame because field incoming-window is an uint.
714+
incoming_window = uint(max(0, IW)),
715+
next_outgoing_id = uint(NOI),
716+
outgoing_window = ?UINT_OUTGOING_WINDOW}.
717+
688718
build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc)
689719
when byte_size(Bin) =< MaxPayloadSize ->
690720
T = amqp10_framing:encode_bin(Trf#'v1_0.transfer'{more = false}),
@@ -1059,17 +1089,21 @@ book_transfer_send(Num, #link{output_handle = Handle} = Link,
10591089
links = Links#{Handle => book_link_transfer_send(Link)}}.
10601090

10611091
book_partial_transfer_received(#state{next_incoming_id = NID,
1062-
remote_outgoing_window = ROW} = State) ->
1063-
State#state{next_incoming_id = add(NID, 1),
1064-
remote_outgoing_window = ROW - 1}.
1092+
incoming_window = IW,
1093+
remote_outgoing_window = ROW} = State0) ->
1094+
State = State0#state{next_incoming_id = add(NID, 1),
1095+
incoming_window = IW - 1,
1096+
remote_outgoing_window = ROW - 1},
1097+
maybe_widen_incoming_window(State).
10651098

10661099
book_transfer_received(State = #state{connection_config =
10671100
#{transfer_limit_margin := Margin}},
10681101
#link{link_credit = Margin} = Link) ->
10691102
{transfer_limit_exceeded, Link, State};
10701103
book_transfer_received(#state{next_incoming_id = NID,
1104+
incoming_window = IW,
10711105
remote_outgoing_window = ROW,
1072-
links = Links} = State,
1106+
links = Links} = State0,
10731107
#link{output_handle = OutHandle,
10741108
delivery_count = DC,
10751109
link_credit = LC,
@@ -1079,19 +1113,31 @@ book_transfer_received(#state{next_incoming_id = NID,
10791113
%% "the receiver MUST maintain a floor of zero in its
10801114
%% calculation of the value of available" [2.6.7]
10811115
available = max(0, Avail - 1)},
1082-
State1 = State#state{links = Links#{OutHandle => Link1},
1083-
next_incoming_id = add(NID, 1),
1084-
remote_outgoing_window = ROW - 1},
1116+
State1 = State0#state{links = Links#{OutHandle => Link1},
1117+
next_incoming_id = add(NID, 1),
1118+
incoming_window = IW - 1,
1119+
remote_outgoing_window = ROW - 1},
1120+
State = maybe_widen_incoming_window(State1),
10851121
case Link1 of
10861122
#link{link_credit = 0,
10871123
auto_flow = never} ->
1088-
{credit_exhausted, Link1, State1};
1124+
{credit_exhausted, Link1, State};
10891125
_ ->
1090-
{ok, Link1, State1}
1126+
{ok, Link1, State}
10911127
end.
10921128

1129+
maybe_widen_incoming_window(
1130+
State0 = #state{incoming_window = IncomingWindow,
1131+
auto_flow = {RenewWhenBelow, NewWindowSize}})
1132+
when IncomingWindow < RenewWhenBelow ->
1133+
State = State0#state{incoming_window = NewWindowSize},
1134+
send_flow_session(State),
1135+
State;
1136+
maybe_widen_incoming_window(State) ->
1137+
State.
1138+
10931139
auto_flow(#link{link_credit = LC,
1094-
auto_flow = {auto, RenewWhenBelow, Credit},
1140+
auto_flow = {RenewWhenBelow, Credit},
10951141
output_handle = OutHandle,
10961142
incoming_unsettled = Unsettled},
10971143
State)
@@ -1230,6 +1276,7 @@ format_status(Status = #{data := Data0}) ->
12301276
remote_channel = RemoteChannel,
12311277
next_incoming_id = NextIncomingId,
12321278
incoming_window = IncomingWindow,
1279+
auto_flow = SessionAutoFlow,
12331280
next_outgoing_id = NextOutgoingId,
12341281
remote_incoming_window = RemoteIncomingWindow,
12351282
remote_outgoing_window = RemoteOutgoingWindow,
@@ -1294,6 +1341,7 @@ format_status(Status = #{data := Data0}) ->
12941341
remote_channel => RemoteChannel,
12951342
next_incoming_id => NextIncomingId,
12961343
incoming_window => IncomingWindow,
1344+
auto_flow => SessionAutoFlow,
12971345
next_outgoing_id => NextOutgoingId,
12981346
remote_incoming_window => RemoteIncomingWindow,
12991347
remote_outgoing_window => RemoteOutgoingWindow,

0 commit comments

Comments
 (0)