|
|
@@ -29,6 +29,12 @@ type Client struct {
|
|
|
id *string
|
|
|
correlation_id int32
|
|
|
conn net.Conn
|
|
|
+ responses chan response
|
|
|
+}
|
|
|
+
|
|
|
+type response struct {
|
|
|
+ correlation_id int32
|
|
|
+ buf []byte
|
|
|
}
|
|
|
|
|
|
func NewClient(addr string) (client *Client, err error) {
|
|
|
@@ -36,7 +42,8 @@ func NewClient(addr string) (client *Client, err error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- client = &Client{addr, nil, 0, conn}
|
|
|
+ client = &Client{addr, nil, 0, conn, make(chan response)}
|
|
|
+ go client.readLoop()
|
|
|
return client, err
|
|
|
}
|
|
|
|
|
|
@@ -54,25 +61,37 @@ func (client *Client) write(buf []byte) (err error) {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (client *Client) read() (buf []byte, err error) {
|
|
|
- size := make([]byte, 4)
|
|
|
- n, err := client.conn.Read(size)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- if n != 4 {
|
|
|
- return nil, nil
|
|
|
- }
|
|
|
- s := int32(binary.BigEndian.Uint32(size))
|
|
|
- buf = make([]byte, s)
|
|
|
- n, err = client.conn.Read(buf)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- if n != int(s) {
|
|
|
- return nil, nil
|
|
|
+func (client *Client) readLoop() {
|
|
|
+ var resp response
|
|
|
+ header := make([]byte, 4)
|
|
|
+ for {
|
|
|
+ n, err := client.conn.Read(header)
|
|
|
+ if err != nil || n != 4 {
|
|
|
+ close(client.responses)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ length := int32(binary.BigEndian.Uint32(header))
|
|
|
+ if length <= 4 {
|
|
|
+ close(client.responses)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ n, err = client.conn.Read(header)
|
|
|
+ if err != nil || n != 4 {
|
|
|
+ close(client.responses)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ resp.correlation_id = int32(binary.BigEndian.Uint32(header))
|
|
|
+
|
|
|
+ resp.buf = make([]byte, length-4)
|
|
|
+ n, err = client.conn.Read(resp.buf)
|
|
|
+ if err != nil || n != int(length-4) {
|
|
|
+ close(client.responses)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ client.responses <- resp
|
|
|
}
|
|
|
- return buf, nil
|
|
|
}
|
|
|
|
|
|
func (client *Client) sendRequest(api API, body []byte) (err error) {
|