diff --git a/in_session.go b/in_session.go index 5445967cd..69b110530 100644 --- a/in_session.go +++ b/in_session.go @@ -208,8 +208,10 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next } if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, *msg); err != nil { + session.isResendRequestActive = false return handleStateError(session, err) } + session.isResendRequestActive = false if err := session.checkTargetTooLow(msg); err != nil { return state @@ -291,7 +293,7 @@ func (state inSession) processReject(session *session, msg *Message, rej Message nextState = currentState default: var err error - if nextState, err = session.doTargetTooHigh(TypedError); err != nil { + if nextState, err = session.doTargetTooHigh(TypedError, true); err != nil { return handleStateError(session, err) } } diff --git a/in_session_test.go b/in_session_test.go index 8973e4ef0..3740bad3f 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -373,6 +373,26 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() { s.State(inSession{}) } +func (s *InSessionTestSuite) TestIsResendRequestActive() { + s.MockApp.On("FromAdmin").Return(nil) + s.MockApp.On("ToApp").Return(nil) + s.MockApp.On("ToAdmin") + s.sendResendRequest(5, 10, false) + s.True(s.isResendRequestActive) + + nos := s.NewOrderSingle() + err := s.session.send(nos) + + s.Error(err) + s.EqualError(err, "cannot send message while resend request is active") + s.fixMsgIn(s.session, s.ResendRequest(5)) + s.False(s.isResendRequestActive) + + err = s.session.send(nos) + + s.NoError(err) +} + func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() { s.IncrNextTargetMsgSeqNum() diff --git a/logon_state.go b/logon_state.go index 7ad8b6602..cb0fc54bb 100644 --- a/logon_state.go +++ b/logon_state.go @@ -46,7 +46,7 @@ func (s logonState) FixMsgIn(session *session, msg *Message) (nextState sessionS case targetTooHigh: var tooHighErr error - if nextState, tooHighErr = session.doTargetTooHigh(err); tooHighErr != nil { + if nextState, tooHighErr = session.doTargetTooHigh(err, false); tooHighErr != nil { return shutdownWithReason(session, msg, false, tooHighErr.Error()) } diff --git a/resend_state.go b/resend_state.go index f3f601e9c..0ac782e6e 100644 --- a/resend_state.go +++ b/resend_state.go @@ -48,7 +48,7 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session } if s.currentResendRangeEnd != 0 && s.currentResendRangeEnd < session.store.NextTargetMsgSeqNum() { - nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd) + nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd, false) if err != nil { return handleStateError(session, err) } @@ -64,7 +64,7 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session } if bool(gapFillFlag) && s.currentResendRangeEnd != 0 && s.currentResendRangeEnd == session.store.NextTargetMsgSeqNum() { - nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd) + nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd, false) if err != nil { return handleStateError(session, err) } diff --git a/session.go b/session.go index 8213f8486..ef2c1dcbf 100644 --- a/session.go +++ b/session.go @@ -52,6 +52,8 @@ type session struct { sentReset bool stopOnce sync.Once + isResendRequestActive bool + targetDefaultApplVerID string admin chan interface{} @@ -295,6 +297,10 @@ func (s *session) notifyMessageOut() { // send will validate, persist, queue the message. If the session is logged on, send all messages in the queue. func (s *session) send(msg *Message) error { + if err := s.isResendRequestBlocking(msg); err != nil { + return err + } + return s.sendInReplyTo(msg, nil) } func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { @@ -316,6 +322,20 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { return nil } +func (s *session) isResendRequestBlocking(msg *Message) error { + msgType, err := msg.Header.GetBytes(tagMsgType) + if err != nil { + return err + } + + if s.isResendRequestActive && !bytes.Equal(msgType, msgTypeResendRequest) { + s.log.OnEvent("Message blocked: resend request in progress") + return errors.New("cannot send message while resend request is active") + } + + return nil +} + // dropAndReset will drop the send queue and reset the message store. func (s *session) dropAndReset() error { s.sendMutex.Lock() @@ -443,12 +463,12 @@ func (s *session) sendBytes(msg []byte, blockUntilSent bool) bool { } } -func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) { +func (s *session) doTargetTooHigh(reject targetTooHigh, isReject bool) (nextState resendState, err error) { s.log.OnEventf("MsgSeqNum too high, expecting %v but received %v", reject.ExpectedTarget, reject.ReceivedTarget) - return s.sendResendRequest(reject.ExpectedTarget, reject.ReceivedTarget-1) + return s.sendResendRequest(reject.ExpectedTarget, reject.ReceivedTarget-1, isReject) } -func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState, err error) { +func (s *session) sendResendRequest(beginSeq, endSeq int, isReject bool) (nextState resendState, err error) { nextState.resendRangeEnd = endSeq resend := NewMessage() @@ -477,6 +497,9 @@ func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState return } s.log.OnEventf("Sent ResendRequest FROM: %v TO: %v", beginSeq, endSeqNo) + if !isReject { + s.isResendRequestActive = true + } return }