Skip to content

Commit 5e11b35

Browse files
authored
Merge pull request #17 from larsnovikov/consistency
Consistency
2 parents e3259a2 + 07c99fd commit 5e11b35

File tree

23 files changed

+116
-124
lines changed

23 files changed

+116
-124
lines changed

src/.env.dist

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,19 @@ MASTER_USER=root
44
MASTER_PASS=123456
55
MASTER_TYPE=mysql
66
MASTER_DBNAME=test
7-
MASTER_RETRY_TIMEOUT = 30
8-
MASTER_RETRY_ATTEMPTS = 5
97

108
SLAVE_HOST=localhost
119
SLAVE_PORT=33062
1210
SLAVE_USER=root
1311
SLAVE_PASS=123456
1412
SLAVE_TYPE=mysql
1513
SLAVE_DBNAME=test
16-
SLAVE_RETRY_TIMEOUT = 30
17-
SLAVE_RETRY_ATTEMPTS = 5
1814

1915
REPLICATOR_HOST=localhost
2016
REPLICATOR_PORT=33061
2117
REPLICATOR_USER=root
2218
REPLICATOR_PASS=123456
2319
REPLICATOR_DBNAME=replicator
24-
REPLICATOR_RETRY_TIMEOUT = 30
25-
REPLICATOR_RETRY_ATTEMPTS = 5
2620

2721
ALLOWED_TABLES="user,post"
2822
CHANNEL_SIZE=99999999

src/connectors/clickhouse/connector.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/siddontang/go-log/log"
88
"horgh-replicator/src/constants"
99
"horgh-replicator/src/helpers"
10+
"horgh-replicator/src/tools/exit"
1011
"strconv"
1112
)
1213

@@ -45,7 +46,7 @@ func GetConnection(connection helpers.Storage, storageType string) interface{} {
4546
cred := helpers.GetCredentials(storageType).(helpers.CredentialsDB)
4647
conn, err := sqlx.Open("clickhouse", buildDSN(cred))
4748
if err != nil || conn.Ping() != nil {
48-
connection = helpers.Retry(storageType, cred.Credentials, connection, GetConnection).(helpers.Storage)
49+
exit.Fatal(constants.ErrorDBConnect, storageType)
4950
} else {
5051
connection = connect{conn}
5152
}

src/connectors/mysql/connector.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/siddontang/go-log/log"
88
"horgh-replicator/src/constants"
99
"horgh-replicator/src/helpers"
10+
"horgh-replicator/src/tools/exit"
1011
"strconv"
1112
)
1213

@@ -37,7 +38,7 @@ func (conn connect) Exec(params map[string]interface{}) bool {
3738
func (conn connect) Get(params map[string]interface{}) *sql.Rows {
3839
rows, err := conn.base.Query(fmt.Sprintf("%v", params["query"]), helpers.MakeSlice(params["params"])...)
3940
if err != nil {
40-
log.Fatal(err)
41+
exit.Fatal(err.Error())
4142
}
4243

4344
return rows
@@ -48,7 +49,7 @@ func GetConnection(connection helpers.Storage, storageType string) interface{} {
4849
cred := helpers.GetCredentials(storageType).(helpers.CredentialsDB)
4950
conn, err := sql.Open("mysql", buildDSN(cred))
5051
if err != nil || conn.Ping() != nil {
51-
connection = helpers.Retry(storageType, cred.Credentials, connection, GetConnection).(helpers.Storage)
52+
exit.Fatal(constants.ErrorDBConnect, storageType)
5253
} else {
5354
connection = connect{conn}
5455
}

src/connectors/postgresql/connector.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/siddontang/go-log/log"
99
"horgh-replicator/src/constants"
1010
"horgh-replicator/src/helpers"
11+
"horgh-replicator/src/tools/exit"
1112
"strconv"
1213
)
1314

@@ -40,7 +41,7 @@ func GetConnection(connection helpers.Storage, storageType string) interface{} {
4041
cred := helpers.GetCredentials(storageType).(helpers.CredentialsDB)
4142
conn, err := sql.Open("postgres", buildDSN(cred))
4243
if err != nil || conn.Ping() != nil {
43-
connection = helpers.Retry(storageType, cred.Credentials, connection, GetConnection).(helpers.Storage)
44+
exit.Fatal(constants.ErrorDBConnect, storageType)
4445
} else {
4546
connection = connect{conn}
4647
}

src/connectors/rabbitmq/connector.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/streadway/amqp"
66
"horgh-replicator/src/constants"
77
"horgh-replicator/src/helpers"
8+
"horgh-replicator/src/tools/exit"
89
"strconv"
910
)
1011

@@ -42,9 +43,9 @@ func GetConnection(connection helpers.Storage, storageType string) interface{} {
4243
if connection == nil || connection.Ping() == false {
4344
helpers.ParseAMQPConfig()
4445
cred := helpers.GetCredentials(storageType).(helpers.CredentialsAMQP)
45-
conn, err := amqp.Dial(buildRabbitmqString(cred))
46+
conn, err := amqp.Dial(buildDsn(cred))
4647
if err != nil {
47-
connection = helpers.Retry(storageType, cred.Credentials, connection, GetConnection).(helpers.Storage)
48+
exit.Fatal(constants.ErrorDBConnect, storageType)
4849
} else {
4950
connection = rabbitmqConnection{conn}
5051
}

src/connectors/vertica/vertica.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/siddontang/go-log/log"
88
"horgh-replicator/src/constants"
99
"horgh-replicator/src/helpers"
10+
"horgh-replicator/src/tools/exit"
1011
"strconv"
1112
)
1213

@@ -33,12 +34,12 @@ func (conn verticaConnection) Exec(params map[string]interface{}) bool {
3334
return true
3435
}
3536

36-
func GetConnection(connection helpers.Storage, dbName string) interface{} {
37+
func GetConnection(connection helpers.Storage, storageType string) interface{} {
3738
if connection == nil || connection.Ping() == false {
38-
cred := helpers.GetCredentials(dbName).(helpers.CredentialsDB)
39+
cred := helpers.GetCredentials(storageType).(helpers.CredentialsDB)
3940
conn, err := sqlx.Open("odbc", buildDSN(cred))
4041
if err != nil || conn.Ping() != nil {
41-
connection = helpers.Retry(dbName, cred.Credentials, connection, GetConnection).(helpers.Storage)
42+
exit.Fatal(constants.ErrorDBConnect, storageType)
4243
} else {
4344
connection = verticaConnection{conn}
4445
}

src/constants/messages.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ const (
99
MessageIgnoreDelete = "[time: %v][model: %s][pos: %v] ignore delete row"
1010
MessageIgnoreUpdate = "[time: %v][model: %s][pos: %v] ignore update row"
1111
MessageDeletedAll = "[time: %v][model: %s] delete all"
12-
MessageRetryConnect = "Retry to connect to \"%s\" after %s seconds..."
1312
MessageLogFileChanged = "Log file changed [model: %s] to \"%s\""
1413
MessageStopHandlingBinlog = "Stopping binlog handling..."
1514
MessageStopHandlingSave = "Stopping replication handling..."
@@ -21,4 +20,5 @@ const (
2120
MessageStopTableDestroy = "Can't stop table destroy"
2221
MessageTableDestroyed = "Table: \"%s\" destroyed"
2322
MessagePositionUpdated = "Position for table \"%s\" updated"
23+
MessageSysCallStop = "Stopped by syscall"
2424
)

src/helpers/config.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,15 @@ package helpers
22

33
import (
44
"github.com/joho/godotenv"
5-
"github.com/siddontang/go-log/log"
65
"horgh-replicator/src/constants"
6+
"horgh-replicator/src/tools/exit"
77
"os"
88
"strconv"
99
"strings"
1010
)
1111

1212
type Credentials struct {
13-
Type string
14-
RetryTimeout int
15-
RetryAttempts int
13+
Type string
1614
}
1715

1816
type CredentialsDB struct {
@@ -45,20 +43,13 @@ func MakeCredentials() {
4543
err := godotenv.Load()
4644

4745
if err != nil {
48-
log.Fatal("Error loading .env file")
46+
exit.Fatal("Error loading .env file")
4947
}
5048

51-
var timeout, attempts int
52-
53-
timeout, _ = strconv.Atoi(os.Getenv("MASTER_RETRY_TIMEOUT"))
54-
attempts, _ = strconv.Atoi(os.Getenv("MASTER_RETRY_ATTEMPTS"))
55-
5649
masterPort, _ := strconv.Atoi(os.Getenv("MASTER_PORT"))
5750
master = CredentialsDB{
5851
Credentials{
5952
os.Getenv("MASTER_TYPE"),
60-
timeout,
61-
attempts,
6253
},
6354
os.Getenv("MASTER_HOST"),
6455
masterPort,
@@ -67,14 +58,10 @@ func MakeCredentials() {
6758
os.Getenv("MASTER_DBNAME"),
6859
}
6960

70-
timeout, _ = strconv.Atoi(os.Getenv("REPLICATOR_RETRY_TIMEOUT"))
71-
attempts, _ = strconv.Atoi(os.Getenv("REPLICATOR_RETRY_ATTEMPTS"))
7261
replicationPort, _ := strconv.Atoi(os.Getenv("REPLICATOR_PORT"))
7362
replicator = CredentialsDB{
7463
Credentials{
7564
"mysql",
76-
timeout,
77-
attempts,
7865
},
7966
os.Getenv("REPLICATOR_HOST"),
8067
replicationPort,
@@ -136,15 +123,11 @@ func GetMasterLogFilePrefix() string {
136123
}
137124

138125
func ParseDBConfig() {
139-
timeout, _ := strconv.Atoi(os.Getenv("SLAVE_RETRY_TIMEOUT"))
140-
attempts, _ := strconv.Atoi(os.Getenv("SLAVE_RETRY_ATTEMPTS"))
141126
slavePort, _ := strconv.Atoi(os.Getenv("SLAVE_PORT"))
142127

143128
slave = CredentialsDB{
144129
Credentials{
145130
os.Getenv("SLAVE_TYPE"),
146-
timeout,
147-
attempts,
148131
},
149132
os.Getenv("SLAVE_HOST"),
150133
slavePort,
@@ -155,15 +138,11 @@ func ParseDBConfig() {
155138
}
156139

157140
func ParseAMQPConfig() {
158-
timeout, _ := strconv.Atoi(os.Getenv("SLAVE_RETRY_TIMEOUT"))
159-
attempts, _ := strconv.Atoi(os.Getenv("SLAVE_RETRY_ATTEMPTS"))
160141
slavePort, _ := strconv.Atoi(os.Getenv("SLAVE_PORT"))
161142

162143
slave = CredentialsAMQP{
163144
Credentials{
164145
os.Getenv("SLAVE_TYPE"),
165-
timeout,
166-
attempts,
167146
},
168147
os.Getenv("SLAVE_HOST"),
169148
slavePort,

src/helpers/functions.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@ package helpers
22

33
import (
44
"fmt"
5-
"github.com/siddontang/go-log/log"
65
"horgh-replicator/src/constants"
6+
"horgh-replicator/src/tools/exit"
77
"os"
88
"reflect"
99
)
1010

1111
func MakeSlice(input interface{}) []interface{} {
1212
s := reflect.ValueOf(input)
1313
if s.Kind() != reflect.Slice {
14-
log.Fatal(constants.ErrorSliceCreation)
14+
exit.Fatal(constants.ErrorSliceCreation)
1515
}
1616

1717
ret := make([]interface{}, s.Len())
@@ -37,13 +37,13 @@ func MakeHash(dbName string, table string) string {
3737
func ReadConfig(configName string) *os.File {
3838
fileName := fmt.Sprintf(constants.ConfigPath, configName)
3939
if _, err := os.Stat(fileName); os.IsNotExist(err) {
40-
log.Fatalf(constants.ErrorNoModelFile, fileName)
40+
exit.Fatal(constants.ErrorNoModelFile, fileName)
4141
}
4242

4343
jsonFile, err := os.Open(fileName)
4444

4545
if err != nil {
46-
log.Fatal(err)
46+
exit.Fatal(err.Error())
4747
}
4848

4949
return jsonFile

src/helpers/storage.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,6 @@
11
package helpers
22

3-
import (
4-
"fmt"
5-
"github.com/siddontang/go-log/log"
6-
"horgh-replicator/src/constants"
7-
"strconv"
8-
"time"
9-
)
10-
113
type Storage interface {
124
Ping() bool
135
Exec(params map[string]interface{}) bool
146
}
15-
16-
var retryCounter = map[string]int{
17-
constants.DBReplicator: 0,
18-
constants.DBSlave: 0,
19-
constants.DBMaster: 0,
20-
}
21-
22-
func Retry(storageType string, cred Credentials, connection Storage, method func(connection Storage, dbName string) interface{}) interface{} {
23-
if retryCounter[storageType] > cred.RetryAttempts {
24-
log.Fatal(fmt.Sprintf(constants.ErrorDBConnect, storageType))
25-
}
26-
27-
log.Infof(constants.MessageRetryConnect, storageType, strconv.Itoa(cred.RetryTimeout))
28-
29-
time.Sleep(time.Duration(cred.RetryTimeout) * time.Second)
30-
retryCounter[storageType]++
31-
32-
return method(connection, storageType)
33-
}

0 commit comments

Comments
 (0)