Skip to content

Commit 74f9d04

Browse files
author
Sébastien GLON
committed
add read Handshake Response
1 parent 4d3728f commit 74f9d04

File tree

5 files changed

+133
-17
lines changed

5 files changed

+133
-17
lines changed

Transceiver.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import (
55
"net"
66
"encoding/binary"
77
"fmt"
8+
"os"
9+
"io"
810
)
911

1012
type Transceiver interface {
11-
Transceive(request []bytes.Buffer) ([]byte, error)
13+
Transceive(request []bytes.Buffer) ([]io.Reader, error)
1214
RemoteName() string
1315
SetRemoteName(string)
1416
}
@@ -33,7 +35,7 @@ func (t NettyTransceiver) SetRemoteName(name string) {
3335
t.remoteName = name
3436
}
3537

36-
func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]byte, error){
38+
func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){
3739
nettyFrame := new(bytes.Buffer)
3840
t.Pack(nettyFrame, requests)
3941

@@ -42,11 +44,14 @@ func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]byte, error){
4244
if err!=nil {
4345
return nil, fmt.Errorf("Fail to write on socket %v", err)
4446
}
45-
//sfmt.Fprintf(os.Stdout, "BufferSize %v", nettyFrame)
47+
4648
// Read Response
4749
bodyBytes := make([]byte, 1024)
48-
t.sock.Read(bodyBytes)
49-
return bodyBytes, nil
50+
_, err = t.sock.Read(bodyBytes)
51+
if err!=nil {
52+
return nil, fmt.Errorf("Fail to read on socket %v", err)
53+
}
54+
return t.Unpack(bodyBytes)
5055
}
5156

5257
func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) {
@@ -67,4 +72,26 @@ func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) {
6772
frame.Write(requestSize)
6873
frame.Write(request.Bytes())
6974
}
75+
}
76+
77+
func (t *NettyTransceiver) Unpack(frame []byte) ([]io.Reader, error) {
78+
79+
nettyNumberFame := binary.BigEndian.Uint32(frame[4:8])
80+
result := make([]io.Reader, nettyNumberFame)
81+
startFrame := uint32(8)
82+
i:=uint32(0)
83+
for i < nettyNumberFame {
84+
85+
86+
messageSize := uint32(binary.BigEndian.Uint32(frame[startFrame:startFrame+4]))
87+
fmt.Fprintf(os.Stdout, "\nnettyNumberFrame %v %v ", startFrame, frame[startFrame:startFrame+4])
88+
message := frame[startFrame+4:startFrame+4+messageSize]
89+
fmt.Fprintf(os.Stdout, "\nmessage: %v", message)
90+
startFrame = startFrame+4+messageSize
91+
br := bytes.NewReader(message)
92+
result[i] = br
93+
i++
94+
}
95+
96+
return result, nil
7097
}

Transceiver_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"bytes"
77
"reflect"
8+
"io/ioutil"
89
)
910

1011
func TestPack(t *testing.T) {
@@ -26,3 +27,25 @@ func TestPack(t *testing.T) {
2627
t.Fatalf("Frame not equals to %x: %x / %x",expectedSize, frame.Len(), frame.Bytes())
2728
}
2829
}
30+
31+
func TestUnpack(t *testing.T) {
32+
transceiver := new(NettyTransceiver)
33+
frame := []byte("\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x01\x0a\x00\x00\x00\x01\x0b")
34+
respons, err := transceiver.Unpack(frame)
35+
if err != nil {
36+
t.Fatalf("%v",err)
37+
}
38+
39+
if len(respons)!=2 {
40+
t.Fatalf("Number of reponse frame not equals %x / %x",2, len(respons))
41+
}
42+
43+
var resp1 []byte
44+
var resp2 []byte
45+
resp1, _ =ioutil.ReadAll(respons[0])
46+
respons[1].Read(resp2)
47+
if !reflect.DeepEqual(resp1, []byte("\x0a")) && !reflect.DeepEqual(resp2, []byte("\x0b")) {
48+
t.Fatalf("Reponse message not equals (0) %x/%x; (1) %x/%x","\x0a", respons[0], "\x0b", respons[1])
49+
}
50+
51+
}

ipc_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,11 @@ func TestRequestor(t *testing.T) {
3535
if err != nil {
3636
t.Fatal("Request: ", err)
3737
}
38+
39+
err = requestor.Request("append", flumeRecord)
40+
41+
if err != nil {
42+
t.Fatal("Request: ", err)
43+
}
3844
}
3945

requestor.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"bytes"
55
"fmt"
66
"io"
7-
"io/ioutil"
87
"log"
8+
"os"
99
)
1010

1111
var REMOTE_HASHES map[string][]byte
@@ -25,6 +25,7 @@ type Requestor struct {
2525
remote_protocol Protocol
2626
remote_hash []byte
2727
send_protocol bool
28+
send_handshake bool
2829
}
2930
func init() {
3031
var err error
@@ -49,7 +50,8 @@ func NewRequestor(localProto Protocol, transceiver Transceiver) *Requestor {
4950
transceiver: transceiver,
5051
// remote_protocol: nil,
5152
// remote_hash: nil,
52-
// send_protocol: nil,
53+
send_protocol: false,
54+
send_handshake: true,
5355
}
5456
}
5557

@@ -86,25 +88,32 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er
8688

8789
// sen the handshake and call request; block until call response
8890
buffer_writers := []bytes.Buffer{*frame1, *frame2}
89-
decoder, err := a.transceiver.Transceive(buffer_writers)
91+
responses, err := a.transceiver.Transceive(buffer_writers)
92+
9093
if err!=nil {
9194
return err
9295
}
93-
buffer_decoder := bytes.NewBuffer(decoder)
96+
//buffer_decoder := bytes.NewBuffer(decoder)
9497
// process the handshake and call response
95-
//ok, err := a.read_handshake_response(buffer_decoder)
96-
fmt.Sprintf("Response %v", buffer_decoder)
97-
//if err!=nil {
98-
// return err
99-
//} else if ok {
98+
fmt.Fprintf(os.Stdout, "\nresponsee %#v", responses)
99+
ok, err := a.read_handshake_response(responses[0])
100+
if err!=nil {
101+
return err
102+
}
103+
a.send_handshake= !ok
104+
105+
if ok {
100106
// a.read_call_response(message_name, buffer_decoder)
101107
//} else {
102108
// a.Request(message_name, request_datum)
103-
//}
109+
}
104110
return nil
105111
}
106112

107113
func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) {
114+
if !a.send_handshake {
115+
return nil
116+
}
108117
local_hash :=a.local_protocol.MD5
109118
remote_name := a.remote_protocol.Name
110119
remote_hash := REMOTE_HASHES[remote_name]
@@ -180,11 +189,14 @@ func (a *Requestor) write_request(request_codec Codec, request_datum interface{}
180189
}
181190

182191
func (a *Requestor) read_handshake_response(decoder io.Reader) (bool, error) {
183-
resp, _ := ioutil.ReadAll(decoder)
192+
if !a.send_handshake {
193+
return true, nil
194+
}
195+
184196
datum, err := HANDSHAKE_REQUESTOR_READER.Decode(decoder)
185197
if err != nil {
186198

187-
return false,fmt.Errorf("Fail to decode %v with error %v", resp, err)
199+
return false,fmt.Errorf("Fail to decode %v with error %v", decoder, err)
188200
}
189201

190202
record, ok := datum.(*Record)

requestor_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,54 @@ func TestWrite_handshake_request(t *testing.T) {
4848

4949
}
5050

51+
func TestRead_handshake_reponse(t *testing.T) {
52+
codecHandshake, err := NewCodec(handshakeResponseshema)
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
record, err := NewRecord(RecordSchema(handshakeResponseshema))
57+
if err != nil {
58+
t.Fatal(err)
59+
}
60+
record.Set("match", Enum{"match","BOTH"})
61+
record.Set("serverProtocol", nil)
62+
record.Set("serverHash", nil)
63+
record.Set("meta", nil)
64+
65+
bb := new(bytes.Buffer)
66+
err = codecHandshake.Encode(bb, record)
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
t.Logf("Encode HandshakeResponse %v", bb.Bytes())
71+
72+
73+
_, err = codecHandshake.Decode(bytes.NewReader(bb.Bytes()))
74+
if err != nil {
75+
t.Fatal(err)
76+
}
77+
78+
rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
79+
conn, err := net.DialTCP("tcp", nil, rAddr)
80+
if err != nil {
81+
t.Fatal(err)
82+
}
83+
defer conn.Close()
84+
85+
transceiver := NewNettyTransceiver(conn)
86+
protocol, err := NewProtocol()
87+
if err != nil {
88+
t.Fatal(err)
89+
}
90+
requestor := NewRequestor(protocol, transceiver)
91+
92+
_, err = requestor.read_handshake_response(bytes.NewReader(bb.Bytes()))
93+
if err != nil {
94+
t.Fatal(err)
95+
}
96+
97+
}
98+
5199

52100
func TestWrite_call_request(t *testing.T) {
53101
//t.SkipNow()

0 commit comments

Comments
 (0)