diff --git a/config/configuration.go b/config/configuration.go index 2d95bf11d..c2c89b931 100644 --- a/config/configuration.go +++ b/config/configuration.go @@ -1048,6 +1048,28 @@ const ( // - A valid go time.Duration SQLStoreConnMaxLifetime string = "SQLStoreConnMaxLifetime" + // SQLStoreMessagesTableName defines the table name for the messages table. Default is "messages". + // If you use a different table name, you must set up your database accordingly. + // + // Required: No + // + // Default: messages + // + // Valid Values: + // - A valid string + SQLStoreMessagesTableName = "SQLStoreMessagesTableName" + + // SQLStoreSessionsTableName defines the table name for the messages table. Default is "sessions". + // If you use a different table name, you must set up your database accordingly. + // + // Required: No + // + // Default: sessions + // + // Valid Values: + // - A valid string + SQLStoreSessionsTableName = "SQLStoreSessionsTableName" + // MongoStoreConnection sets the MongoDB connection URL to use for message storage. // // See https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo#Connect for more information. diff --git a/store/sql/sql_store.go b/store/sql/sql_store.go index aeaeb6eb3..c1162464a 100644 --- a/store/sql/sql_store.go +++ b/store/sql/sql_store.go @@ -27,6 +27,11 @@ import ( "github.com/quickfixgo/quickfix/config" ) +const ( + defaultMessagesTable = "messages" + defaultSessionsTable = "sessions" +) + type sqlStoreFactory struct { settings *quickfix.Settings } @@ -39,6 +44,19 @@ type sqlStore struct { sqlConnMaxLifetime time.Duration db *sql.DB placeholder placeholderFunc + messagesTable string + sessionsTable string + + sqlUpdateSeqNums string + sqlInsertSession string + sqlGetSeqNums string + sqlUpdateMessage string + sqlInsertMessage string + sqlGetMessages string + sqlUpdateSession string + sqlUpdateSenderSeqNum string + sqlUpdateTargetSeqNum string + sqlDeleteMessages string } type placeholderFunc func(int) string @@ -88,6 +106,17 @@ func (f sqlStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix if err != nil { return nil, err } + + messagesTableName := defaultMessagesTable + if name, err := sessionSettings.Setting(config.SQLStoreMessagesTableName); err == nil { + messagesTableName = name + } + + sessionsTableName := defaultSessionsTable + if name, err := sessionSettings.Setting(config.SQLStoreSessionsTableName); err == nil { + sessionsTableName = name + } + sqlConnMaxLifetime := 0 * time.Second if sessionSettings.HasSetting(config.SQLStoreConnMaxLifetime) { sqlConnMaxLifetime, err = sessionSettings.DurationSetting(config.SQLStoreConnMaxLifetime) @@ -95,10 +124,11 @@ func (f sqlStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix return nil, err } } - return newSQLStore(sessionID, sqlDriver, sqlDataSourceName, sqlConnMaxLifetime) + + return newSQLStore(sessionID, sqlDriver, sqlDataSourceName, messagesTableName, sessionsTableName, sqlConnMaxLifetime) } -func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName string, connMaxLifetime time.Duration) (store *sqlStore, err error) { +func newSQLStore(sessionID quickfix.SessionID, driver, dataSourceName, messagesTableName, sessionsTableName string, connMaxLifetime time.Duration) (store *sqlStore, err error) { memStore, memErr := quickfix.NewMemoryStoreFactory().Create(sessionID) if memErr != nil { @@ -112,6 +142,8 @@ func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName str sqlDriver: driver, sqlDataSourceName: dataSourceName, sqlConnMaxLifetime: connMaxLifetime, + messagesTable: messagesTableName, + sessionsTable: sessionsTableName, } if err = store.cache.Reset(); err != nil { err = errors.Wrap(err, "cache reset") @@ -130,6 +162,9 @@ func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName str if err = store.db.Ping(); err != nil { // ensure immediate connection return nil, err } + + store.setSQLStatements() + if err = store.populateCache(); err != nil { return nil, err } @@ -137,13 +172,48 @@ func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName str return store, nil } +func (store *sqlStore) setSQLStatements() { + idColumns := `beginstring, session_qualifier, sendercompid, sendersubid, senderlocid, targetcompid, targetsubid, targetlocid` + idPlaceholders := `?,?,?,?,?,?,?,?` + idWhereClause := `beginstring=? AND session_qualifier=? AND sendercompid=? AND sendersubid=? AND senderlocid=? AND targetcompid=? AND targetsubid=? AND targetlocid=?` + + store.sqlInsertMessage = fmt.Sprintf(`INSERT INTO %s ( + msgseqnum, message, %s) VALUES (?, ?, %s)`, + store.messagesTable, idColumns, idPlaceholders) + + store.sqlUpdateMessage = fmt.Sprintf(`UPDATE %s SET message=? WHERE %s AND msgseqnum=?`, + store.messagesTable, idWhereClause) + + store.sqlGetMessages = fmt.Sprintf(`SELECT message FROM %s WHERE %s AND msgseqnum>=? AND msgseqnum<=? ORDER BY msgseqnum`, + store.messagesTable, idWhereClause) + + store.sqlDeleteMessages = fmt.Sprintf(`DELETE FROM %s WHERE %s`, + store.messagesTable, idWhereClause) + + store.sqlInsertSession = fmt.Sprintf(`INSERT INTO %s ( + creation_time, incoming_seqnum, outgoing_seqnum, %s) VALUES (?, ?, ?, %s)`, + store.sessionsTable, idColumns, idPlaceholders) + + store.sqlGetSeqNums = fmt.Sprintf(`SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM %s WHERE %s`, + store.sessionsTable, idWhereClause) + + store.sqlUpdateSession = fmt.Sprintf(`UPDATE %s SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=? WHERE %s`, + store.sessionsTable, idWhereClause) + + store.sqlUpdateSenderSeqNum = fmt.Sprintf(`UPDATE %s SET outgoing_seqnum=? WHERE %s`, + store.sessionsTable, idWhereClause) + + store.sqlUpdateTargetSeqNum = fmt.Sprintf(`UPDATE %s SET incoming_seqnum=? WHERE %s`, + store.sessionsTable, idWhereClause) + + store.sqlUpdateSeqNums = fmt.Sprintf(`UPDATE %s SET incoming_seqnum=?, outgoing_seqnum=? WHERE %s`, + store.sessionsTable, idWhereClause) +} + // Reset deletes the store records and sets the seqnums back to 1. func (store *sqlStore) Reset() error { s := store.sessionID - _, err := store.db.Exec(sqlString(`DELETE FROM messages - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), + _, err := store.db.Exec(sqlString(store.sqlDeleteMessages, store.placeholder), s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, s.TargetCompID, s.TargetSubID, s.TargetLocationID) @@ -155,11 +225,7 @@ func (store *sqlStore) Reset() error { return err } - _, err = store.db.Exec(sqlString(`UPDATE sessions - SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), + _, err = store.db.Exec(sqlString(store.sqlUpdateSession, store.placeholder), store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(), s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, @@ -180,11 +246,7 @@ func (store *sqlStore) populateCache() error { s := store.sessionID var creationTime time.Time var incomingSeqNum, outgoingSeqNum int - row := store.db.QueryRow(sqlString(`SELECT creation_time, incoming_seqnum, outgoing_seqnum - FROM sessions - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), + row := store.db.QueryRow(sqlString(store.sqlGetSeqNums, store.placeholder), s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, s.TargetCompID, s.TargetSubID, s.TargetLocationID) @@ -209,12 +271,7 @@ func (store *sqlStore) populateCache() error { } // session record not found, create it - _, err = store.db.Exec(sqlString(`INSERT INTO sessions ( - creation_time, incoming_seqnum, outgoing_seqnum, - beginstring, session_qualifier, - sendercompid, sendersubid, senderlocid, - targetcompid, targetsubid, targetlocid) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder), + _, err = store.db.Exec(sqlString(store.sqlInsertSession, store.placeholder), store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(), @@ -238,10 +295,7 @@ func (store *sqlStore) NextTargetMsgSeqNum() int { // SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent. func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error { s := store.sessionID - _, err := store.db.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), + _, err := store.db.Exec(sqlString(store.sqlUpdateSenderSeqNum, store.placeholder), next, s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, s.TargetCompID, s.TargetSubID, s.TargetLocationID) @@ -254,10 +308,7 @@ func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error { // SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received. func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error { s := store.sessionID - _, err := store.db.Exec(sqlString(`UPDATE sessions SET incoming_seqnum = ? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), + _, err := store.db.Exec(sqlString(store.sqlUpdateTargetSeqNum, store.placeholder), next, s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, s.TargetCompID, s.TargetSubID, s.TargetLocationID) @@ -295,12 +346,7 @@ func (store *sqlStore) SetCreationTime(_ time.Time) { func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error { s := store.sessionID - _, err := store.db.Exec(sqlString(`INSERT INTO messages ( - msgseqnum, message, - beginstring, session_qualifier, - sendercompid, sendersubid, senderlocid, - targetcompid, targetsubid, targetlocid) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder), + _, err := store.db.Exec(sqlString(store.sqlInsertMessage, store.placeholder), seqNum, string(msg), s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, @@ -318,12 +364,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b } defer tx.Rollback() - _, err = tx.Exec(sqlString(`INSERT INTO messages ( - msgseqnum, message, - beginstring, session_qualifier, - sendercompid, sendersubid, senderlocid, - targetcompid, targetsubid, targetlocid) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder), + _, err = tx.Exec(sqlString(store.sqlInsertMessage, store.placeholder), seqNum, string(msg), s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, @@ -333,10 +374,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b } next := store.cache.NextSenderMsgSeqNum() + 1 - _, err = tx.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), + _, err = tx.Exec(sqlString(store.sqlUpdateSenderSeqNum, store.placeholder), next, s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, s.TargetCompID, s.TargetSubID, s.TargetLocationID) @@ -354,12 +392,7 @@ func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []b func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { s := store.sessionID - rows, err := store.db.Query(sqlString(`SELECT message FROM messages - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=? - AND msgseqnum>=? AND msgseqnum<=? - ORDER BY msgseqnum`, store.placeholder), + rows, err := store.db.Query(sqlString(store.sqlGetMessages, store.placeholder), s.BeginString, s.Qualifier, s.SenderCompID, s.SenderSubID, s.SenderLocationID, s.TargetCompID, s.TargetSubID, s.TargetLocationID, diff --git a/store/sql/sql_store_test.go b/store/sql/sql_store_test.go index 73c1d747a..e988e807e 100644 --- a/store/sql/sql_store_test.go +++ b/store/sql/sql_store_test.go @@ -81,6 +81,84 @@ func (suite *SQLStoreTestSuite) TestSqlPlaceholderReplacement() { suite.Equal("A $1 B $2 C $3", got) } +func (suite *SQLStoreTestSuite) TestStoreTableRenameOverride() { + sqlDriver := "sqlite3" + sqlDsn := path.Join(suite.sqlStoreRootPath, fmt.Sprintf("rename-override-%d.db", time.Now().UnixNano())) + + // Create DB with original schema + db, err := sql.Open(sqlDriver, sqlDsn) + require.NoError(suite.T(), err) + + ddlFnames, err := filepath.Glob(fmt.Sprintf("../../_sql/%s/*.sql", sqlDriver)) + require.NoError(suite.T(), err) + for _, fname := range ddlFnames { + sqlBytes, err := os.ReadFile(fname) + require.NoError(suite.T(), err) + _, err = db.Exec(string(sqlBytes)) + require.NoError(suite.T(), err) + } + + // Rename tables + _, err = db.Exec(`ALTER TABLE sessions RENAME TO renamed_sessions`) + require.NoError(suite.T(), err) + _, err = db.Exec(`ALTER TABLE messages RENAME TO renamed_messages`) + require.NoError(suite.T(), err) + + // Set config to use renamed tables + sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} + settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(` +[DEFAULT] +SQLStoreDriver=%s +SQLStoreDataSourceName=%s +SQLStoreSessionsTableName=renamed_sessions +SQLStoreMessagesTableName=renamed_messages + +[SESSION] +BeginString=%s +SenderCompID=%s +TargetCompID=%s +`, sqlDriver, sqlDsn, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) + require.NoError(suite.T(), err) + + // Create store with renamed table config + store, err := NewStoreFactory(settings).Create(sessionID) + require.NoError(suite.T(), err) + defer store.Close() + + // SaveMessage + SetNextSenderMsgSeqNum + msg := []byte("8=FIX.4.4\x019=12\x0135=0\x01") + require.NoError(suite.T(), store.SaveMessage(1, msg)) + require.NoError(suite.T(), store.SetNextSenderMsgSeqNum(2)) + require.NoError(suite.T(), store.SetNextTargetMsgSeqNum(2)) + + // SaveMessageAndIncrNextSenderMsgSeqNum + require.NoError(suite.T(), store.SaveMessageAndIncrNextSenderMsgSeqNum(2, msg)) + + // Get and check sequence numbers + nextSender := store.NextSenderMsgSeqNum() + suite.Equal(3, nextSender) + nextTarget := store.NextTargetMsgSeqNum() + suite.Equal(2, nextTarget) + + // IterateMessages + count := 0 + err = store.IterateMessages(1, 2, func(_ []byte) error { + count++ + return nil + }) + require.NoError(suite.T(), err) + suite.Equal(2, count) + + // Reset + require.NoError(suite.T(), store.Reset()) + + // After reset, sequence numbers should be 1 + nextSender = store.NextSenderMsgSeqNum() + suite.Equal(1, nextSender) + nextTarget = store.NextTargetMsgSeqNum() + suite.Equal(1, nextTarget) +} + func (suite *SQLStoreTestSuite) TearDownTest() { suite.MsgStore.Close() os.RemoveAll(suite.sqlStoreRootPath)