Skip to content

Make DB Store table names configurable #711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,28 @@ const (
// - A valid go time.Duration
SQLStoreConnMaxLifetime string = "SQLStoreConnMaxLifetime"

// SQLStoreMessagesTableName defines the table name for the messages table. Default is "messages".
// If you use a different table name, you must set up your database accordingly.
//
// Required: No
//
// Default: messages
//
// Valid Values:
// - A valid string
SQLStoreMessagesTableName = "SQLStoreMessagesTableName"

// SQLStoreSessionsTableName defines the table name for the messages table. Default is "sessions".
// If you use a different table name, you must set up your database accordingly.
//
// Required: No
//
// Default: sessions
//
// Valid Values:
// - A valid string
SQLStoreSessionsTableName = "SQLStoreSessionsTableName"

// MongoStoreConnection sets the MongoDB connection URL to use for message storage.
//
// See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information.
Expand Down
137 changes: 85 additions & 52 deletions store/sql/sql_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"github.com/quickfixgo/quickfix/config"
)

const (
defaultMessagesTable = "messages"
defaultSessionsTable = "sessions"
)

type sqlStoreFactory struct {
settings *quickfix.Settings
}
Expand All @@ -39,6 +44,19 @@ type sqlStore struct {
sqlConnMaxLifetime time.Duration
db *sql.DB
placeholder placeholderFunc
messagesTable string
sessionsTable string

sqlUpdateSeqNums string
sqlInsertSession string
sqlGetSeqNums string
sqlUpdateMessage string
sqlInsertMessage string
sqlGetMessages string
sqlUpdateSession string
sqlUpdateSenderSeqNum string
sqlUpdateTargetSeqNum string
sqlDeleteMessages string
}

type placeholderFunc func(int) string
Expand Down Expand Up @@ -88,17 +106,29 @@ func (f sqlStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix
if err != nil {
return nil, err
}

messagesTableName := defaultMessagesTable
if name, err := sessionSettings.Setting(config.SQLStoreMessagesTableName); err == nil {
messagesTableName = name
}

sessionsTableName := defaultSessionsTable
if name, err := sessionSettings.Setting(config.SQLStoreSessionsTableName); err == nil {
sessionsTableName = name
}

sqlConnMaxLifetime := 0 * time.Second
if sessionSettings.HasSetting(config.SQLStoreConnMaxLifetime) {
sqlConnMaxLifetime, err = sessionSettings.DurationSetting(config.SQLStoreConnMaxLifetime)
if err != nil {
return nil, err
}
}
return newSQLStore(sessionID, sqlDriver, sqlDataSourceName, sqlConnMaxLifetime)

return newSQLStore(sessionID, sqlDriver, sqlDataSourceName, messagesTableName, sessionsTableName, sqlConnMaxLifetime)
}

func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName string, connMaxLifetime time.Duration) (store *sqlStore, err error) {
func newSQLStore(sessionID quickfix.SessionID, driver, dataSourceName, messagesTableName, sessionsTableName string, connMaxLifetime time.Duration) (store *sqlStore, err error) {

memStore, memErr := quickfix.NewMemoryStoreFactory().Create(sessionID)
if memErr != nil {
Expand All @@ -112,6 +142,8 @@ func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName str
sqlDriver: driver,
sqlDataSourceName: dataSourceName,
sqlConnMaxLifetime: connMaxLifetime,
messagesTable: messagesTableName,
sessionsTable: sessionsTableName,
}
if err = store.cache.Reset(); err != nil {
err = errors.Wrap(err, "cache reset")
Expand All @@ -130,20 +162,58 @@ func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName str
if err = store.db.Ping(); err != nil { // ensure immediate connection
return nil, err
}

store.setSQLStatements()

if err = store.populateCache(); err != nil {
return nil, err
}

return store, nil
}

func (store *sqlStore) setSQLStatements() {
idColumns := `beginstring, session_qualifier, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid`
idPlaceholders := `?,?,?,?,?,?,?,?`
idWhereClause := `beginstring=? AND session_qualifier=? AND sendercompid=? AND sendersubid=? AND senderlocid=? AND targetcompid=? AND targetsubid=? AND targetlocid=?`

store.sqlInsertMessage = fmt.Sprintf(`INSERT INTO %s (
msgseqnum, message, %s) VALUES (?, ?, %s)`,
store.messagesTable, idColumns, idPlaceholders)

store.sqlUpdateMessage = fmt.Sprintf(`UPDATE %s SET message=? WHERE %s AND msgseqnum=?`,
store.messagesTable, idWhereClause)

store.sqlGetMessages = fmt.Sprintf(`SELECT message FROM %s WHERE %s AND msgseqnum>=? AND msgseqnum<=? ORDER BY msgseqnum`,
store.messagesTable, idWhereClause)

store.sqlDeleteMessages = fmt.Sprintf(`DELETE FROM %s WHERE %s`,
store.messagesTable, idWhereClause)

store.sqlInsertSession = fmt.Sprintf(`INSERT INTO %s (
creation_time, incoming_seqnum, outgoing_seqnum, %s) VALUES (?, ?, ?, %s)`,
store.sessionsTable, idColumns, idPlaceholders)

store.sqlGetSeqNums = fmt.Sprintf(`SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM %s WHERE %s`,
store.sessionsTable, idWhereClause)

store.sqlUpdateSession = fmt.Sprintf(`UPDATE %s SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=? WHERE %s`,
store.sessionsTable, idWhereClause)

store.sqlUpdateSenderSeqNum = fmt.Sprintf(`UPDATE %s SET outgoing_seqnum=? WHERE %s`,
store.sessionsTable, idWhereClause)

store.sqlUpdateTargetSeqNum = fmt.Sprintf(`UPDATE %s SET incoming_seqnum=? WHERE %s`,
store.sessionsTable, idWhereClause)

store.sqlUpdateSeqNums = fmt.Sprintf(`UPDATE %s SET incoming_seqnum=?, outgoing_seqnum=? WHERE %s`,
store.sessionsTable, idWhereClause)
}

// Reset deletes the store records and sets the seqnums back to 1.
func (store *sqlStore) Reset() error {
s := store.sessionID
_, err := store.db.Exec(sqlString(`DELETE FROM messages
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
_, err := store.db.Exec(sqlString(store.sqlDeleteMessages, store.placeholder),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -155,11 +225,7 @@ func (store *sqlStore) Reset() error {
return err
}

_, err = store.db.Exec(sqlString(`UPDATE sessions
SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
_, err = store.db.Exec(sqlString(store.sqlUpdateSession, store.placeholder),
store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
Expand All @@ -180,11 +246,7 @@ func (store *sqlStore) populateCache() error {
s := store.sessionID
var creationTime time.Time
var incomingSeqNum, outgoingSeqNum int
row := store.db.QueryRow(sqlString(`SELECT creation_time, incoming_seqnum, outgoing_seqnum
FROM sessions
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
row := store.db.QueryRow(sqlString(store.sqlGetSeqNums, store.placeholder),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -209,12 +271,7 @@ func (store *sqlStore) populateCache() error {
}

// session record not found, create it
_, err = store.db.Exec(sqlString(`INSERT INTO sessions (
creation_time, incoming_seqnum, outgoing_seqnum,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
_, err = store.db.Exec(sqlString(store.sqlInsertSession, store.placeholder),
store.cache.CreationTime(),
store.cache.NextTargetMsgSeqNum(),
store.cache.NextSenderMsgSeqNum(),
Expand All @@ -238,10 +295,7 @@ func (store *sqlStore) NextTargetMsgSeqNum() int {
// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent.
func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
s := store.sessionID
_, err := store.db.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
_, err := store.db.Exec(sqlString(store.sqlUpdateSenderSeqNum, store.placeholder),
next, s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -254,10 +308,7 @@ func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error {
// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received.
func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error {
s := store.sessionID
_, err := store.db.Exec(sqlString(`UPDATE sessions SET incoming_seqnum = ?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
_, err := store.db.Exec(sqlString(store.sqlUpdateTargetSeqNum, store.placeholder),
next, s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand Down Expand Up @@ -295,12 +346,7 @@ func (store *sqlStore) SetCreationTime(_ time.Time) {
func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error {
s := store.sessionID

_, err := store.db.Exec(sqlString(`INSERT INTO messages (
msgseqnum, message,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
_, err := store.db.Exec(sqlString(store.sqlInsertMessage, store.placeholder),
seqNum, string(msg),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
Expand All @@ -318,12 +364,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b
}
defer tx.Rollback()

_, err = tx.Exec(sqlString(`INSERT INTO messages (
msgseqnum, message,
beginstring, session_qualifier,
sendercompid, sendersubid, senderlocid,
targetcompid, targetsubid, targetlocid)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder),
_, err = tx.Exec(sqlString(store.sqlInsertMessage, store.placeholder),
seqNum, string(msg),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
Expand All @@ -333,10 +374,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b
}

next := store.cache.NextSenderMsgSeqNum() + 1
_, err = tx.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ?
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder),
_, err = tx.Exec(sqlString(store.sqlUpdateSenderSeqNum, store.placeholder),
next, s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID)
Expand All @@ -354,12 +392,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b

func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
s := store.sessionID
rows, err := store.db.Query(sqlString(`SELECT message FROM messages
WHERE beginstring=? AND session_qualifier=?
AND sendercompid=? AND sendersubid=? AND senderlocid=?
AND targetcompid=? AND targetsubid=? AND targetlocid=?
AND msgseqnum>=? AND msgseqnum<=?
ORDER BY msgseqnum`, store.placeholder),
rows, err := store.db.Query(sqlString(store.sqlGetMessages, store.placeholder),
s.BeginString, s.Qualifier,
s.SenderCompID, s.SenderSubID, s.SenderLocationID,
s.TargetCompID, s.TargetSubID, s.TargetLocationID,
Expand Down
78 changes: 78 additions & 0 deletions store/sql/sql_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,84 @@ func (suite *SQLStoreTestSuite) TestSqlPlaceholderReplacement() {
suite.Equal("A $1 B $2 C $3", got)
}

func (suite *SQLStoreTestSuite) TestStoreTableRenameOverride() {
sqlDriver := "sqlite3"
sqlDsn := path.Join(suite.sqlStoreRootPath, fmt.Sprintf("rename-override-%d.db", time.Now().UnixNano()))

// Create DB with original schema
db, err := sql.Open(sqlDriver, sqlDsn)
require.NoError(suite.T(), err)

ddlFnames, err := filepath.Glob(fmt.Sprintf("../../_sql/%s/*.sql", sqlDriver))
require.NoError(suite.T(), err)
for _, fname := range ddlFnames {
sqlBytes, err := os.ReadFile(fname)
require.NoError(suite.T(), err)
_, err = db.Exec(string(sqlBytes))
require.NoError(suite.T(), err)
}

// Rename tables
_, err = db.Exec(`ALTER TABLE sessions RENAME TO renamed_sessions`)
require.NoError(suite.T(), err)
_, err = db.Exec(`ALTER TABLE messages RENAME TO renamed_messages`)
require.NoError(suite.T(), err)

// Set config to use renamed tables
sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"}
settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(`
[DEFAULT]
SQLStoreDriver=%s
SQLStoreDataSourceName=%s
SQLStoreSessionsTableName=renamed_sessions
SQLStoreMessagesTableName=renamed_messages

[SESSION]
BeginString=%s
SenderCompID=%s
TargetCompID=%s
`, sqlDriver, sqlDsn, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID)))
require.NoError(suite.T(), err)

// Create store with renamed table config
store, err := NewStoreFactory(settings).Create(sessionID)
require.NoError(suite.T(), err)
defer store.Close()

// SaveMessage + SetNextSenderMsgSeqNum
msg := []byte("8=FIX.4.4\x019=12\x0135=0\x01")
require.NoError(suite.T(), store.SaveMessage(1, msg))
require.NoError(suite.T(), store.SetNextSenderMsgSeqNum(2))
require.NoError(suite.T(), store.SetNextTargetMsgSeqNum(2))

// SaveMessageAndIncrNextSenderMsgSeqNum
require.NoError(suite.T(), store.SaveMessageAndIncrNextSenderMsgSeqNum(2, msg))

// Get and check sequence numbers
nextSender := store.NextSenderMsgSeqNum()
suite.Equal(3, nextSender)
nextTarget := store.NextTargetMsgSeqNum()
suite.Equal(2, nextTarget)

// IterateMessages
count := 0
err = store.IterateMessages(1, 2, func(_ []byte) error {
count++
return nil
})
require.NoError(suite.T(), err)
suite.Equal(2, count)

// Reset
require.NoError(suite.T(), store.Reset())

// After reset, sequence numbers should be 1
nextSender = store.NextSenderMsgSeqNum()
suite.Equal(1, nextSender)
nextTarget = store.NextTargetMsgSeqNum()
suite.Equal(1, nextTarget)
}

func (suite *SQLStoreTestSuite) TearDownTest() {
suite.MsgStore.Close()
os.RemoveAll(suite.sqlStoreRootPath)
Expand Down
Loading