client.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package kafka
  2. import (
  3. "encoding/binary"
  4. "math"
  5. "net"
  6. "strings"
  7. )
  8. type ApiKey uint16
  9. type ApiVersion uint16
  10. type API struct {
  11. key ApiKey
  12. version ApiVersion
  13. }
  14. var (
  15. REQUEST_PRODUCE = API{0, 0}
  16. REQUEST_FETCH = API{1, 0}
  17. REQUEST_OFFSET = API{2, 0}
  18. REQUEST_METADATA = API{3, 0}
  19. REQUEST_LEADER_AND_ISR = API{4, 0}
  20. REQUEST_STOP_REPLICA = API{5, 0}
  21. REQUEST_OFFSET_COMMIT = API{6, 0}
  22. REQUEST_OFFSET_FETCH = API{7, 0}
  23. )
  24. type Client struct {
  25. addr, id string
  26. correlation_id int32
  27. conn net.Conn
  28. }
  29. func NewClient(addr string) (client *Client, err error) {
  30. conn, err := net.Dial("tcp", addr)
  31. if err != nil {
  32. return nil, err
  33. }
  34. client = &Client{addr, "", 0, conn}
  35. return client, err
  36. }
  37. func (client *Client) write(buf []byte) (err error) {
  38. size := make([]byte, 4)
  39. binary.BigEndian.PutUint32(size, uint32(len(buf)))
  40. _, err = client.conn.Write(size)
  41. if err != nil {
  42. return err
  43. }
  44. _, err = client.conn.Write(buf)
  45. if err != nil {
  46. return err
  47. }
  48. return nil
  49. }
  50. func (client *Client) read() (buf []byte, err error) {
  51. size := make([]byte, 4)
  52. n, err := client.conn.Read(size)
  53. if err != nil {
  54. return nil, err
  55. }
  56. if n != 4 {
  57. return nil, nil
  58. }
  59. s := binary.BigEndian.Uint32(size)
  60. buf = make([]byte, s)
  61. n, err = client.conn.Read(buf)
  62. if err != nil {
  63. return nil, err
  64. }
  65. if uint32(n) != s {
  66. return nil, nil
  67. }
  68. return buf, nil
  69. }
  70. func encodeString(in string) (buf []byte) {
  71. r := strings.NewReader(in)
  72. size := r.Len()
  73. if size > math.MaxInt16 {
  74. panic("string too long to encode") /* Or just return nil? */
  75. }
  76. buf = make([]byte, 2+size)
  77. binary.BigEndian.PutUint16(buf, uint16(size))
  78. if size > 0 {
  79. _, err := r.Read(buf[2:])
  80. if err != nil {
  81. /* this should never happen */
  82. panic("couldn't read from string")
  83. }
  84. }
  85. return buf
  86. }