|
@@ -2,12 +2,34 @@ package kafka
|
|
|
|
|
|
import (
|
|
|
"encoding/binary"
|
|
|
+ "math"
|
|
|
"net"
|
|
|
+ "strings"
|
|
|
+)
|
|
|
+
|
|
|
+type ApiKey uint16
|
|
|
+type ApiVersion uint16
|
|
|
+
|
|
|
+type API struct {
|
|
|
+ key ApiKey
|
|
|
+ version ApiVersion
|
|
|
+}
|
|
|
+
|
|
|
+var (
|
|
|
+ REQUEST_PRODUCE = API{0, 0}
|
|
|
+ REQUEST_FETCH = API{1, 0}
|
|
|
+ REQUEST_OFFSET = API{2, 0}
|
|
|
+ REQUEST_METADATA = API{3, 0}
|
|
|
+ REQUEST_LEADER_AND_ISR = API{4, 0}
|
|
|
+ REQUEST_STOP_REPLICA = API{5, 0}
|
|
|
+ REQUEST_OFFSET_COMMIT = API{6, 0}
|
|
|
+ REQUEST_OFFSET_FETCH = API{7, 0}
|
|
|
)
|
|
|
|
|
|
type Client struct {
|
|
|
- addr string
|
|
|
- conn net.Conn
|
|
|
+ addr, id string
|
|
|
+ correlation_id int32
|
|
|
+ conn net.Conn
|
|
|
}
|
|
|
|
|
|
func NewClient(addr string) (client *Client, err error) {
|
|
@@ -15,7 +37,7 @@ func NewClient(addr string) (client *Client, err error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- client = &Client{addr, conn}
|
|
|
+ client = &Client{addr, "", 0, conn}
|
|
|
return client, err
|
|
|
}
|
|
|
|
|
@@ -53,3 +75,21 @@ func (client *Client) read() (buf []byte, err error) {
|
|
|
}
|
|
|
return buf, nil
|
|
|
}
|
|
|
+
|
|
|
+func encodeString(in string) (buf []byte) {
|
|
|
+ r := strings.NewReader(in)
|
|
|
+ size := r.Len()
|
|
|
+ if size > math.MaxInt16 {
|
|
|
+ panic("string too long to encode")
|
|
|
+ }
|
|
|
+ buf = make([]byte, 2+size)
|
|
|
+ binary.BigEndian.PutUint16(buf, uint16(size))
|
|
|
+ if size > 0 {
|
|
|
+ _, err := r.Read(buf[2:])
|
|
|
+ if err != nil {
|
|
|
+
|
|
|
+ panic("couldn't read from string")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return buf
|
|
|
+}
|