| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- package kafka
import (
	"encoding/binary"
	"io"
	"math"
	"net"
)
type (
	// ApiKey is the 16-bit numeric identifier of a request type.
	ApiKey int16

	// ApiVersion is the 16-bit version number attached to a request type.
	ApiVersion int16

	// API pairs a request key with the version of that request; both values
	// are written into the request header by sendRequest.
	API struct {
		key     ApiKey
		version ApiVersion
	}
)
- var (
- REQUEST_PRODUCE = API{0, 0}
- REQUEST_FETCH = API{1, 0}
- REQUEST_OFFSET = API{2, 0}
- REQUEST_METADATA = API{3, 0}
- REQUEST_LEADER_AND_ISR = API{4, 0}
- REQUEST_STOP_REPLICA = API{5, 0}
- REQUEST_OFFSET_COMMIT = API{6, 0}
- REQUEST_OFFSET_FETCH = API{7, 0}
- )
// Client holds the state of a single connection: the address it was dialed
// with, the client id and correlation id written into each request header,
// and the underlying network connection.
type Client struct {
	addr           string
	id             string
	correlation_id int32
	conn           net.Conn
}
- func NewClient(addr string) (client *Client, err error) {
- conn, err := net.Dial("tcp", addr)
- if err != nil {
- return nil, err
- }
- client = &Client{addr, "", 0, conn}
- return client, err
- }
- func (client *Client) write(buf []byte) (err error) {
- size := make([]byte, 4)
- binary.BigEndian.PutUint32(size, uint32(len(buf)))
- _, err = client.conn.Write(size)
- if err != nil {
- return err
- }
- _, err = client.conn.Write(buf)
- if err != nil {
- return err
- }
- return nil
- }
- func (client *Client) read() (buf []byte, err error) {
- size := make([]byte, 4)
- n, err := client.conn.Read(size)
- if err != nil {
- return nil, err
- }
- if n != 4 {
- return nil, nil
- }
- s := binary.BigEndian.Uint32(size)
- buf = make([]byte, s)
- n, err = client.conn.Read(buf)
- if err != nil {
- return nil, err
- }
- if uint32(n) != s {
- return nil, nil
- }
- return buf, nil
- }
// encodeString returns in encoded as a 2-byte big-endian length followed by
// the bytes of the string. An empty string encodes to just the two length
// bytes. It panics if in is longer than math.MaxInt16 bytes.
func encodeString(in string) (buf []byte) {
	n := len(in)
	if n > math.MaxInt16 {
		panic("string too long to encode")
	}
	// Reserve the full capacity up front so the append below never reallocates.
	buf = make([]byte, 2, 2+n)
	binary.BigEndian.PutUint16(buf, uint16(n))
	return append(buf, in...)
}
// encodeBytes returns in encoded as a 4-byte big-endian length followed by
// the bytes themselves. A nil or empty slice encodes to just the four length
// bytes. It panics if in is longer than math.MaxInt32 bytes.
func encodeBytes(in []byte) (buf []byte) {
	n := len(in)
	if n > math.MaxInt32 {
		panic("bytes too long to encode")
	}
	// Reserve the full capacity up front so the append below never reallocates.
	buf = make([]byte, 4, 4+n)
	binary.BigEndian.PutUint32(buf, uint32(n))
	return append(buf, in...)
}
- func (client *Client) sendRequest(api *API, body []byte) (err error) {
- id := encodeString(client.id)
- buf := make([]byte, 4+len(id)+len(body))
- binary.BigEndian.PutUint16(buf[0:2], uint16(api.key))
- binary.BigEndian.PutUint16(buf[2:4], uint16(api.version))
- binary.BigEndian.PutUint32(buf[4:8], uint32(client.correlation_id))
- copy(buf[8:], id)
- copy(buf[8+len(id):], body)
- return client.write(buf)
- }
|