broker.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package protocol
  2. import enc "sarama/encoding"
  3. import "sarama/types"
  4. import (
  5. "io"
  6. "net"
  7. "sync"
  8. )
  9. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  10. type Broker struct {
  11. id int32
  12. host string
  13. port int32
  14. correlation_id int32
  15. conn net.Conn
  16. lock sync.Mutex
  17. responses chan responsePromise
  18. done chan bool
  19. }
  // responsePromise is what a sender waits on after issuing a request that
  // expects a reply: the receiver goroutine delivers on exactly one of the two
  // channels.
  type responsePromise struct {
  	correlation_id int32       // ID the matching response header must carry
  	packets        chan []byte // receives the response body on success
  	errors         chan error  // receives the failure otherwise
  }
  25. // NewBroker creates and returns a Broker targetting the given host:port address.
  26. // This does not attempt to actually connect, you have to call Connect() for that.
  27. func NewBroker(host string, port int32) *Broker {
  28. b := new(Broker)
  29. b.id = -1 // don't know it yet
  30. b.host = host
  31. b.port = port
  32. return b
  33. }
  34. func (b *Broker) Connect() error {
  35. b.lock.Lock()
  36. defer b.lock.Unlock()
  37. if b.conn != nil {
  38. return AlreadyConnected
  39. }
  40. addr, err := net.ResolveIPAddr("ip", b.host)
  41. if err != nil {
  42. return err
  43. }
  44. b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port), Zone: addr.Zone})
  45. if err != nil {
  46. return err
  47. }
  48. b.done = make(chan bool)
  49. // permit a few outstanding requests before we block waiting for responses
  50. b.responses = make(chan responsePromise, 4)
  51. go b.responseReceiver()
  52. return nil
  53. }
  54. func (b *Broker) Close() error {
  55. b.lock.Lock()
  56. defer b.lock.Unlock()
  57. if b.conn == nil {
  58. return NotConnected
  59. }
  60. close(b.responses)
  61. <-b.done
  62. err := b.conn.Close()
  63. b.conn = nil
  64. b.done = nil
  65. b.responses = nil
  66. return err
  67. }
  68. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  69. func (b *Broker) ID() int32 {
  70. return b.id
  71. }
  72. // Equals compares two brokers. Two brokers are considered equal if they have the same host, port, and id,
  73. // or if they are both nil.
  74. func (b *Broker) Equals(a *Broker) bool {
  75. switch {
  76. case a == nil && b == nil:
  77. return true
  78. case (a == nil && b != nil) || (a != nil && b == nil):
  79. return false
  80. }
  81. return a.id == b.id && a.host == b.host && a.port == b.port
  82. }
  83. func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
  84. response := new(MetadataResponse)
  85. err := b.sendAndReceive(clientID, request, response)
  86. if err != nil {
  87. return nil, err
  88. }
  89. return response, nil
  90. }
  91. func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
  92. response := new(OffsetResponse)
  93. err := b.sendAndReceive(clientID, request, response)
  94. if err != nil {
  95. return nil, err
  96. }
  97. return response, nil
  98. }
  99. func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error) {
  100. var response *ProduceResponse
  101. var err error
  102. if request.RequiredAcks == types.NO_RESPONSE {
  103. err = b.sendAndReceive(clientID, request, nil)
  104. } else {
  105. response = new(ProduceResponse)
  106. err = b.sendAndReceive(clientID, request, response)
  107. }
  108. if err != nil {
  109. return nil, err
  110. }
  111. return response, nil
  112. }
  113. func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error) {
  114. response := new(FetchResponse)
  115. err := b.sendAndReceive(clientID, request, response)
  116. if err != nil {
  117. return nil, err
  118. }
  119. return response, nil
  120. }
  121. func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  122. response := new(OffsetCommitResponse)
  123. err := b.sendAndReceive(clientID, request, response)
  124. if err != nil {
  125. return nil, err
  126. }
  127. return response, nil
  128. }
  129. func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  130. response := new(OffsetFetchResponse)
  131. err := b.sendAndReceive(clientID, request, response)
  132. if err != nil {
  133. return nil, err
  134. }
  135. return response, nil
  136. }
  137. func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
  138. b.lock.Lock()
  139. defer b.lock.Unlock()
  140. if b.conn == nil {
  141. return nil, NotConnected
  142. }
  143. fullRequest := request{b.correlation_id, clientID, req}
  144. buf, err := enc.Encode(&fullRequest)
  145. if err != nil {
  146. return nil, err
  147. }
  148. _, err = b.conn.Write(buf)
  149. if err != nil {
  150. return nil, err
  151. }
  152. b.correlation_id++
  153. if !promiseResponse {
  154. return nil, nil
  155. }
  156. promise := responsePromise{fullRequest.correlation_id, make(chan []byte), make(chan error)}
  157. b.responses <- promise
  158. return &promise, nil
  159. }
  160. func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res enc.Decoder) error {
  161. promise, err := b.send(clientID, req, res != nil)
  162. if err != nil {
  163. return err
  164. }
  165. if promise == nil {
  166. return nil
  167. }
  168. select {
  169. case buf := <-promise.packets:
  170. return enc.Decode(buf, res)
  171. case err = <-promise.errors:
  172. return err
  173. }
  174. }
  175. func (b *Broker) Decode(pd enc.PacketDecoder) (err error) {
  176. b.id, err = pd.GetInt32()
  177. if err != nil {
  178. return err
  179. }
  180. b.host, err = pd.GetString()
  181. if err != nil {
  182. return err
  183. }
  184. b.port, err = pd.GetInt32()
  185. if err != nil {
  186. return err
  187. }
  188. return nil
  189. }
  190. func (b *Broker) responseReceiver() {
  191. header := make([]byte, 8)
  192. for response := range b.responses {
  193. _, err := io.ReadFull(b.conn, header)
  194. if err != nil {
  195. response.errors <- err
  196. continue
  197. }
  198. decodedHeader := responseHeader{}
  199. err = enc.Decode(header, &decodedHeader)
  200. if err != nil {
  201. response.errors <- err
  202. continue
  203. }
  204. if decodedHeader.correlation_id != response.correlation_id {
  205. response.errors <- enc.DecodingError
  206. continue
  207. }
  208. buf := make([]byte, decodedHeader.length-4)
  209. _, err = io.ReadFull(b.conn, buf)
  210. if err != nil {
  211. response.errors <- err
  212. continue
  213. }
  214. response.packets <- buf
  215. }
  216. close(b.done)
  217. }