|
|
@@ -2,17 +2,13 @@ package kafka
|
|
|
|
|
|
import (
|
|
|
"encoding/binary"
|
|
|
+ "math"
|
|
|
"net"
|
|
|
)
|
|
|
|
|
|
-type (
|
|
|
- ApiKey int16
|
|
|
- ApiVersion int16
|
|
|
-)
|
|
|
-
|
|
|
type API struct {
|
|
|
- key ApiKey
|
|
|
- version ApiVersion
|
|
|
+ key int16
|
|
|
+ version int16
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
@@ -31,12 +27,13 @@ type Client struct {
|
|
|
id *string
|
|
|
correlation_id int32
|
|
|
conn net.Conn
|
|
|
- responses chan response
|
|
|
+ requests chan reqResPair
|
|
|
+ responses chan reqResPair
|
|
|
}
|
|
|
|
|
|
-type response struct {
|
|
|
+type reqResPair struct {
|
|
|
correlation_id int32
|
|
|
- buf []byte
|
|
|
+ packets chan []byte
|
|
|
}
|
|
|
|
|
|
func NewClient(addr string) (client *Client, err error) {
|
|
|
@@ -44,36 +41,45 @@ func NewClient(addr string) (client *Client, err error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- client = &Client{addr, nil, 0, conn, make(chan response)}
|
|
|
- go client.readLoop()
|
|
|
+ client = &Client{addr: addr, conn: conn, requests: make(chan reqResPair), responses: make(chan reqResPair)}
|
|
|
+ go client.sendRequestLoop()
|
|
|
+ go client.rcvResponseLoop()
|
|
|
return client, err
|
|
|
}
|
|
|
|
|
|
-func (client *Client) write(buf []byte) (err error) {
|
|
|
- size := make([]byte, 4)
|
|
|
- binary.BigEndian.PutUint32(size, uint32(len(buf)))
|
|
|
- _, err = client.conn.Write(size)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- _, err = client.conn.Write(buf)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
+func (client *Client) sendRequestLoop() {
|
|
|
+ var request reqResPair
|
|
|
+ var n int
|
|
|
+ var err error
|
|
|
+ var buf []byte
|
|
|
+ for {
|
|
|
+ request = <-client.requests
|
|
|
+ buf = <-request.packets
|
|
|
+ n, err = client.conn.Write(buf)
|
|
|
+ if err != nil || n != len(buf) {
|
|
|
+ close(client.requests)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ client.responses <- request
|
|
|
}
|
|
|
- return nil
|
|
|
}
|
|
|
|
|
|
-func (client *Client) readLoop() {
|
|
|
- var resp response
|
|
|
+func (client *Client) rcvResponseLoop() {
|
|
|
+ var response reqResPair
|
|
|
+ var n int
|
|
|
+ var length int32
|
|
|
+ var err error
|
|
|
+ var buf []byte
|
|
|
header := make([]byte, 4)
|
|
|
for {
|
|
|
- n, err := client.conn.Read(header)
|
|
|
+ response = <-client.responses
|
|
|
+ n, err = client.conn.Read(header)
|
|
|
if err != nil || n != 4 {
|
|
|
close(client.responses)
|
|
|
return
|
|
|
}
|
|
|
- length := int32(binary.BigEndian.Uint32(header))
|
|
|
- if length <= 4 {
|
|
|
+ length = int32(binary.BigEndian.Uint32(header))
|
|
|
+ if length <= 4 || length > 2*math.MaxUint16 {
|
|
|
close(client.responses)
|
|
|
return
|
|
|
}
|
|
|
@@ -83,55 +89,59 @@ func (client *Client) readLoop() {
|
|
|
close(client.responses)
|
|
|
return
|
|
|
}
|
|
|
- resp.correlation_id = int32(binary.BigEndian.Uint32(header))
|
|
|
+ if response.correlation_id != int32(binary.BigEndian.Uint32(header)) {
|
|
|
+ close(client.responses)
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
- resp.buf = make([]byte, length-4)
|
|
|
- n, err = client.conn.Read(resp.buf)
|
|
|
+ buf = make([]byte, length-4)
|
|
|
+ n, err = client.conn.Read(buf)
|
|
|
if err != nil || n != int(length-4) {
|
|
|
close(client.responses)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- client.responses <- resp
|
|
|
+ response.packets <- buf
|
|
|
+ close(response.packets)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (client *Client) sendRequest(api API, body []byte) (err error) {
|
|
|
+func (client *Client) sendRequest(api API, body []byte) (chan []byte, error) {
|
|
|
idLen, err := stringLen(client.id)
|
|
|
if err != nil {
|
|
|
- return err
|
|
|
+ return nil, err
|
|
|
}
|
|
|
- buf := make([]byte, 4+idLen+len(body))
|
|
|
+ // we buffer one packet so that we can send our packet to the request queue without
|
|
|
+ // blocking, and so that the responses can be sent to us async if we want them
|
|
|
+ request := reqResPair{client.correlation_id, make(chan []byte, 1)}
|
|
|
+ buf := make([]byte, 8+idLen+len(body))
|
|
|
off := 0
|
|
|
- binary.BigEndian.PutUint16(buf[off:], uint16(api.key))
|
|
|
- off += 2
|
|
|
- binary.BigEndian.PutUint16(buf[off:], uint16(api.version))
|
|
|
- off += 2
|
|
|
- binary.BigEndian.PutUint32(buf[off:], uint32(client.correlation_id))
|
|
|
- off += 4
|
|
|
- client.correlation_id++
|
|
|
- off, err = encodeString(buf, off, client.id)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+ off = encodeInt32(buf, off, int32(len(buf)))
|
|
|
+ off = encodeInt16(buf, off, api.key)
|
|
|
+ off = encodeInt16(buf, off, api.version)
|
|
|
+ off = encodeInt32(buf, off, client.correlation_id)
|
|
|
+ off = encodeString(buf, off, client.id)
|
|
|
copy(buf[off:], body)
|
|
|
- return client.write(buf)
|
|
|
+ request.packets <- buf
|
|
|
+ client.requests <- request
|
|
|
+ client.correlation_id++
|
|
|
+ return request.packets, nil
|
|
|
}
|
|
|
|
|
|
-func (client *Client) sendMetadataRequest(topics []string) (err error) {
|
|
|
+func (client *Client) sendMetadataRequest(topics []string) (chan []byte, error) {
|
|
|
bufLen := 4
|
|
|
for i := range topics {
|
|
|
- bufLen += len(topics[i])
|
|
|
+ tmp, err := stringLen(&topics[i])
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ bufLen += tmp
|
|
|
}
|
|
|
buf := make([]byte, bufLen)
|
|
|
off := 0
|
|
|
- binary.BigEndian.PutUint32(buf[off:], uint32(len(topics)))
|
|
|
- off += 4
|
|
|
+ off = encodeInt32(buf, off, int32(len(topics)))
|
|
|
for i := range topics {
|
|
|
- off, err = encodeString(buf, off, &topics[i])
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+ off = encodeString(buf, off, &topics[i])
|
|
|
}
|
|
|
return client.sendRequest(REQUEST_METADATA, buf)
|
|
|
}
|