client.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package kafka
import (
	"encoding/binary"
	"io"
	"math"
	"net"
)
  7. type ApiKey int16
  8. type ApiVersion int16
  9. type API struct {
  10. key ApiKey
  11. version ApiVersion
  12. }
  13. var (
  14. REQUEST_PRODUCE = API{0, 0}
  15. REQUEST_FETCH = API{1, 0}
  16. REQUEST_OFFSET = API{2, 0}
  17. REQUEST_METADATA = API{3, 0}
  18. REQUEST_LEADER_AND_ISR = API{4, 0}
  19. REQUEST_STOP_REPLICA = API{5, 0}
  20. REQUEST_OFFSET_COMMIT = API{6, 0}
  21. REQUEST_OFFSET_FETCH = API{7, 0}
  22. )
  23. type Client struct {
  24. addr, id string
  25. correlation_id int32
  26. conn net.Conn
  27. }
  28. func NewClient(addr string) (client *Client, err error) {
  29. conn, err := net.Dial("tcp", addr)
  30. if err != nil {
  31. return nil, err
  32. }
  33. client = &Client{addr, "", 0, conn}
  34. return client, err
  35. }
  36. func (client *Client) write(buf []byte) (err error) {
  37. size := make([]byte, 4)
  38. binary.BigEndian.PutUint32(size, uint32(len(buf)))
  39. _, err = client.conn.Write(size)
  40. if err != nil {
  41. return err
  42. }
  43. _, err = client.conn.Write(buf)
  44. if err != nil {
  45. return err
  46. }
  47. return nil
  48. }
  49. func (client *Client) read() (buf []byte, err error) {
  50. size := make([]byte, 4)
  51. n, err := client.conn.Read(size)
  52. if err != nil {
  53. return nil, err
  54. }
  55. if n != 4 {
  56. return nil, nil
  57. }
  58. s := binary.BigEndian.Uint32(size)
  59. buf = make([]byte, s)
  60. n, err = client.conn.Read(buf)
  61. if err != nil {
  62. return nil, err
  63. }
  64. if uint32(n) != s {
  65. return nil, nil
  66. }
  67. return buf, nil
  68. }
  69. func encodeString(in string) (buf []byte) {
  70. size := len(in)
  71. if size > math.MaxInt16 {
  72. panic("string too long to encode") /* Or just return nil? */
  73. }
  74. buf = make([]byte, 2+size)
  75. binary.BigEndian.PutUint16(buf, uint16(size))
  76. if size > 0 {
  77. copy(buf[2:], in)
  78. }
  79. return buf
  80. }
  81. func encodeBytes(in []byte) (buf []byte) {
  82. size := len(in)
  83. if size > math.MaxInt32 {
  84. panic("bytes too long to encode") /* Or just return nil? */
  85. }
  86. buf = make([]byte, 4+size)
  87. binary.BigEndian.PutUint32(buf, uint32(size))
  88. if size > 0 {
  89. copy(buf[4:], in)
  90. }
  91. return buf
  92. }
  93. func (client *Client) sendRequest(api *API, body []byte) (err error) {
  94. id := encodeString(client.id)
  95. buf := make([]byte, 4+len(id)+len(body))
  96. binary.BigEndian.PutUint16(buf[0:2], uint16(api.key))
  97. binary.BigEndian.PutUint16(buf[2:4], uint16(api.version))
  98. binary.BigEndian.PutUint32(buf[4:8], uint32(client.correlation_id))
  99. copy(buf[8:], id)
  100. copy(buf[8+len(id):], body)
  101. return client.write(buf)
  102. }