Skip to content

Block messages when a Resend Request is active #713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ func (state inSession) handleResendRequest(session *session, msg *Message) (next
}

if err := state.resendMessages(session, int(beginSeqNo), endSeqNo, *msg); err != nil {
session.isResendRequestActive = false
return handleStateError(session, err)
}
session.isResendRequestActive = false

if err := session.checkTargetTooLow(msg); err != nil {
return state
Expand Down Expand Up @@ -291,7 +293,7 @@ func (state inSession) processReject(session *session, msg *Message, rej Message
nextState = currentState
default:
var err error
if nextState, err = session.doTargetTooHigh(TypedError); err != nil {
if nextState, err = session.doTargetTooHigh(TypedError, true); err != nil {
return handleStateError(session, err)
}
}
Expand Down
20 changes: 20 additions & 0 deletions in_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,26 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() {
s.State(inSession{})
}

func (s *InSessionTestSuite) TestIsResendRequestActive() {
s.MockApp.On("FromAdmin").Return(nil)
s.MockApp.On("ToApp").Return(nil)
s.MockApp.On("ToAdmin")
s.sendResendRequest(5, 10, false)
s.True(s.isResendRequestActive)

nos := s.NewOrderSingle()
err := s.session.send(nos)

s.Error(err)
s.EqualError(err, "cannot send message while resend request is active")
s.fixMsgIn(s.session, s.ResendRequest(5))
s.False(s.isResendRequestActive)

err = s.session.send(nos)

s.NoError(err)
}

func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() {
s.IncrNextTargetMsgSeqNum()

Expand Down
2 changes: 1 addition & 1 deletion logon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s logonState) FixMsgIn(session *session, msg *Message) (nextState sessionS

case targetTooHigh:
var tooHighErr error
if nextState, tooHighErr = session.doTargetTooHigh(err); tooHighErr != nil {
if nextState, tooHighErr = session.doTargetTooHigh(err, false); tooHighErr != nil {
return shutdownWithReason(session, msg, false, tooHighErr.Error())
}

Expand Down
4 changes: 2 additions & 2 deletions resend_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session
}

if s.currentResendRangeEnd != 0 && s.currentResendRangeEnd < session.store.NextTargetMsgSeqNum() {
nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd)
nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd, false)
if err != nil {
return handleStateError(session, err)
}
Expand All @@ -64,7 +64,7 @@ func (s resendState) FixMsgIn(session *session, msg *Message) (nextState session
}

if bool(gapFillFlag) && s.currentResendRangeEnd != 0 && s.currentResendRangeEnd == session.store.NextTargetMsgSeqNum() {
nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd)
nextResendState, err := session.sendResendRequest(session.store.NextTargetMsgSeqNum(), s.resendRangeEnd, false)
if err != nil {
return handleStateError(session, err)
}
Expand Down
29 changes: 26 additions & 3 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type session struct {
sentReset bool
stopOnce sync.Once

isResendRequestActive bool

targetDefaultApplVerID string

admin chan interface{}
Expand Down Expand Up @@ -295,6 +297,10 @@ func (s *session) notifyMessageOut() {

// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
func (s *session) send(msg *Message) error {
if err := s.isResendRequestBlocking(msg); err != nil {
return err
}

return s.sendInReplyTo(msg, nil)
}
func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
Expand All @@ -316,6 +322,20 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
return nil
}

func (s *session) isResendRequestBlocking(msg *Message) error {
msgType, err := msg.Header.GetBytes(tagMsgType)
if err != nil {
return err
}

if s.isResendRequestActive && !bytes.Equal(msgType, msgTypeResendRequest) {
s.log.OnEvent("Message blocked: resend request in progress")
return errors.New("cannot send message while resend request is active")
}

return nil
}

// dropAndReset will drop the send queue and reset the message store.
func (s *session) dropAndReset() error {
s.sendMutex.Lock()
Expand Down Expand Up @@ -443,12 +463,12 @@ func (s *session) sendBytes(msg []byte, blockUntilSent bool) bool {
}
}

func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) {
func (s *session) doTargetTooHigh(reject targetTooHigh, isReject bool) (nextState resendState, err error) {
s.log.OnEventf("MsgSeqNum too high, expecting %v but received %v", reject.ExpectedTarget, reject.ReceivedTarget)
return s.sendResendRequest(reject.ExpectedTarget, reject.ReceivedTarget-1)
return s.sendResendRequest(reject.ExpectedTarget, reject.ReceivedTarget-1, isReject)
}

func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState, err error) {
func (s *session) sendResendRequest(beginSeq, endSeq int, isReject bool) (nextState resendState, err error) {
nextState.resendRangeEnd = endSeq

resend := NewMessage()
Expand Down Expand Up @@ -477,6 +497,9 @@ func (s *session) sendResendRequest(beginSeq, endSeq int) (nextState resendState
return
}
s.log.OnEventf("Sent ResendRequest FROM: %v TO: %v", beginSeq, endSeqNo)
if !isReject {
s.isResendRequestActive = true
}

return
}
Expand Down
Loading