Skip to content

Only lock fieldmap once during message parsing #658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions field_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ func (m FieldMap) GetField(tag Tag, parser FieldValueReader) MessageRejectError
return nil
}

// GetField parses of a field with Tag tag. Returned reject may indicate the field is not present, or the field value is invalid.
func (m FieldMap) getFieldNoLock(tag Tag, parser FieldValueReader) MessageRejectError {
f, ok := m.tagLookup[tag]
if !ok {
return ConditionallyRequiredFieldMissing(tag)
}

if err := parser.Read(f[0].value); err != nil {
return IncorrectDataFormatForValue(tag)
}

return nil
}

// GetBytes is a zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
m.rwLock.RLock()
Expand All @@ -128,6 +142,16 @@ func (m FieldMap) GetBytes(tag Tag) ([]byte, MessageRejectError) {
return f[0].value, nil
}

// getBytesNoLock is a lock free zero-copy GetField wrapper for []bytes fields.
func (m FieldMap) getBytesNoLock(tag Tag) ([]byte, MessageRejectError) {
f, ok := m.tagLookup[tag]
if !ok {
return nil, ConditionallyRequiredFieldMissing(tag)
}

return f[0].value, nil
}

// GetBool is a GetField wrapper for bool fields.
func (m FieldMap) GetBool(tag Tag) (bool, MessageRejectError) {
var val FIXBoolean
Expand All @@ -152,6 +176,21 @@ func (m FieldMap) GetInt(tag Tag) (int, MessageRejectError) {
return int(val), err
}

// GetInt is a lock free GetField wrapper for int fields.
func (m FieldMap) getIntNoLock(tag Tag) (int, MessageRejectError) {
bytes, err := m.getBytesNoLock(tag)
if err != nil {
return 0, err
}

var val FIXInt
if val.Read(bytes) != nil {
err = IncorrectDataFormatForValue(tag)
}

return int(val), err
}

// GetTime is a GetField wrapper for utc timestamp fields.
func (m FieldMap) GetTime(tag Tag) (t time.Time, err MessageRejectError) {
m.rwLock.RLock()
Expand Down Expand Up @@ -179,6 +218,15 @@ func (m FieldMap) GetString(tag Tag) (string, MessageRejectError) {
return string(val), nil
}

// GetString is a GetField wrapper for string fields.
func (m FieldMap) getStringNoLock(tag Tag) (string, MessageRejectError) {
var val FIXString
if err := m.getFieldNoLock(tag, &val); err != nil {
return "", err
}
return string(val), nil
}

// GetGroup is a Get function specific to Group Fields.
func (m FieldMap) GetGroup(parser FieldGroupReader) MessageRejectError {
m.rwLock.RLock()
Expand Down Expand Up @@ -246,6 +294,13 @@ func (m *FieldMap) Clear() {
}
}

func (m *FieldMap) clearNoLock() {
m.tags = m.tags[0:0]
for k := range m.tagLookup {
delete(m.tagLookup, k)
}
}

// CopyInto overwrites the given FieldMap with this one.
func (m *FieldMap) CopyInto(to *FieldMap) {
m.rwLock.RLock()
Expand All @@ -263,9 +318,6 @@ func (m *FieldMap) CopyInto(to *FieldMap) {
}

func (m *FieldMap) add(f field) {
m.rwLock.Lock()
defer m.rwLock.Unlock()

t := fieldTag(f)
if _, ok := m.tagLookup[t]; !ok {
m.tags = append(m.tags, t)
Expand Down
25 changes: 18 additions & 7 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,17 @@ func ParseMessageWithDataDictionary(

// doParsing executes the message parsing process.
func doParsing(mp *msgParser) (err error) {
mp.msg.Header.rwLock.Lock()
defer mp.msg.Header.rwLock.Unlock()
mp.msg.Body.rwLock.Lock()
defer mp.msg.Body.rwLock.Unlock()
mp.msg.Trailer.rwLock.Lock()
defer mp.msg.Trailer.rwLock.Unlock()

// Initialize for parsing.
mp.msg.Header.Clear()
mp.msg.Body.Clear()
mp.msg.Trailer.Clear()
mp.msg.Header.clearNoLock()
mp.msg.Body.clearNoLock()
mp.msg.Trailer.clearNoLock()

// Allocate expected message fields in one chunk.
fieldCount := 0
Expand Down Expand Up @@ -267,7 +274,7 @@ func doParsing(mp *msgParser) (err error) {
}

if mp.parsedFieldBytes.tag == tagXMLDataLen {
xmlDataLen, _ = mp.msg.Header.GetInt(tagXMLDataLen)
xmlDataLen, _ = mp.msg.Header.getIntNoLock(tagXMLDataLen)
}
mp.fieldIndex++
}
Expand All @@ -292,7 +299,7 @@ func doParsing(mp *msgParser) (err error) {
}
}

bodyLength, err := mp.msg.Header.GetInt(tagBodyLength)
bodyLength, err := mp.msg.Header.getIntNoLock(tagBodyLength)
if err != nil {
err = parseError{OrigError: err.Error()}
} else if length != bodyLength && !xmlDataMsg {
Expand Down Expand Up @@ -373,7 +380,7 @@ func parseGroup(mp *msgParser, tags []Tag) {
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) bool {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
msgt, err := msg.msgTypeNoLock()
if err != nil {
return false
}
Expand Down Expand Up @@ -406,7 +413,7 @@ func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictiona
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func getGroupFields(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) (fields []*datadictionary.FieldDef) {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
msgt, err := msg.msgTypeNoLock()
if err != nil {
return
}
Expand Down Expand Up @@ -476,6 +483,10 @@ func (m *Message) MsgType() (string, MessageRejectError) {
return m.Header.GetString(tagMsgType)
}

func (m *Message) msgTypeNoLock() (string, MessageRejectError) {
return m.Header.getStringNoLock(tagMsgType)
}

// IsMsgTypeOf returns true if the Header contains MsgType (tag 35) field and its value is the specified one.
func (m *Message) IsMsgTypeOf(msgType string) bool {
if v, err := m.MsgType(); err == nil {
Expand Down
Loading