Skip to content

Adding AUTH support, more error handling, and timeouts #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module github.com/RedisLabs/sentinel_tunnel
3 changes: 3 additions & 0 deletions sentinel_tunnel_configuration_example.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"node1.local:8001",
"node2.local:8001"
],
"Sentinel_dial_timeout": 1000,
"Sentinel_command_timeout": 1000,
"Sentinel_proxy_timeout": 1000,
"Databases":[
{
"Name":"db1",
Expand Down
34 changes: 21 additions & 13 deletions sentinel_tunnelling_client.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package main

import (
// "bufio"
"encoding/json"
"fmt"
"github.com/RedisLabs/sentinel_tunnel/st_logger"
"github.com/RedisLabs/sentinel_tunnel/st_sentinel_connection"
"io"
"io/ioutil"
"net"
"os"
"os/signal"
"time"

"github.com/RedisLabs/sentinel_tunnel/st_logger"
"github.com/RedisLabs/sentinel_tunnel/st_sentinel_connection"
)

type SentinelTunnellingDbConfig struct {
Expand All @@ -20,6 +21,9 @@ type SentinelTunnellingDbConfig struct {

type SentinelTunnellingConfiguration struct {
Sentinels_addresses_list []string
Sentinel_dial_timeout time.Duration
Sentinel_command_timeout time.Duration
Sentinel_proxy_timeout time.Duration
Databases []SentinelTunnellingDbConfig
}

Expand All @@ -45,7 +49,7 @@ func NewSentinelTunnellingClient(config_file_location string) *SentinelTunnellin
}

Tunnelling_client.sentinel_connection, err =
st_sentinel_connection.NewSentinelConnection(Tunnelling_client.configuration.Sentinels_addresses_list)
st_sentinel_connection.NewSentinelConnection(Tunnelling_client.configuration.Sentinels_addresses_list, Tunnelling_client.configuration.Sentinel_dial_timeout, Tunnelling_client.configuration.Sentinel_command_timeout)
if err != nil {
st_logger.WriteLogMessage(st_logger.FATAL, "an error has occur, ",
err.Error())
Expand All @@ -57,21 +61,24 @@ func NewSentinelTunnellingClient(config_file_location string) *SentinelTunnellin
}

func createTunnelling(conn1 net.Conn, conn2 net.Conn) {
io.Copy(conn1, conn2)
_, err := io.Copy(conn1, conn2)
if err != nil {
st_logger.WriteLogMessage(st_logger.ERROR, "tunneling failed, ", err.Error())
}
conn1.Close()
conn2.Close()
}

func handleConnection(c net.Conn, db_name string,
get_db_address_by_name get_db_address_by_name_function) {
get_db_address_by_name get_db_address_by_name_function, proxy_timeout time.Duration) {
db_address, err := get_db_address_by_name(db_name)
if err != nil {
st_logger.WriteLogMessage(st_logger.ERROR, "cannot get db address for ", db_name,
",", err.Error())
c.Close()
return
}
db_conn, err := net.Dial("tcp", db_address)
db_conn, err := net.DialTimeout("tcp", db_address, proxy_timeout)
if err != nil {
st_logger.WriteLogMessage(st_logger.ERROR, "cannot connect to db ", db_name,
",", err.Error())
Expand All @@ -83,7 +90,7 @@ func handleConnection(c net.Conn, db_name string,
}

func handleSigleDbConnections(listening_port string, db_name string,
get_db_address_by_name get_db_address_by_name_function) {
get_db_address_by_name get_db_address_by_name_function, proxy_timeout time.Duration) {

listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%s", listening_port))
if err != nil {
Expand All @@ -99,15 +106,15 @@ func handleSigleDbConnections(listening_port string, db_name string,
st_logger.WriteLogMessage(st_logger.FATAL, "cannot accept connections on port ",
listening_port, err.Error())
}
go handleConnection(conn, db_name, get_db_address_by_name)
go handleConnection(conn, db_name, get_db_address_by_name, proxy_timeout)
}

}

func (st_client *SentinelTunnellingClient) Start() {
for _, db_conf := range st_client.configuration.Databases {
go handleSigleDbConnections(db_conf.Local_port, db_conf.Name,
st_client.sentinel_connection.GetAddressByDbName)
st_client.sentinel_connection.GetAddressByDbName, st_client.configuration.Sentinel_proxy_timeout*time.Millisecond)
}
}

Expand All @@ -119,7 +126,8 @@ func main() {
st_logger.InitializeLogger(os.Args[2])
st_client := NewSentinelTunnellingClient(os.Args[1])
st_client.Start()
for {
time.Sleep(1000 * time.Millisecond)
}

quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
<-quit
}
105 changes: 86 additions & 19 deletions st_sentinel_connection/st_sentinel_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package st_sentinel_connection

import (
"bufio"
"bytes"
"errors"
"fmt"
"net"
"net/url"
"strconv"
"time"
)
Expand All @@ -16,6 +18,8 @@ type Get_master_addr_reply struct {

type Sentinel_connection struct {
sentinels_addresses []string
sentinel_dial_timeout time.Duration
sentinel_command_timeout time.Duration
current_sentinel_connection net.Conn
reader *bufio.Reader
writer *bufio.Writer
Expand All @@ -32,7 +36,7 @@ func (c *Sentinel_connection) parseResponse() (request []string, err error, is_c
var ret []string
buf, _, e := c.reader.ReadLine()
if e != nil {
return nil, errors.New("failed read line from client"), client_closed
return nil, fmt.Errorf("failed read line from client: %v", e), client_closed
}
if len(buf) == 0 {
return nil, errors.New("failed read line from client"), client_closed
Expand All @@ -48,7 +52,7 @@ func (c *Sentinel_connection) parseResponse() (request []string, err error, is_c
for i := 0; i < mbulk_size; i++ {
buf1, _, e1 := c.reader.ReadLine()
if e1 != nil {
return nil, errors.New("failed read line from client"), client_closed
return nil, fmt.Errorf("failed read line from client: %v", e1), client_closed
}
if len(buf1) == 0 {
return nil, errors.New("failed read line from client"), client_closed
Expand All @@ -59,7 +63,7 @@ func (c *Sentinel_connection) parseResponse() (request []string, err error, is_c
bulk_size, _ := strconv.Atoi(string(buf1[1:]))
buf2, _, e2 := c.reader.ReadLine()
if e2 != nil {
return nil, errors.New("failed read line from client"), client_closed
return nil, fmt.Errorf("failed read line from client: %v", e2), client_closed
}
bulk := string(buf2)
if len(bulk) != bulk_size {
Expand All @@ -71,15 +75,22 @@ func (c *Sentinel_connection) parseResponse() (request []string, err error, is_c
}

func (c *Sentinel_connection) getMasterAddrByNameFromSentinel(db_name string) (addr []string, returned_err error, is_client_closed bool) {
c.writer.WriteString("*3\r\n")
c.writer.WriteString("$8\r\n")
c.writer.WriteString("sentinel\r\n")
c.writer.WriteString("$23\r\n")
c.writer.WriteString("get-master-addr-by-name\r\n")
c.writer.WriteString(fmt.Sprintf("$%d\r\n", len(db_name)))
c.writer.WriteString(db_name)
c.writer.WriteString("\r\n")
c.writer.Flush()
err := c.current_sentinel_connection.SetDeadline(time.Now().Add(c.sentinel_command_timeout))
if err != nil {
return nil, err, false
}
_, err = fmt.Fprintf(c.writer, "*3\r\n$8\r\nsentinel\r\n$23\r\nget-master-addr-by-name\r\n$%d\r\n%s\r\n", len(db_name), db_name)
if err != nil {
return nil, err, false
}
err = c.writer.Flush()
if err != nil {
return nil, err, false
}
err = c.current_sentinel_connection.SetDeadline(time.Time{})
if err != nil {
return nil, err, false
}

return c.parseResponse()
}
Expand All @@ -88,7 +99,7 @@ func (c *Sentinel_connection) retrieveAddressByDbName() {
for db_name := range c.get_master_address_by_name {
addr, err, is_client_closed := c.getMasterAddrByNameFromSentinel(db_name)
if err != nil {
fmt.Println("err: ", err.Error())
fmt.Println("failed to get master addresses: ", err.Error())
if !is_client_closed {
c.get_master_address_by_name_reply <- &Get_master_addr_reply{
reply: "",
Expand Down Expand Up @@ -120,32 +131,88 @@ func (c *Sentinel_connection) reconnectToSentinel() bool {
c.current_sentinel_connection = nil
}

var err error
c.current_sentinel_connection, err = net.DialTimeout("tcp", sentinelAddr, 300*time.Millisecond)
u, err := url.Parse("redis://" + sentinelAddr)
if err != nil {
fmt.Printf("failed to parse address %s: %v\n", sentinelAddr, err)
return false
}

c.current_sentinel_connection, err = net.DialTimeout("tcp", u.Host, c.sentinel_dial_timeout)
if err == nil {
c.reader = bufio.NewReader(c.current_sentinel_connection)
c.writer = bufio.NewWriter(c.current_sentinel_connection)

pass, ok := u.User.Password()
if ok {
err = c.auth(pass)
if err != nil {
fmt.Printf("failed to auth: %v\n", err)
return false
}
}

return true
}
fmt.Println(err.Error())
fmt.Printf("failed to dial sentinel %s: %v\n", u.Host, err)
}
return false
}

func (c *Sentinel_connection) auth(pass string) error {
if pass == "" {
return errors.New("password is not supplied")
}
err := c.current_sentinel_connection.SetDeadline(time.Now().Add(c.sentinel_command_timeout))
if err != nil {
return err
}
_, err = fmt.Fprintf(c.writer, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", len(pass), pass)
if err != nil {
return err
}
err = c.writer.Flush()
if err != nil {
return err
}
err = c.current_sentinel_connection.SetDeadline(time.Time{})
if err != nil {
return err
}

err = c.parseAuthResponse()

return err
}

func (c *Sentinel_connection) parseAuthResponse() error {
buf, _, err := c.reader.ReadLine()
if err != nil {
return fmt.Errorf("failed read line from client: %v", err)
}

if !bytes.Equal([]byte("+OK"), buf) {
return errors.New("failed to authenticate")
}

return nil
}

func (c *Sentinel_connection) GetAddressByDbName(name string) (string, error) {
c.get_master_address_by_name <- name
reply := <-c.get_master_address_by_name_reply
return reply.reply, reply.err
}

func NewSentinelConnection(addresses []string) (*Sentinel_connection, error) {
func NewSentinelConnection(addresses []string, dialTimeout, commandTimeout time.Duration) (*Sentinel_connection, error) {
connection := Sentinel_connection{
sentinels_addresses: addresses,
get_master_address_by_name: make(chan string),
get_master_address_by_name_reply: make(chan *Get_master_addr_reply),
current_sentinel_connection: nil,
reader: nil,
writer: nil,
reader: nil,
writer: nil,
sentinel_dial_timeout: time.Millisecond * dialTimeout,
sentinel_command_timeout: time.Millisecond * commandTimeout,
}

if !connection.reconnectToSentinel() {
Expand Down