Skip to content

Commit cbbbd1d

Browse files
authored
Merge pull request #18 from larsnovikov/consistency
Consistency
2 parents 5e11b35 + 17553f4 commit cbbbd1d

File tree

14 files changed

+52
-111
lines changed

14 files changed

+52
-111
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ build/
77
.DS_Store
88
src/.env
99
src/vendor
10-
src/configs/*.json
10+
src/system/configs/*.json
11+
src/system/positions/*.txt
1112
src/plugins/user/*.go
1213
src/plugins/user/*.so
1314
src/horgh-replicator

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
- Configure your my MySQL master as `/mysql/mysql.conf`.
1717
Don't forget to set `binlog_do_db=<master_db_name>` and restart MySQL service.
1818
- Execute `sql/structure.sql` in your MySQL master and slave.
19-
- Execute `sql/replicator.sql` in your MySQL. It will create database for system values.
2019
- Start Docker as `make start-dev`
2120
- Run as `cd src` and `go run main.go listen` in docker container.
2221

2322
### Testing
2423

25-
- Copy `examples/user.json` and `examples/post.json` to `src/configs`
24+
- Copy `examples/user.json` and `examples/post.json` to `src/system/configs`
2625
- Execute `cd src` and `go run main.go load`
2726

2827
### Add tables to replicator

sql/replicator.sql

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/.env.dist

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,6 @@ SLAVE_PASS=123456
1212
SLAVE_TYPE=mysql
1313
SLAVE_DBNAME=test
1414

15-
REPLICATOR_HOST=localhost
16-
REPLICATOR_PORT=33061
17-
REPLICATOR_USER=root
18-
REPLICATOR_PASS=123456
19-
REPLICATOR_DBNAME=replicator
20-
2115
ALLOWED_TABLES="user,post"
2216
CHANNEL_SIZE=99999999
2317
SLAVE_ID=1

src/constants/constants.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ const (
44
PositionPosPrefix = "position_pos_%s"
55
PositionNamePrefix = "position_name_%s"
66
DBSlave = "slave"
7-
DBReplicator = "replicator"
87
DBMaster = "master"
9-
ConfigPath = "configs/%s.json"
8+
ConfigPath = "system/configs/%s.json"
109
PluginPath = "plugins/%s/handler.so"
10+
PositionsPath = "system/positions/%s.txt"
1111
)

src/constants/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package constants
22

33
const (
44
ErrorMysqlCanal = "Invalid canal"
5-
ErrorMysqlPosition = "Invalid position"
5+
ErrorGetPosition = "Invalid position. Error: \"%s\""
66
ErrorSliceCreation = "InterfaceSlice() given a non-slice type"
77
ErrorNoColumn = "There is no column %s in %s.%s"
88
ErrorDBConnect = "Can't connect to \"%s\""

src/helpers/config.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ type CredentialsAMQP struct {
3232

3333
var master CredentialsDB
3434
var slave interface{}
35-
var replicator CredentialsDB
3635

3736
var tables []string
3837
var channelSize int
@@ -58,18 +57,6 @@ func MakeCredentials() {
5857
os.Getenv("MASTER_DBNAME"),
5958
}
6059

61-
replicationPort, _ := strconv.Atoi(os.Getenv("REPLICATOR_PORT"))
62-
replicator = CredentialsDB{
63-
Credentials{
64-
"mysql",
65-
},
66-
os.Getenv("REPLICATOR_HOST"),
67-
replicationPort,
68-
os.Getenv("REPLICATOR_USER"),
69-
os.Getenv("REPLICATOR_PASS"),
70-
os.Getenv("REPLICATOR_DBNAME"),
71-
}
72-
7360
if os.Getenv("ALLOWED_TABLES") != "" {
7461
for _, tableName := range strings.Split(os.Getenv("ALLOWED_TABLES"), ",") {
7562
tables = append(tables, strings.TrimSpace(tableName))
@@ -87,8 +74,6 @@ func GetCredentials(storageType string) interface{} {
8774
return getMaster()
8875
case constants.DBSlave:
8976
return getSlave()
90-
case constants.DBReplicator:
91-
return getReplicator()
9277
default:
9378
return Credentials{}
9479
}
@@ -102,10 +87,6 @@ func getSlave() interface{} {
10287
return slave
10388
}
10489

105-
func getReplicator() CredentialsDB {
106-
return replicator
107-
}
108-
10990
func GetTables() []string {
11091
return tables
11192
}

src/helpers/connection_pool.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,9 @@ type ConnectionMaster interface {
99
Get(map[string]interface{}) *sql.Rows
1010
}
1111

12-
type ConnectionReplicator interface {
13-
ConnectionMaster
14-
}
15-
1612
type ConnectionPool struct {
17-
Master ConnectionMaster
18-
Slave Storage
19-
Replicator ConnectionReplicator
13+
Master ConnectionMaster
14+
Slave Storage
2015
}
2116

2217
var ConnPool ConnectionPool

src/models/system/connector.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,15 @@ func Exec(mode string, params map[string]interface{}) bool {
1212
case constants.DBMaster:
1313
helpers.ConnPool.Master = mysql.GetConnection(helpers.ConnPool.Master, constants.DBMaster).(helpers.ConnectionMaster)
1414
return helpers.ConnPool.Master.Exec(params)
15-
case constants.DBReplicator:
16-
helpers.ConnPool.Replicator = mysql.GetConnection(helpers.ConnPool.Replicator, constants.DBReplicator).(helpers.ConnectionReplicator)
17-
return helpers.ConnPool.Replicator.Exec(params)
1815
}
1916

2017
return false
2118
}
2219

2320
func Get(mode string, params map[string]interface{}) *sql.Rows {
2421
switch mode {
25-
case constants.DBMaster:
22+
default:
2623
helpers.ConnPool.Master = mysql.GetConnection(helpers.ConnPool.Master, constants.DBMaster).(helpers.ConnectionMaster)
2724
return helpers.ConnPool.Master.Get(params)
28-
default:
29-
helpers.ConnPool.Replicator = mysql.GetConnection(helpers.ConnPool.Replicator, constants.DBReplicator).(helpers.ConnectionReplicator)
30-
return helpers.ConnPool.Replicator.Get(params)
3125
}
3226
}

src/models/system/model.go

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,51 @@
11
package system
22

33
import (
4+
"fmt"
5+
"github.com/siddontang/go-mysql/mysql"
46
"horgh-replicator/src/constants"
7+
"horgh-replicator/src/tools/exit"
8+
"io/ioutil"
9+
"os"
10+
"strconv"
11+
"strings"
512
)
613

7-
type replicator struct {
8-
Key string `gorm:"column:param_key"`
9-
Value string `gorm:"column:param_value"`
10-
}
11-
12-
func GetValue(key string) string {
13-
query := `SELECT * FROM param_values WHERE param_key=? LIMIT 1;`
14-
params := []interface{}{
15-
key,
16-
}
17-
18-
res := Get(constants.DBReplicator, map[string]interface{}{
19-
"query": query,
20-
"params": params,
21-
})
22-
23-
var row replicator
24-
for res.Next() {
25-
err := res.Scan(&row.Key, &row.Value)
26-
if err != nil {
27-
panic(err.Error())
28-
}
29-
}
30-
result := row.Value
14+
const (
15+
PositionMask = "%s:%s"
16+
)
3117

32-
defer func() {
33-
_ = res.Close()
34-
}()
18+
func SetPosition(hash string, position mysql.Position) error {
19+
content := fmt.Sprintf(PositionMask, position.Name, strconv.Itoa(int(position.Pos)))
20+
fileName := fmt.Sprintf(constants.PositionsPath, hash)
21+
err := ioutil.WriteFile(fileName, []byte(content), 0644)
3522

36-
return result
23+
return err
3724
}
3825

39-
func SetValue(key string, value string) bool {
40-
query := `INSERT INTO param_values(param_key, param_value) VALUES(?, ?) ON DUPLICATE KEY UPDATE param_value=?;`
41-
params := []interface{}{
42-
key,
43-
value,
44-
value,
26+
func GetPosition(hash string) mysql.Position {
27+
fileName := fmt.Sprintf(constants.PositionsPath, hash)
28+
29+
if _, err := os.Stat(fileName); os.IsNotExist(err) {
30+
return mysql.Position{}
4531
}
4632

47-
res := Exec(constants.DBReplicator, map[string]interface{}{
48-
"query": query,
49-
"params": params,
50-
})
33+
content, err := ioutil.ReadFile(fileName)
34+
if err != nil {
35+
exit.Fatal(constants.ErrorGetPosition, err.Error())
36+
}
37+
data := strings.Split(string(content[:]), ":")
38+
if len(data) != 2 {
39+
exit.Fatal(constants.ErrorGetPosition, "Can't parse position")
40+
}
41+
position, err := strconv.Atoi(data[1])
42+
if err != nil {
43+
exit.Fatal(constants.ErrorGetPosition, err.Error())
44+
}
45+
pos := mysql.Position{
46+
Name: data[0],
47+
Pos: uint32(position),
48+
}
5149

52-
return res
50+
return pos
5351
}

0 commit comments

Comments
 (0)