client.go 943 B

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package kafka
  import (
  	"encoding/binary"
  	"errors"
  	"io"
  	"net"
  )
  // Client holds one network connection together with the address it was
  // dialed from. The write and read methods exchange length-prefixed frames
  // (4-byte big-endian size, then payload) over that connection.
  //
  // Field order matters: NewClient may build the value with a positional
  // composite literal, so addr must stay first and conn second.
  type Client struct {
  	addr string   // address string that was passed to net.Dial
  	conn net.Conn // connection used by write and read
  }
  10. func NewClient(addr string) (client *Client, err error) {
  11. conn, err := net.Dial("tcp", addr)
  12. if err != nil {
  13. return nil, err
  14. }
  15. client = &Client{addr, conn}
  16. return client, err
  17. }
  18. func (client *Client) write(buf []byte) (err error) {
  19. var size [4]byte
  20. binary.BigEndian.PutUint32(size[:], uint32(len(buf)))
  21. _, err = client.conn.Write(size[:])
  22. if err != nil {
  23. return err
  24. }
  25. _, err = client.conn.Write(buf)
  26. if err != nil {
  27. return err
  28. }
  29. return nil
  30. }
  31. func (client *Client) read() (buf []byte, err error) {
  32. var size [4]byte
  33. n, err := client.conn.Read(size[:])
  34. if err != nil {
  35. return nil, err
  36. }
  37. if n != 4 {
  38. return nil, nil
  39. }
  40. s := binary.BigEndian.Uint32(size[:])
  41. buf = make([]byte, s)
  42. n, err = client.conn.Read(buf)
  43. if err != nil {
  44. return nil, err
  45. }
  46. if uint32(n) != s {
  47. return nil, nil
  48. }
  49. return buf, nil
  50. }