From c527fc8ecf81f098850c2ce5b334dc8188ea1e68 Mon Sep 17 00:00:00 2001 From: hlibman-connamara Date: Wed, 14 May 2025 13:16:56 -0500 Subject: [PATCH 1/5] block messages during resend request --- in_session.go | 11 +++++++++++ in_session_test.go | 36 ++++++++++++++++++++++++++++++++++++ quickfix_test.go | 2 ++ session.go | 10 ++++++++++ session_factory.go | 1 + 5 files changed, 60 insertions(+) diff --git a/in_session.go b/in_session.go index 5445967cd..5c4e8b336 100644 --- a/in_session.go +++ b/in_session.go @@ -230,6 +230,17 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int return state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo) } + session.resendMutex.Lock() + session.resendRequestActive = true + session.resendMutex.Unlock() + + defer func() { + session.resendMutex.Lock() + session.resendRequestActive = false + session.resendMutex.Unlock() + session.resendCond.Broadcast() + }() + seqNum := beginSeqNo nextSeqNum := seqNum msg := NewMessage() diff --git a/in_session_test.go b/in_session_test.go index 8973e4ef0..27287fa68 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -373,6 +373,42 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() { s.State(inSession{}) } +func (s *InSessionTestSuite) TestSendBlockedWhenResendRequestActive() { + s.MockApp.On("ToApp").Return(nil) + + s.session.resendMutex.Lock() + s.session.resendRequestActive = true + s.session.resendMutex.Unlock() + + sendCompleted := make(chan struct{}) + go func() { + err := s.session.send(s.NewOrderSingle()) + s.Require().NoError(err) + close(sendCompleted) + }() + + select { + case <-sendCompleted: + s.Fail("send should be blocked during active resend") + case <-time.After(50 * time.Millisecond): + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 0) + } + + s.session.resendMutex.Lock() + s.session.resendRequestActive = false + s.session.resendMutex.Unlock() + s.session.resendCond.Broadcast() + + select { + case <-sendCompleted: + s.LastToAppMessageSent() + s.MessageType("D", s.MockApp.lastToApp) + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) + case <-time.After(100 * time.Millisecond): + s.Fail("send did not proceed after resend was cleared") + } +} + func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() { s.IncrNextTargetMsgSeqNum() diff --git a/quickfix_test.go b/quickfix_test.go index db308b48e..21a4f3467 100644 --- a/quickfix_test.go +++ b/quickfix_test.go @@ -16,6 +16,7 @@ package quickfix import ( + "sync" "time" "github.com/stretchr/testify/mock" @@ -221,6 +222,7 @@ func (s *SessionSuiteRig) Init() { messageOut: s.Receiver.sendChannel, sessionEvent: make(chan internal.Event), } + s.session.resendCond = sync.NewCond(&s.resendMutex) s.MaxLatency = 120 * time.Second } diff --git a/session.go b/session.go index 8213f8486..b803cd3f4 100644 --- a/session.go +++ b/session.go @@ -42,6 +42,10 @@ type session struct { // Mutex for access to toSend. sendMutex sync.Mutex + resendRequestActive bool + resendMutex sync.Mutex + resendCond *sync.Cond + sessionEvent chan internal.Event messageEvent chan bool application Application @@ -302,6 +306,12 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { return s.queueForSend(msg) } + s.resendMutex.Lock() + for s.resendRequestActive { + s.resendCond.Wait() + } + s.resendMutex.Unlock() + s.sendMutex.Lock() defer s.sendMutex.Unlock() diff --git a/session_factory.go b/session_factory.go index dbb8e0a87..6ceaa522a 100644 --- a/session_factory.go +++ b/session_factory.go @@ -89,6 +89,7 @@ func (f sessionFactory) newSession( sessionID: sessionID, stopOnce: sync.Once{}, } + s.resendCond = sync.NewCond(&s.resendMutex) var validatorSettings = defaultValidatorSettings if settings.HasSetting(config.ValidateFieldsOutOfOrder) { From 4d324d509aa383bdd1bdf441e5fd470ba67ad980 Mon Sep 17 00:00:00 2001 From: hlibman-connamara Date: Thu, 15 May 2025 11:01:06 -0500 Subject: [PATCH 2/5] add resend mutex and unit test for blocking sends during resend request --- in_session_test.go | 52 +++++++++++++++++++--------------------------- session.go | 22 ++++++++++---------- 2 files changed, 32 insertions(+), 42 deletions(-) diff --git a/in_session_test.go b/in_session_test.go index 27287fa68..73b371b92 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/quickfixgo/quickfix/internal" @@ -373,40 +374,29 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() { s.State(inSession{}) } -func (s *InSessionTestSuite) TestSendBlockedWhenResendRequestActive() { +func (s *InSessionTestSuite) TestFIXMsgInResendRequestBlocksSend() { s.MockApp.On("ToApp").Return(nil) + s.Require().Nil(s.session.send(s.NewOrderSingle())) + s.LastToAppMessageSent() + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) + s.NextSenderMsgSeqNum(2) - s.session.resendMutex.Lock() - s.session.resendRequestActive = true - s.session.resendMutex.Unlock() - - sendCompleted := make(chan struct{}) - go func() { - err := s.session.send(s.NewOrderSingle()) - s.Require().NoError(err) - close(sendCompleted) - }() - - select { - case <-sendCompleted: - s.Fail("send should be blocked during active resend") - case <-time.After(50 * time.Millisecond): - s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 0) - } + s.MockStore.On("IterateMessages", mock.Anything, mock.Anything, mock.AnythingOfType("func([]byte) error")). + Run(func(args mock.Arguments) { + s.Require().Nil(s.session.send(s.NewOrderSingle())) + }). + Return(nil) - s.session.resendMutex.Lock() - s.session.resendRequestActive = false - s.session.resendMutex.Unlock() - s.session.resendCond.Broadcast() - - select { - case <-sendCompleted: - s.LastToAppMessageSent() - s.MessageType("D", s.MockApp.lastToApp) - s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) - case <-time.After(100 * time.Millisecond): - s.Fail("send did not proceed after resend was cleared") - } + s.MockApp.On("FromAdmin").Return(nil) + go s.fixMsgIn(s.session, s.ResendRequest(1)) + + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) + s.NextSenderMsgSeqNum(2) + + s.Require().Nil(s.session.send(s.NewOrderSingle())) + s.LastToAppMessageSent() + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 2) + s.NextSenderMsgSeqNum(3) } func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() { diff --git a/session.go b/session.go index b803cd3f4..ae4e7614a 100644 --- a/session.go +++ b/session.go @@ -40,7 +40,7 @@ type session struct { toSend [][]byte // Mutex for access to toSend. - sendMutex sync.Mutex + sendMutex sync.RWMutex resendRequestActive bool resendMutex sync.Mutex @@ -275,8 +275,8 @@ func (s *session) resend(msg *Message) bool { // queueForSend will validate, persist, and queue the message for send. func (s *session) queueForSend(msg *Message) error { - s.sendMutex.Lock() - defer s.sendMutex.Unlock() + s.sendMutex.RLock() + defer s.sendMutex.RUnlock() msgBytes, err := s.prepMessageForSend(msg, nil) if err != nil { @@ -312,8 +312,8 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { } s.resendMutex.Unlock() - s.sendMutex.Lock() - defer s.sendMutex.Unlock() + s.sendMutex.RLock() + defer s.sendMutex.RUnlock() msgBytes, err := s.prepMessageForSend(msg, inReplyTo) if err != nil { @@ -328,8 +328,8 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { // dropAndReset will drop the send queue and reset the message store. func (s *session) dropAndReset() error { - s.sendMutex.Lock() - defer s.sendMutex.Unlock() + s.sendMutex.RLock() + defer s.sendMutex.RUnlock() s.dropQueued() return s.store.Reset() @@ -340,8 +340,8 @@ func (s *session) dropAndSend(msg *Message) error { return s.dropAndSendInReplyTo(msg, nil) } func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error { - s.sendMutex.Lock() - defer s.sendMutex.Unlock() + s.sendMutex.RLock() + defer s.sendMutex.RUnlock() msgBytes, err := s.prepMessageForSend(msg, inReplyTo) if err != nil { @@ -423,8 +423,8 @@ func (s *session) dropQueued() { } func (s *session) EnqueueBytesAndSend(msg []byte) { - s.sendMutex.Lock() - defer s.sendMutex.Unlock() + s.sendMutex.RLock() + defer s.sendMutex.RUnlock() s.toSend = append(s.toSend, msg) s.sendQueued(true) From bbefdb78b95d9a5dad3ede608a00b93e98d67f1a Mon Sep 17 00:00:00 2001 From: hlibman-connamara Date: Thu, 15 May 2025 11:06:35 -0500 Subject: [PATCH 3/5] fix lint --- in_session_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/in_session_test.go b/in_session_test.go index 73b371b92..ef98e8ef6 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -382,7 +382,7 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestBlocksSend() { s.NextSenderMsgSeqNum(2) s.MockStore.On("IterateMessages", mock.Anything, mock.Anything, mock.AnythingOfType("func([]byte) error")). - Run(func(args mock.Arguments) { + Run(func(_ mock.Arguments) { s.Require().Nil(s.session.send(s.NewOrderSingle())) }). Return(nil) From 1607e8b0663e8bfb4cb6d502787078c4a6facee4 Mon Sep 17 00:00:00 2001 From: hlibman-connamara Date: Thu, 15 May 2025 12:21:29 -0500 Subject: [PATCH 4/5] update resendMutex and unit test --- in_session.go | 10 +--------- in_session_test.go | 8 +------- quickfix_test.go | 2 -- session.go | 34 ++++++++++++++-------------------- session_factory.go | 1 - 5 files changed, 16 insertions(+), 39 deletions(-) diff --git a/in_session.go b/in_session.go index 5c4e8b336..b79e9c832 100644 --- a/in_session.go +++ b/in_session.go @@ -231,15 +231,7 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int } session.resendMutex.Lock() - session.resendRequestActive = true - session.resendMutex.Unlock() - - defer func() { - session.resendMutex.Lock() - session.resendRequestActive = false - session.resendMutex.Unlock() - session.resendCond.Broadcast() - }() + defer session.resendMutex.Unlock() seqNum := beginSeqNo nextSeqNum := seqNum diff --git a/in_session_test.go b/in_session_test.go index ef98e8ef6..d20979508 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -388,15 +388,9 @@ func (s *InSessionTestSuite) TestFIXMsgInResendRequestBlocksSend() { Return(nil) s.MockApp.On("FromAdmin").Return(nil) - go s.fixMsgIn(s.session, s.ResendRequest(1)) + s.fixMsgIn(s.session, s.ResendRequest(1)) - s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) s.NextSenderMsgSeqNum(2) - - s.Require().Nil(s.session.send(s.NewOrderSingle())) - s.LastToAppMessageSent() - s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 2) - s.NextSenderMsgSeqNum(3) } func (s *InSessionTestSuite) TestFIXMsgInTargetTooLow() { diff --git a/quickfix_test.go b/quickfix_test.go index 21a4f3467..db308b48e 100644 --- a/quickfix_test.go +++ b/quickfix_test.go @@ -16,7 +16,6 @@ package quickfix import ( - "sync" "time" "github.com/stretchr/testify/mock" @@ -222,7 +221,6 @@ func (s *SessionSuiteRig) Init() { messageOut: s.Receiver.sendChannel, sessionEvent: make(chan internal.Event), } - s.session.resendCond = sync.NewCond(&s.resendMutex) s.MaxLatency = 120 * time.Second } diff --git a/session.go b/session.go index ae4e7614a..18b24fc82 100644 --- a/session.go +++ b/session.go @@ -40,11 +40,8 @@ type session struct { toSend [][]byte // Mutex for access to toSend. - sendMutex sync.RWMutex - - resendRequestActive bool - resendMutex sync.Mutex - resendCond *sync.Cond + sendMutex sync.Mutex + resendMutex sync.RWMutex sessionEvent chan internal.Event messageEvent chan bool @@ -275,8 +272,8 @@ func (s *session) resend(msg *Message) bool { // queueForSend will validate, persist, and queue the message for send. func (s *session) queueForSend(msg *Message) error { - s.sendMutex.RLock() - defer s.sendMutex.RUnlock() + s.sendMutex.Lock() + defer s.sendMutex.Unlock() msgBytes, err := s.prepMessageForSend(msg, nil) if err != nil { @@ -306,14 +303,11 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { return s.queueForSend(msg) } - s.resendMutex.Lock() - for s.resendRequestActive { - s.resendCond.Wait() - } - s.resendMutex.Unlock() + s.resendMutex.RLock() + defer s.resendMutex.RUnlock() - s.sendMutex.RLock() - defer s.sendMutex.RUnlock() + s.sendMutex.Lock() + defer s.sendMutex.Unlock() msgBytes, err := s.prepMessageForSend(msg, inReplyTo) if err != nil { @@ -328,8 +322,8 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { // dropAndReset will drop the send queue and reset the message store. func (s *session) dropAndReset() error { - s.sendMutex.RLock() - defer s.sendMutex.RUnlock() + s.sendMutex.Lock() + defer s.sendMutex.Unlock() s.dropQueued() return s.store.Reset() @@ -340,8 +334,8 @@ func (s *session) dropAndSend(msg *Message) error { return s.dropAndSendInReplyTo(msg, nil) } func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error { - s.sendMutex.RLock() - defer s.sendMutex.RUnlock() + s.sendMutex.Lock() + defer s.sendMutex.Unlock() msgBytes, err := s.prepMessageForSend(msg, inReplyTo) if err != nil { @@ -423,8 +417,8 @@ func (s *session) dropQueued() { } func (s *session) EnqueueBytesAndSend(msg []byte) { - s.sendMutex.RLock() - defer s.sendMutex.RUnlock() + s.sendMutex.Lock() + defer s.sendMutex.Unlock() s.toSend = append(s.toSend, msg) s.sendQueued(true) diff --git a/session_factory.go b/session_factory.go index 6ceaa522a..dbb8e0a87 100644 --- a/session_factory.go +++ b/session_factory.go @@ -89,7 +89,6 @@ func (f sessionFactory) newSession( sessionID: sessionID, stopOnce: sync.Once{}, } - s.resendCond = sync.NewCond(&s.resendMutex) var validatorSettings = defaultValidatorSettings if settings.HasSetting(config.ValidateFieldsOutOfOrder) { From e9545667ae75262599cb429b010a84a58a848509 Mon Sep 17 00:00:00 2001 From: hlibman-connamara Date: Thu, 15 May 2025 16:15:37 -0500 Subject: [PATCH 5/5] add comments around resendMutex logic --- in_session.go | 2 ++ session.go | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/in_session.go b/in_session.go index b79e9c832..0ed53200c 100644 --- a/in_session.go +++ b/in_session.go @@ -230,6 +230,8 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int return state.generateSequenceReset(session, beginSeqNo, endSeqNo+1, inReplyTo) } + // resendMutex must always be locked before sendMutex to prevent a potential deadlock + // sendMutex is locked below in session.EnqueueBytesAndSend() session.resendMutex.Lock() defer session.resendMutex.Unlock() diff --git a/session.go b/session.go index 18b24fc82..35281e98e 100644 --- a/session.go +++ b/session.go @@ -40,7 +40,9 @@ type session struct { toSend [][]byte // Mutex for access to toSend. - sendMutex sync.Mutex + sendMutex sync.Mutex + // Mutex to prevent messages being sent when resendRequest is active + // Must be locked before sendMutex to prevent a potential deadlock resendMutex sync.RWMutex sessionEvent chan internal.Event @@ -303,6 +305,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { return s.queueForSend(msg) } + // resendMutex must always be locked before sendMutex to prevent a potential deadlock s.resendMutex.RLock() defer s.resendMutex.RUnlock()