Skip to content

Commit bf94905

Browse files
authored
Merge pull request #21 from larsnovikov/refactoring_and_pg_carcass
Refactoring and pg carcass
2 parents 540a2d1 + 1cf01aa commit bf94905

File tree

36 files changed

+570
-326
lines changed

36 files changed

+570
-326
lines changed

files/sql/pg_create_log_func.sql

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
CREATE OR REPLACE FUNCTION horgh.if_modified_func() RETURNS TRIGGER AS $body$
2+
DECLARE
3+
v_old_data TEXT;
4+
v_new_data TEXT;
5+
BEGIN
6+
IF (TG_OP = 'UPDATE') THEN
7+
v_old_data := ROW(OLD.*);
8+
v_new_data := ROW(NEW.*);
9+
INSERT INTO horgh.logged_actions (schema_name,table_name,user_name,action,original_data,new_data,query)
10+
VALUES (TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,session_user::TEXT,substring(TG_OP,1,1),v_old_data,v_new_data, current_query());
11+
RETURN NEW;
12+
ELSIF (TG_OP = 'DELETE') THEN
13+
v_old_data := ROW(OLD.*);
14+
INSERT INTO horgh.logged_actions (schema_name,table_name,user_name,action,original_data,query)
15+
VALUES (TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,session_user::TEXT,substring(TG_OP,1,1),v_old_data, current_query());
16+
RETURN OLD;
17+
ELSIF (TG_OP = 'INSERT') THEN
18+
v_new_data := ROW(NEW.*);
19+
INSERT INTO horgh.logged_actions (schema_name,table_name,user_name,action,new_data,query)
20+
VALUES (TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,session_user::TEXT,substring(TG_OP,1,1),v_new_data, current_query());
21+
RETURN NEW;
22+
ELSE
23+
RAISE WARNING '[HORGH.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();
24+
RETURN NULL;
25+
END IF;
26+
27+
EXCEPTION
28+
WHEN data_exception THEN
29+
RAISE WARNING '[HORGH.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
30+
RETURN NULL;
31+
WHEN unique_violation THEN
32+
RAISE WARNING '[HORGH.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
33+
RETURN NULL;
34+
WHEN OTHERS THEN
35+
RAISE WARNING '[HORGH.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
36+
RETURN NULL;
37+
END;
38+
$body$
39+
LANGUAGE plpgsql
40+
SECURITY DEFINER
41+
SET search_path = pg_catalog, horgh;

files/sql/pg_create_log_table.sql

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
CREATE schema horgh;
2+
REVOKE CREATE ON schema horgh FROM public;
3+
4+
CREATE TABLE IF NOT EXISTS horgh.%s (
5+
id SERIAL,
6+
schema_name text NOT NULL,
7+
TABLE_NAME text NOT NULL,
8+
user_name text,
9+
action_tstamp TIMESTAMP WITH TIME zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
10+
action TEXT NOT NULL CHECK (action IN ('I','D','U')),
11+
original_data text,
12+
new_data text,
13+
query text
14+
) WITH (fillfactor=100);
15+
16+
REVOKE ALL ON horgh.%s FROM public;
17+
18+
GRANT SELECT ON horgh.%s TO public;
19+
20+
CREATE INDEX %s_action_idx
21+
ON horgh.logged_actions(action);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
CREATE TRIGGER %s_horgh
2+
AFTER INSERT OR UPDATE OR DELETE ON horgh.%s
3+
FOR EACH ROW EXECUTE PROCEDURE horgh.if_modified_func();

src/Gopkg.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/connectors/clickhouse/model.go renamed to src/connectors/clickhouse/slave/model.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
package clickhouse
1+
package slave
22

33
import (
44
"fmt"
55
"horgh-replicator/src/connectors"
6+
"horgh-replicator/src/connectors/clickhouse"
67
"horgh-replicator/src/constants"
78
"horgh-replicator/src/helpers"
89
"strings"
@@ -149,6 +150,6 @@ func (model *Model) Exec(params helpers.Query) bool {
149150
}
150151

151152
func (model *Model) Connection() helpers.Storage {
152-
helpers.ConnPool.Slave = GetConnection(helpers.ConnPool.Slave, constants.DBSlave).(helpers.Storage)
153+
helpers.ConnPool.Slave = clickhouse.GetConnection(helpers.ConnPool.Slave, constants.DBSlave).(helpers.Storage)
153154
return helpers.ConnPool.Slave
154155
}
File renamed without changes.
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package master
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"github.com/siddontang/go-log/log"
7+
"github.com/siddontang/go-mysql/mysql"
8+
"horgh-replicator/src/constants"
9+
"horgh-replicator/src/helpers"
10+
"horgh-replicator/src/models/slave"
11+
"horgh-replicator/src/tools/exit"
12+
toolsHelper "horgh-replicator/src/tools/helpers"
13+
"os/exec"
14+
"regexp"
15+
"strconv"
16+
"strings"
17+
)
18+
19+
const (
20+
createDump = "mysqldump --extended-insert=FALSE --no-create-info --master-data=1 --port=%s -u%s -p%s -h %s %s %s"
21+
insertRegexp = `VALUES \([A-Za-z0-9,\s,\S]+\)`
22+
positionRegexp = `MASTER_LOG_FILE=\'([a-zA-Z\-\.0-9]+)\', MASTER_LOG_POS=([0-9]+)`
23+
parseStringSize = 99999999
24+
)
25+
26+
func buildModel(tableName string) {
27+
toolsHelper.Table = tableName
28+
if canHandle() == true {
29+
toolsHelper.ParseStrings = make(chan string, parseStringSize)
30+
go parseLine(toolsHelper.ParseStrings)
31+
32+
readDump()
33+
34+
toolsHelper.Wait(func() bool {
35+
return slave.GetSlaveByName(toolsHelper.Table).GetChannelLen() == 0 && len(toolsHelper.ParseStrings) == 0
36+
})
37+
}
38+
}
39+
40+
func canHandle() bool {
41+
savedPos := GetSavedPos(toolsHelper.Table)
42+
if savedPos.Name == "" && savedPos.Pos == 0 {
43+
return true
44+
}
45+
46+
exit.Fatal(constants.ErrorSlaveBuilt, toolsHelper.Table, toolsHelper.Table)
47+
return false
48+
}
49+
50+
func readDump() {
51+
log.Infof(constants.MessageStartReadDump, toolsHelper.Table)
52+
cred := helpers.GetCredentials(constants.DBMaster).(helpers.CredentialsDB)
53+
54+
dumpCmd := fmt.Sprintf(createDump, strconv.Itoa(cred.Port), cred.User, cred.Pass, cred.Host, cred.DBname, toolsHelper.Table)
55+
cmdArgs := strings.Fields(dumpCmd)
56+
cmd := exec.Command(cmdArgs[0], cmdArgs[1:len(cmdArgs)]...)
57+
// create a pipe for the output of the script
58+
cmdReader, err := cmd.StdoutPipe()
59+
if err != nil {
60+
exit.Fatal(constants.ErrorDumpRead, err)
61+
}
62+
63+
scanner := bufio.NewScanner(cmdReader)
64+
65+
go func() {
66+
for scanner.Scan() {
67+
toolsHelper.ParseStrings <- scanner.Text()
68+
}
69+
}()
70+
71+
err = cmd.Start()
72+
if err != nil {
73+
exit.Fatal(constants.ErrorDumpRead, err)
74+
}
75+
76+
log.Infof(constants.MessageDumpRead, toolsHelper.Table)
77+
}
78+
79+
func parseLine(c chan string) {
80+
for {
81+
line := <-c
82+
83+
// try to parse like insert
84+
if parseInsert(line) == true {
85+
continue
86+
}
87+
88+
// try to parse like position setter
89+
if parsePosition(line) == true {
90+
continue
91+
}
92+
}
93+
}
94+
95+
func parseInsert(line string) bool {
96+
re := regexp.MustCompile(insertRegexp)
97+
match := re.FindStringSubmatch(line)
98+
if len(match) > 0 {
99+
// TODO fix me
100+
r := strings.NewReplacer("VALUES", "",
101+
"'", "",
102+
"(", "",
103+
")", "")
104+
105+
params := strings.Split(strings.TrimSpace(r.Replace(match[0])), ",")
106+
107+
slave.GetSlaveByName(toolsHelper.Table).ClearParams()
108+
109+
interfaceParams := make([]interface{}, len(params))
110+
for i := range params {
111+
interfaceParams[i] = params[i]
112+
}
113+
err := ParseRow(slave.GetSlaveByName(toolsHelper.Table), interfaceParams)
114+
if err != nil {
115+
exit.Fatal(constants.ErrorParseLine, line, err)
116+
}
117+
118+
header, _ := toolsHelper.GetHeader()
119+
120+
slave.GetSlaveByName(toolsHelper.Table).Insert(&header)
121+
122+
return true
123+
}
124+
125+
return false
126+
}
127+
128+
func parsePosition(line string) bool {
129+
re := regexp.MustCompile(positionRegexp)
130+
match := re.FindStringSubmatch(line)
131+
132+
if len(match) > 0 {
133+
pos, _ := strconv.Atoi(match[2])
134+
toolsHelper.Position = mysql.Position{
135+
Name: match[1],
136+
Pos: uint32(pos),
137+
}
138+
139+
toolsHelper.SetPosition()
140+
141+
return true
142+
}
143+
144+
return false
145+
}

src/parser/binlog.go renamed to src/connectors/mysql/master/listener.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package parser
1+
package master
22

33
import (
44
"fmt"
@@ -25,13 +25,7 @@ var AllowHandling = true
2525
var curCanal *canal.Canal
2626

2727
func (h *binlogHandler) canOperate(logTableName string) bool {
28-
for _, tableName := range helpers.GetTables() {
29-
if tableName == logTableName {
30-
return true
31-
}
32-
}
33-
34-
return false
28+
return helpers.GetTable() == logTableName
3529
}
3630

3731
func (h *binlogHandler) prepareCanal() {
@@ -110,7 +104,7 @@ func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {
110104

111105
for i := n; i < len(e.Rows); i += k {
112106
if h.ParseBinLog(currentSlave, e, i) != nil {
113-
exit.Fatal(constants.ErrorBinlogParsing)
107+
exit.Fatal(constants.ErrorLogParsing)
114108
}
115109

116110
if e.Action == canal.UpdateAction {
@@ -163,7 +157,7 @@ func (h *binlogHandler) String() string {
163157
return "binlogHandler"
164158
}
165159

166-
func BinlogListener() {
160+
func Listen() {
167161
c, err := getDefaultCanal()
168162
if err == nil {
169163
position, err := c.GetMasterPos()
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package master
2+
3+
import (
4+
"horgh-replicator/src/models/slave"
5+
"horgh-replicator/src/tools/exit"
6+
)
7+
8+
type Model struct {
9+
}
10+
11+
func (model *Model) Listen() {
12+
exit.BeforeExitPool = append(exit.BeforeExitPool, stop)
13+
exit.BeforeExitPool = append(exit.BeforeExitPool, slave.Stop)
14+
Listen()
15+
}
16+
17+
func (model *Model) BuildSlave(table string) {
18+
buildModel(table)
19+
}

src/parser/parser.go renamed to src/connectors/mysql/master/parser.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package parser
1+
package master
22

33
import (
44
"github.com/siddontang/go-log/log"
@@ -127,7 +127,7 @@ func prepareType(fieldName string, fieldType string, value interface{}, params m
127127
}
128128
}
129129

130-
func Stop() bool {
130+
func stop() bool {
131131
// stop handle binlog
132132
log.Infof(constants.MessageStopHandlingBinlog)
133133
AllowHandling = false

0 commit comments

Comments
 (0)