From 0379b05fff0b89ca212d6fb300fffe543f3a3d7a Mon Sep 17 00:00:00 2001 From: hlibman-connamara Date: Fri, 9 May 2025 12:06:52 -0500 Subject: [PATCH 1/2] add logic to block messages when a rResend Request is active --- in_session.go | 2 ++ in_session_test.go | 20 ++++++++++++++++++++ session.go | 23 +++++++++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/in_session.go b/in_session.go index 5445967cd..2c93324ab 100644 --- a/in_session.go +++ b/in_session.go @@ -210,6 +210,7 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, *msg); err != nil { return handleStateError(session, err) } + session.isResendRequestActive = false if err := session.checkTargetTooLow(msg); err != nil { return state @@ -281,6 +282,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int } func (state inSession) processReject(session *session, msg *Message, rej MessageRejectError) sessionState { + session.isResendRequestActive = false switch TypedError := rej.(type) { case targetTooHigh: diff --git a/in_session_test.go b/in_session_test.go index 8973e4ef0..c445a0970 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -373,6 +373,26 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() { s.State(inSession{}) } +func (s *InSessionTestSuite) TestIsResendRequestActive() { + s.MockApp.On("FromAdmin").Return(nil) + s.MockApp.On("ToApp").Return(nil) + s.MockApp.On("ToAdmin") + s.sendResendRequest(5, 10) + s.True(s.isResendRequestActive) + + nos := s.NewOrderSingle() + err := s.session.sendInReplyTo(nos, nil) + + s.Error(err) + s.EqualError(err, "cannot send message while resend request is active") + s.fixMsgIn(s.session, s.ResendRequest(5)) + s.False(s.isResendRequestActive) + + err = s.session.sendInReplyTo(nos, nil) + + s.NoError(err) +} + func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() { s.IncrNextTargetMsgSeqNum() diff --git a/session.go b/session.go index 8213f8486..04864b8ed 100644 --- a/session.go +++ b/session.go @@ -52,6 +52,8 @@ type session struct { sentReset bool stopOnce sync.Once + isResendRequestActive bool + targetDefaultApplVerID string admin chan interface{} @@ -305,6 +307,12 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { s.sendMutex.Lock() defer s.sendMutex.Unlock() + if blocked, err := s.isResendRequestBlocking(msg); err != nil { + return err + } else if blocked { + return nil + } + msgBytes, err := s.prepMessageForSend(msg, inReplyTo) if err != nil { return err @@ -316,6 +324,20 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { return nil } +func (s *session) isResendRequestBlocking(msg *Message) (bool, error) { + msgType, err := msg.Header.GetBytes(tagMsgType) + if err != nil { + return false, err + } + + if s.isResendRequestActive && !bytes.Equal(msgType, msgTypeResendRequest) { + s.log.OnEvent("Message blocked: resend request in progress") + return true, errors.New("cannot send message while resend request is active") + } + + return false, nil +} + // dropAndReset will drop the send queue and reset the message store. func (s *session) dropAndReset() error { s.sendMutex.Lock() @@ -477,6 +499,7 @@ func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState return } s.log.OnEventf("Sent ResendRequest FROM: %v TO: %v", beginSeq, endSeqNo) + s.isResendRequestActive = true return } From 03c38dcb2c0ac4243674b8e3301051ba5ecb2ec9 Mon Sep 17 00:00:00 2001 From: hlibman-connamara Date: Fri, 9 May 2025 12:56:11 -0500 Subject: [PATCH 2/2] update logic to fix failing tests --- in_session.go | 4 ++-- in_session_test.go | 6 +++--- logon_state.go | 2 +- resend_state.go | 4 ++-- session.go | 28 ++++++++++++++-------------- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/in_session.go b/in_session.go index 2c93324ab..69b110530 100644 --- a/in_session.go +++ b/in_session.go @@ -208,6 +208,7 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next } if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, *msg); err != nil { + session.isResendRequestActive = false return handleStateError(session, err) } session.isResendRequestActive = false @@ -282,7 +283,6 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int } func (state inSession) processReject(session *session, msg *Message, rej MessageRejectError) sessionState { - session.isResendRequestActive = false switch TypedError := rej.(type) { case targetTooHigh: @@ -293,7 +293,7 @@ func (state inSession) processReject(session *session, msg *Message, rej Message nextState = currentState default: var err error - if nextState, err = session.doTargetTooHigh(TypedError); err != nil { + if nextState, err = session.doTargetTooHigh(TypedError, true); err != nil { return handleStateError(session, err) } } diff --git a/in_session_test.go b/in_session_test.go index c445a0970..3740bad3f 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -377,18 +377,18 @@ func (s *InSessionTestSuite) TestIsResendRequestActive() { s.MockApp.On("FromAdmin").Return(nil) s.MockApp.On("ToApp").Return(nil) s.MockApp.On("ToAdmin") - s.sendResendRequest(5, 10) + s.sendResendRequest(5, 10, false) s.True(s.isResendRequestActive) nos := s.NewOrderSingle() - err := s.session.sendInReplyTo(nos, nil) + err := s.session.send(nos) s.Error(err) s.EqualError(err, "cannot send message while resend request is active") s.fixMsgIn(s.session, s.ResendRequest(5)) s.False(s.isResendRequestActive) - err = s.session.sendInReplyTo(nos, nil) + err = s.session.send(nos) s.NoError(err) } diff --git a/logon_state.go b/logon_state.go index 7ad8b6602..cb0fc54bb 100644 --- a/logon_state.go +++ b/logon_state.go @@ -46,7 +46,7 @@ func (s logonState) FixMsgIn(session *session, msg *Message) (nextState sessionS case targetTooHigh: var tooHighErr error - if nextState, tooHighErr = session.doTargetTooHigh(err); tooHighErr != nil { + if nextState, tooHighErr = session.doTargetTooHigh(err, false); tooHighErr != nil { return shutdownWithReason(session, msg, false, tooHighErr.Error()) } diff --git a/resend_state.go b/resend_state.go index f3f601e9c..0ac782e6e 100644 --- a/resend_state.go +++ b/resend_state.go @@ -48,7 +48,7 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session } if s.currentResendRangeEnd != 0 && s.currentResendRangeEnd < session.store.NextTargetMsgSeqNum() { - nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd) + nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd, false) if err != nil { return handleStateError(session, err) } @@ -64,7 +64,7 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session } if bool(gapFillFlag) && s.currentResendRangeEnd != 0 && s.currentResendRangeEnd == session.store.NextTargetMsgSeqNum() { - nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd) + nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd, false) if err != nil { return handleStateError(session, err) } diff --git a/session.go b/session.go index 04864b8ed..ef2c1dcbf 100644 --- a/session.go +++ b/session.go @@ -297,6 +297,10 @@ func (s *session) notifyMessageOut() { // send will validate, persist, queue the message. If the session is logged on, send all messages in the queue. func (s *session) send(msg *Message) error { + if err := s.isResendRequestBlocking(msg); err != nil { + return err + } + return s.sendInReplyTo(msg, nil) } func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { @@ -307,12 +311,6 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { s.sendMutex.Lock() defer s.sendMutex.Unlock() - if blocked, err := s.isResendRequestBlocking(msg); err != nil { - return err - } else if blocked { - return nil - } - msgBytes, err := s.prepMessageForSend(msg, inReplyTo) if err != nil { return err @@ -324,18 +322,18 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { return nil } -func (s *session) isResendRequestBlocking(msg *Message) (bool, error) { +func (s *session) isResendRequestBlocking(msg *Message) error { msgType, err := msg.Header.GetBytes(tagMsgType) if err != nil { - return false, err + return err } if s.isResendRequestActive && !bytes.Equal(msgType, msgTypeResendRequest) { s.log.OnEvent("Message blocked: resend request in progress") - return true, errors.New("cannot send message while resend request is active") + return errors.New("cannot send message while resend request is active") } - return false, nil + return nil } // dropAndReset will drop the send queue and reset the message store. @@ -465,12 +463,12 @@ func (s *session) sendBytes(msg []byte, blockUntilSent bool) bool { } } -func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) { +func (s *session) doTargetTooHigh(reject targetTooHigh, isReject bool) (nextState resendState, err error) { s.log.OnEventf("MsgSeqNum too high, expecting %v but received %v", reject.ExpectedTarget, reject.ReceivedTarget) - return s.sendResendRequest(reject.ExpectedTarget, reject.ReceivedTarget-1) + return s.sendResendRequest(reject.ExpectedTarget, reject.ReceivedTarget-1, isReject) } -func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState, err error) { +func (s *session) sendResendRequest(beginSeq, endSeq int, isReject bool) (nextState resendState, err error) { nextState.resendRangeEnd = endSeq resend := NewMessage() @@ -499,7 +497,9 @@ func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState return } s.log.OnEventf("Sent ResendRequest FROM: %v TO: %v", beginSeq, endSeqNo) - s.isResendRequestActive = true + if !isReject { + s.isResendRequestActive = true + } return }