How to Correctly Retry HTTP Requests in Go #11

@ma6174

Background

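To improve system reliability, when an HTTP request fails for some reason, we need to send it again. For a GET request, retrying is simple: just issue the GET again. For a POST request that carries a Body, the Body has to be resent as well. Let's first look at how Go's net/http defines Request: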

type Request struct {
    Method string
    URL *url.URL
    Header Header
    Body io.ReadCloser
    ContentLength int64
    TransferEncoding []string
    Close bool
    Host string
    Form url.Values
    PostForm url.Values
    MultipartForm *multipart.Form
    Trailer Header
    RemoteAddr string
    RequestURI string
    TLS *tls.ConnectionState
}

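Here Body has type io.ReadCloser, which, simplified a little, is just a Reader. After the server receives the request, it calls Read() on this Reader to fetch the data. Normally, as the data is read, the Reader's offset advances, and the next read continues from that offset onward.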
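The offset behavior described above can be seen with a plain strings.Reader. This is only a minimal sketch: twoReads is a hypothetical helper written for illustration and is not part of net/http.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// twoReads reads n bytes from the same reader twice in a row and returns
// both chunks. The second read continues where the first one stopped.
func twoReads(data string, n int) (first, second string, err error) {
	body := strings.NewReader(data)
	buf := make([]byte, n)
	if _, err = io.ReadFull(body, buf); err != nil {
		return "", "", err
	}
	first = string(buf)
	if _, err = io.ReadFull(body, buf); err != nil {
		return "", "", err
	}
	second = string(buf)
	return first, second, nil
}

func main() {
	first, second, err := twoReads("1234567890", 3)
	fmt.Println(first, second, err) // prints: 123 456 <nil>
}
```

The second chunk is "456", not "123": nothing resets the offset between reads, which is why a partially consumed Body cannot simply be sent again.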

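When a request fails and we want to retry, the Body must be sent again. Because the Body may already have been partially read, the Reader behind it has to be reset. The first idea that comes to mind is to Seek the Body back to the start of the Reader, which requires the Reader to support Seek, that is, to be an io.ReadSeeker. Is there anything wrong with this approach? Let's write a test program to find out: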

package main

import (
    "io"
    "log"
    "net/http"
    "os"
    "strings"
    "time"

    "github.com/ma6174/slowrw"
)

func handle(rw http.ResponseWriter, req *http.Request) {
    go func() {
        buf := make([]byte, 1)
        n, err := io.ReadFull(req.Body, buf)
        if err != nil || n != 1 {
            log.Fatal("read failed:", n, err)
        }
        if buf[0] != '1' {
            log.Fatalf("buf not start at 1: %#v", string(buf))
        }
        out, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0) // os.Open would be read-only
        if err != nil {
            log.Fatal(err)
        }
        io.Copy(out, req.Body)
    }()
    time.Sleep(time.Second)
}

func main() {
    log.SetFlags(log.Lshortfile | log.LstdFlags)
    go func() {
        http.HandleFunc("/", handle)
        log.Fatal(http.ListenAndServe(":9999", nil))
    }()
    reader := strings.NewReader("1234567890abcdefghigklmnopqrst")
    sr := slowrw.NewReadSeeker(reader, time.Millisecond)
    req, err := http.NewRequest("POST", "http://127.0.0.1:9999/", sr)
    if err != nil {
        log.Fatal(err)
    }
    client := http.Client{
        Timeout: time.Millisecond * 10,
    }
    for {
        _, err := client.Do(req)
        if err != nil {
            log.Println(err)
        }
        _, err = sr.Seek(0, io.SeekStart)
        if err != nil {
            log.Fatal("seek failed", err)
        }
    }
}

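A quick explanation of the test above: it defines both a client and a server. On the server side, each incoming request starts a goroutine that reads the data asynchronously. It reads one byte first; under normal conditions the first byte is expected to be 1. If it is anything else, we conclude that the Body had already been read and the data is wrong.

Reading and receiving data is normally very fast. To slow things down for testing, the code uses a library I wrote, github.com/ma6174/slowrw. Here we use its SlowReader, which means the client can send only one character per millisecond. The server reads without any rate limit.

The client also sets a timeout: Timeout: time.Millisecond * 10. As mentioned above, the client can send only one character per millisecond and the payload has 30 characters, so sending all of it takes at least 30 milliseconds. Since the client limits the whole request to 10 milliseconds, the request is bound to time out and fail. The purpose is to simulate a failed client request and trigger a retry.

Here is the output: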

$ go run seek_http.go
2015/01/11 16:35:39 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:39 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:39 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:39 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:39 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:39 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:39 seek_http.go:22: buf not start at 1: "4"
exit status 1
$
$ go run seek_http.go
2015/01/11 16:35:42 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:42 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:42 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:42 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:42 seek_http.go:22: buf not start at 1: "5"
exit status 1
$
$ go run seek_http.go
2015/01/11 16:35:46 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:46 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:46 seek_http.go:51: Post http://127.0.0.1:9999/: read tcp 127.0.0.1:9999: use of closed network connection
2015/01/11 16:35:46 seek_http.go:22: buf not start at 1: "4"
exit status 1

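The output shows that after a few normal requests (the data starts with 1 and the request eventually times out), the server receives a Body that does not start with 1, which means the request is corrupted. This confirms that seeking back to the start before every retry is unreliable.

Analysis

The HTTP server may read the data in a goroutine. After a request fails, even though the client calls CancelRequest to cancel it, the server side may still go on to read some of the data. If that read is still in progress after we call Seek, the offset shifts, and the next time the server reads the Body it gets incomplete data. When transferring the Body takes a long time, for example because the Body is large, the network is unstable, or the client is rate limited, seeking is very likely to go wrong.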
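The interleaving described above can be replayed deterministically, without goroutines or a network. The following is only a sketch of the ordering problem, not of net/http internals: firstByteAfterStaleRead is a hypothetical helper, and the "leftover read" is simulated by an ordinary sequential read placed after the Seek.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// firstByteAfterStaleRead replays the problematic ordering on one shared reader:
//  1. the failed attempt has already consumed part of the body,
//  2. the retry logic seeks back to the start,
//  3. the failed attempt consumes `stale` more bytes,
//  4. the new attempt reads what it believes is the first byte.
func firstByteAfterStaleRead(data string, stale int) (byte, error) {
	shared := strings.NewReader(data)

	// Step 1: the failed attempt got part of the way through the body.
	if _, err := io.CopyN(io.Discard, shared, 5); err != nil {
		return 0, err
	}
	// Step 2: the retry rewinds the shared reader.
	if _, err := shared.Seek(0, io.SeekStart); err != nil {
		return 0, err
	}
	// Step 3: the old attempt is still draining the same reader.
	if _, err := io.CopyN(io.Discard, shared, int64(stale)); err != nil {
		return 0, err
	}
	// Step 4: the new attempt reads its "first" byte.
	buf := make([]byte, 1)
	if _, err := io.ReadFull(shared, buf); err != nil {
		return 0, err
	}
	return buf[0], nil
}

func main() {
	b, err := firstByteAfterStaleRead("1234567890", 3)
	fmt.Printf("%q %v\n", b, err) // prints: '4' <nil>
}
```

With three stale bytes the new attempt starts at "4", the same kind of failure as "buf not start at 1" in the log above. With zero stale bytes it starts at "1", which is why the bug only shows up some of the time.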

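Solution

There are many possible solutions. Since the same reader may be read concurrently, all we need is to guarantee that each reader is read only once. The simplest way is to construct a new reader for every attempt. With a small change, the code above becomes: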

package main

import (
    "io"
    "log"
    "net/http"
    "os"
    "strings"
    "time"

    "github.com/ma6174/slowrw"
)

func handle(rw http.ResponseWriter, req *http.Request) {
    go func() {
        buf := make([]byte, 1)
        n, err := io.ReadFull(req.Body, buf)
        if err != nil || n != 1 {
            log.Fatal("read failed:", n, err)
        }
        if buf[0] != '1' {
            log.Fatalf("buf not start at 1: %#v", string(buf))
        }
        out, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0) // os.Open would be read-only
        if err != nil {
            log.Fatal(err)
        }
        io.Copy(out, req.Body)
    }()
    time.Sleep(time.Second)
}

func main() {
    log.SetFlags(log.Lshortfile | log.LstdFlags)
    go func() {
        http.HandleFunc("/", handle)
        log.Fatal(http.ListenAndServe(":9999", nil))
    }()
    reader := strings.NewReader("1234567890abcdefghigklmnopqrst")
    sr := slowrw.NewReader(reader, time.Millisecond)
    req, err := http.NewRequest("POST", "http://127.0.0.1:9999/", sr)
    if err != nil {
        log.Fatal(err)
    }
    client := http.Client{
        Timeout: time.Millisecond * 10,
    }
    for {
        _, err := client.Do(req)
        if err != nil {
            log.Println(err)
        }
        reader = strings.NewReader("1234567890abcdefghigklmnopqrst")
        sr = slowrw.NewReader(reader, time.Millisecond)
        req, err = http.NewRequest("POST", "http://127.0.0.1:9999/", sr)
        if err != nil {
            log.Fatal(err)
        }
    }
}

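This code runs without any problems, but it has a hidden cost: the reader is rebuilt from the data on every attempt (here with strings.NewReader), which means the client must keep the original data until the request succeeds or retrying is abandoned. If the payload is large or the request volume is high, this can consume a lot of memory.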
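The "fresh reader per attempt" idea can be packaged as a small helper that takes a body factory instead of a body. The sketch below is an illustration under assumptions: postWithRetry, flakyTransport, demo and the URL are hypothetical names, and the network is replaced by a fake http.RoundTripper that fails a fixed number of times so the example runs offline. The factory still needs access to the original data, so the memory caveat above applies unchanged.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// postWithRetry builds a brand-new request and body for every attempt,
// so no reader is ever shared between two attempts.
func postWithRetry(c *http.Client, url string, newBody func() io.Reader, attempts int) (*http.Response, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		req, err := http.NewRequest("POST", url, newBody())
		if err != nil {
			return nil, err
		}
		resp, err := c.Do(req)
		if err == nil {
			return resp, nil
		}
		lastErr = err
	}
	return nil, lastErr
}

// flakyTransport stands in for an unreliable network: it records the body
// of every attempt and fails the first `failures` round trips.
type flakyTransport struct {
	failures int
	bodies   []string
}

func (t *flakyTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	data, err := io.ReadAll(req.Body)
	req.Body.Close()
	if err != nil {
		return nil, err
	}
	t.bodies = append(t.bodies, string(data))
	if t.failures > 0 {
		t.failures--
		return nil, errors.New("simulated network failure")
	}
	return &http.Response{
		StatusCode: http.StatusOK,
		Status:     "200 OK",
		Header:     make(http.Header),
		Body:       io.NopCloser(strings.NewReader("ok")),
		Request:    req,
	}, nil
}

// demo runs three attempts against a transport that fails twice and
// returns the body seen by each attempt.
func demo() ([]string, error) {
	ft := &flakyTransport{failures: 2}
	client := &http.Client{Transport: ft}
	newBody := func() io.Reader { return strings.NewReader("1234567890") }
	resp, err := postWithRetry(client, "http://example.invalid/", newBody, 3)
	if err != nil {
		return ft.bodies, err
	}
	resp.Body.Close()
	return ft.bodies, nil
}

func main() {
	bodies, err := demo()
	fmt.Println(len(bodies), err)
	for _, b := range bodies {
		fmt.Println(b)
	}
}
```

Every attempt sees the complete payload, including the two that fail, because each one reads from its own reader.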

Another solution is to use io.ReaderAt. We only need to wrap the existing reader so that, for every request, the data is read from the starting position. Code first:

package main

import (
    "io"
    "log"
    "net/http"
    "os"
    "strings"
    "time"

    "github.com/ma6174/slowrw"
)

func handle(rw http.ResponseWriter, req *http.Request) {
    go func() {
        buf := make([]byte, 1)
        n, err := io.ReadFull(req.Body, buf)
        if err != nil || n != 1 {
            log.Fatal("read failed:", n, err)
        }
        if buf[0] != '1' {
            log.Fatalf("buf not start at 1: %#v", string(buf))
        }
        out, err := os.OpenFile(os.DevNull, os.O_WRONLY, 0) // os.Open would be read-only
        if err != nil {
            log.Fatal(err)
        }
        io.Copy(out, req.Body)
    }()
    time.Sleep(time.Second)
}

type Reader struct {
    Reader io.ReaderAt
    Offset int64
}

func (p *Reader) Read(val []byte) (n int, err error) {
    n, err = p.Reader.ReadAt(val, p.Offset)
    p.Offset += int64(n)
    return
}

func (p *Reader) Close() error {
    if c, ok := p.Reader.(io.Closer); ok {
        return c.Close()
    }
    return nil
}

func main() {
    log.SetFlags(log.Lshortfile | log.LstdFlags)
    go func() {
        http.HandleFunc("/", handle)
        log.Fatal(http.ListenAndServe(":9999", nil))
    }()
    reader := strings.NewReader("1234567890abcdefghigklmnopqrst")
    sr := slowrw.NewReaderAt(reader, time.Millisecond)
    ra := &Reader{sr, 0}
    req, err := http.NewRequest("POST", "http://127.0.0.1:9999/", ra)
    if err != nil {
        log.Fatal(err)
    }
    client := http.Client{
        Timeout: time.Millisecond * 10,
    }
    for {
        _, err := client.Do(req)
        if err != nil {
            log.Println(err)
        }
        sr = slowrw.NewReaderAt(reader, time.Millisecond)
        reader2 := &Reader{sr, 0}
        req.Body = reader2
    }
}

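In the code above there is only one underlying reader, but each attempt uses &Reader{sr, 0} to create a reader2 that reads from the start. Because every reader2 begins at the start and keeps its own offset, a single reader is never read more than once. Of course, this approach requires the underlying reader to be an io.ReaderAt, that is, to support the ReadAt() method.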
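The standard library offers a similar building block, io.SectionReader, which also keeps a private offset on top of a shared io.ReaderAt. The sketch below swaps it in for the hand-written Reader above purely as an illustration; newAttemptBody and independentOffsets are hypothetical helpers, and unlike the wrapper above this variant assumes the total size of the data is known up front.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// newAttemptBody returns a reader over all of src with its own offset.
// Readers created this way do not affect each other.
func newAttemptBody(src io.ReaderAt, size int64) io.Reader {
	return io.NewSectionReader(src, 0, size)
}

// independentOffsets lets an "old" attempt consume five bytes, starts a
// fresh attempt over the same source, and then reads one more byte from each.
func independentOffsets(data string) (oldNext, freshFirst string, err error) {
	src := strings.NewReader(data) // *strings.Reader implements io.ReaderAt
	size := int64(len(data))

	old := newAttemptBody(src, size)
	if _, err = io.CopyN(io.Discard, old, 5); err != nil {
		return "", "", err
	}

	fresh := newAttemptBody(src, size)

	// The old attempt keeps reading after the retry has started.
	buf := make([]byte, 1)
	if _, err = io.ReadFull(old, buf); err != nil {
		return "", "", err
	}
	oldNext = string(buf)

	// The fresh attempt still starts at the first byte.
	if _, err = io.ReadFull(fresh, buf); err != nil {
		return "", "", err
	}
	freshFirst = string(buf)
	return oldNext, freshFirst, nil
}

func main() {
	oldNext, freshFirst, err := independentOffsets("1234567890")
	fmt.Println(oldNext, freshFirst, err) // prints: 6 1 <nil>
}
```

The old attempt continues at "6" while the fresh one begins at "1": a leftover read from a failed attempt can no longer shift the position of the retry.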
