client.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package kafka
import (
	"encoding/binary"
	"errors"
	"io"
	"math"
	"net"
)
  8. type API struct {
  9. key int16
  10. version int16
  11. }
  12. var (
  13. REQUEST_PRODUCE = API{0, 0}
  14. REQUEST_FETCH = API{1, 0}
  15. REQUEST_OFFSET = API{2, 0}
  16. REQUEST_METADATA = API{3, 0}
  17. REQUEST_LEADER_AND_ISR = API{4, 0}
  18. REQUEST_STOP_REPLICA = API{5, 0}
  19. REQUEST_OFFSET_COMMIT = API{6, 0}
  20. REQUEST_OFFSET_FETCH = API{7, 0}
  21. )
  22. type Client struct {
  23. addr string
  24. id *string
  25. correlation_id int32
  26. conn net.Conn
  27. requests chan reqResPair
  28. responses chan reqResPair
  29. }
  30. type reqResPair struct {
  31. correlation_id int32
  32. packets chan []byte
  33. }
  34. func NewClient(addr string) (client *Client, err error) {
  35. conn, err := net.Dial("tcp", addr)
  36. if err != nil {
  37. return nil, err
  38. }
  39. client = &Client{addr: addr, conn: conn, requests: make(chan reqResPair), responses: make(chan reqResPair)}
  40. go client.sendRequestLoop()
  41. go client.rcvResponseLoop()
  42. return client, err
  43. }
  44. func (client *Client) sendRequestLoop() {
  45. var request reqResPair
  46. var n int
  47. var err error
  48. var buf []byte
  49. for {
  50. request = <-client.requests
  51. buf = <-request.packets
  52. n, err = client.conn.Write(buf)
  53. if err != nil || n != len(buf) {
  54. close(client.requests)
  55. return
  56. }
  57. client.responses <- request
  58. }
  59. }
  60. func (client *Client) rcvResponseLoop() {
  61. var response reqResPair
  62. var n int
  63. var length int32
  64. var err error
  65. var buf []byte
  66. header := make([]byte, 4)
  67. for {
  68. response = <-client.responses
  69. n, err = client.conn.Read(header)
  70. if err != nil || n != 4 {
  71. close(client.responses)
  72. return
  73. }
  74. length = int32(binary.BigEndian.Uint32(header))
  75. if length <= 4 || length > 2*math.MaxUint16 {
  76. close(client.responses)
  77. return
  78. }
  79. n, err = client.conn.Read(header)
  80. if err != nil || n != 4 {
  81. close(client.responses)
  82. return
  83. }
  84. if response.correlation_id != int32(binary.BigEndian.Uint32(header)) {
  85. close(client.responses)
  86. return
  87. }
  88. buf = make([]byte, length-4)
  89. n, err = client.conn.Read(buf)
  90. if err != nil || n != int(length-4) {
  91. close(client.responses)
  92. return
  93. }
  94. response.packets <- buf
  95. close(response.packets)
  96. }
  97. }
  98. func (client *Client) encode(api API, body []byte, pe packetEncoder) {
  99. pe.putInt32(int32(len(body)))
  100. pe.putInt16(api.key)
  101. pe.putInt16(api.version)
  102. pe.putInt32(client.correlation_id)
  103. pe.putString(client.id)
  104. //pe.putRaw(body)
  105. }
  106. func (client *Client) sendRequest(api API, body encoder) (chan []byte, error) {
  107. var prepEnc prepEncoder
  108. var realEnc realEncoder
  109. req := request{api, client.correlation_id, client.id, body}
  110. req.encode(&prepEnc)
  111. if prepEnc.err {
  112. return nil, errors.New("kafka encoding error")
  113. }
  114. realEnc.raw = make([]byte, prepEnc.length+4)
  115. realEnc.putInt32(int32(prepEnc.length))
  116. req.encode(&realEnc)
  117. // we buffer one packet so that we can send our packet to the request queue without
  118. // blocking, and so that the responses can be sent to us async if we want them
  119. request := reqResPair{client.correlation_id, make(chan []byte, 1)}
  120. request.packets <- realEnc.raw
  121. client.requests <- request
  122. client.correlation_id++
  123. return request.packets, nil
  124. }