broker.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. /*
  2. Package protocol provides the low-level primitives necessary for communicating with a Kafka 0.8 cluster.
  3. The core of the package is the Broker. It represents a connection to a single Kafka broker service, and
  4. has methods for querying the broker.
  5. The other types are mostly Request types or Response types. Most of the Broker methods take a Request of a
  6. specific type and return a Response of the appropriate type.
  7. The objects and properties in this package are mostly undocumented, as they line up exactly with the
  8. protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  9. */
  10. package protocol
  11. import enc "sarama/encoding"
  12. import "sarama/types"
  13. import (
  14. "io"
  15. "net"
  16. "sync"
  17. )
  18. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  19. type Broker struct {
  20. id int32
  21. host string
  22. port int32
  23. correlation_id int32
  24. conn net.Conn
  25. lock sync.Mutex
  26. responses chan responsePromise
  27. done chan bool
  28. }
  29. type responsePromise struct {
  30. correlation_id int32
  31. packets chan []byte
  32. errors chan error
  33. }
  34. // NewBroker creates and returns a Broker targetting the given host:port address.
  35. // This does not attempt to actually connect, you have to call Connect() for that.
  36. func NewBroker(host string, port int32) *Broker {
  37. b := new(Broker)
  38. b.id = -1 // don't know it yet
  39. b.host = host
  40. b.port = port
  41. return b
  42. }
  43. func (b *Broker) Connect() error {
  44. b.lock.Lock()
  45. defer b.lock.Unlock()
  46. if b.conn != nil {
  47. return AlreadyConnected
  48. }
  49. addr, err := net.ResolveIPAddr("ip", b.host)
  50. if err != nil {
  51. return err
  52. }
  53. b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port), Zone: addr.Zone})
  54. if err != nil {
  55. return err
  56. }
  57. b.done = make(chan bool)
  58. // permit a few outstanding requests before we block waiting for responses
  59. b.responses = make(chan responsePromise, 4)
  60. go b.responseReceiver()
  61. return nil
  62. }
  63. func (b *Broker) Close() error {
  64. b.lock.Lock()
  65. defer b.lock.Unlock()
  66. if b.conn == nil {
  67. return NotConnected
  68. }
  69. close(b.responses)
  70. <-b.done
  71. err := b.conn.Close()
  72. b.conn = nil
  73. b.done = nil
  74. b.responses = nil
  75. return err
  76. }
  77. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  78. func (b *Broker) ID() int32 {
  79. return b.id
  80. }
  81. // Equals compares two brokers. Two brokers are considered equal if they have the same host, port, and id,
  82. // or if they are both nil.
  83. func (b *Broker) Equals(a *Broker) bool {
  84. switch {
  85. case a == nil && b == nil:
  86. return true
  87. case (a == nil && b != nil) || (a != nil && b == nil):
  88. return false
  89. }
  90. return a.id == b.id && a.host == b.host && a.port == b.port
  91. }
  92. func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
  93. response := new(MetadataResponse)
  94. err := b.sendAndReceive(clientID, request, response)
  95. if err != nil {
  96. return nil, err
  97. }
  98. return response, nil
  99. }
  100. func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
  101. response := new(OffsetResponse)
  102. err := b.sendAndReceive(clientID, request, response)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return response, nil
  107. }
  108. func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error) {
  109. var response *ProduceResponse
  110. var err error
  111. if request.RequiredAcks == types.NO_RESPONSE {
  112. err = b.sendAndReceive(clientID, request, nil)
  113. } else {
  114. response = new(ProduceResponse)
  115. err = b.sendAndReceive(clientID, request, response)
  116. }
  117. if err != nil {
  118. return nil, err
  119. }
  120. return response, nil
  121. }
  122. func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error) {
  123. response := new(FetchResponse)
  124. err := b.sendAndReceive(clientID, request, response)
  125. if err != nil {
  126. return nil, err
  127. }
  128. return response, nil
  129. }
  130. func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  131. response := new(OffsetCommitResponse)
  132. err := b.sendAndReceive(clientID, request, response)
  133. if err != nil {
  134. return nil, err
  135. }
  136. return response, nil
  137. }
  138. func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  139. response := new(OffsetFetchResponse)
  140. err := b.sendAndReceive(clientID, request, response)
  141. if err != nil {
  142. return nil, err
  143. }
  144. return response, nil
  145. }
  146. func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
  147. b.lock.Lock()
  148. defer b.lock.Unlock()
  149. if b.conn == nil {
  150. return nil, NotConnected
  151. }
  152. fullRequest := request{b.correlation_id, clientID, req}
  153. buf, err := enc.Encode(&fullRequest)
  154. if err != nil {
  155. return nil, err
  156. }
  157. _, err = b.conn.Write(buf)
  158. if err != nil {
  159. return nil, err
  160. }
  161. b.correlation_id++
  162. if !promiseResponse {
  163. return nil, nil
  164. }
  165. promise := responsePromise{fullRequest.correlation_id, make(chan []byte), make(chan error)}
  166. b.responses <- promise
  167. return &promise, nil
  168. }
  169. func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res enc.Decoder) error {
  170. promise, err := b.send(clientID, req, res != nil)
  171. if err != nil {
  172. return err
  173. }
  174. if promise == nil {
  175. return nil
  176. }
  177. select {
  178. case buf := <-promise.packets:
  179. return enc.Decode(buf, res)
  180. case err = <-promise.errors:
  181. return err
  182. }
  183. }
  184. func (b *Broker) Decode(pd enc.PacketDecoder) (err error) {
  185. b.id, err = pd.GetInt32()
  186. if err != nil {
  187. return err
  188. }
  189. b.host, err = pd.GetString()
  190. if err != nil {
  191. return err
  192. }
  193. b.port, err = pd.GetInt32()
  194. if err != nil {
  195. return err
  196. }
  197. return nil
  198. }
  199. func (b *Broker) responseReceiver() {
  200. header := make([]byte, 8)
  201. for response := range b.responses {
  202. _, err := io.ReadFull(b.conn, header)
  203. if err != nil {
  204. response.errors <- err
  205. continue
  206. }
  207. decodedHeader := responseHeader{}
  208. err = enc.Decode(header, &decodedHeader)
  209. if err != nil {
  210. response.errors <- err
  211. continue
  212. }
  213. if decodedHeader.correlation_id != response.correlation_id {
  214. response.errors <- enc.DecodingError
  215. continue
  216. }
  217. buf := make([]byte, decodedHeader.length-4)
  218. _, err = io.ReadFull(b.conn, buf)
  219. if err != nil {
  220. response.errors <- err
  221. continue
  222. }
  223. response.packets <- buf
  224. }
  225. close(b.done)
  226. }