consumer.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package kafka
  2. import k "sarama/protocol"
// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client    *Client   // client used to look up brokers and topic metadata
	topic     string    // topic this consumer reads from
	partition int32     // partition within the topic
	group     string    // consumer group name (not yet sent to the broker — see NewConsumer's offset note)
	offset    int64     // next offset to request from the broker
	broker    *k.Broker // current leader for topic/partition; re-resolved on broker failure

	stopper, done chan bool     // stopper: closed by Close() to request shutdown; done: closed by the fetch goroutine when it exits
	messages      chan *Message // delivery channel read via Messages(); closed on shutdown
	errors        chan error    // broker errors surfaced via Errors(); closed on shutdown
}
  17. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  18. // part of the named consumer group.
  19. func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) {
  20. broker, err := client.leader(topic, partition)
  21. if err != nil {
  22. return nil, err
  23. }
  24. c := new(Consumer)
  25. c.client = client
  26. c.topic = topic
  27. c.partition = partition
  28. c.group = group
  29. // We should really be sending an OffsetFetchRequest, but that doesn't seem to
  30. // work in kafka yet. Hopefully will in beta 2...
  31. c.offset = 0
  32. c.broker = broker
  33. c.stopper = make(chan bool)
  34. c.done = make(chan bool)
  35. c.messages = make(chan *Message)
  36. c.errors = make(chan error)
  37. go c.fetchMessages()
  38. return c, nil
  39. }
  40. // Errors returns the read channel for any errors that might be returned by the broker.
  41. func (c *Consumer) Errors() <-chan error {
  42. return c.errors
  43. }
  44. // Messages returns the read channel for all messages that will be returned by the broker.
  45. func (c *Consumer) Messages() <-chan *Message {
  46. return c.messages
  47. }
// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (c *Consumer) Close() {
	// Closing stopper is the shutdown signal the fetch goroutine selects on.
	close(c.stopper)
	// Block until the fetch goroutine has closed the channels and exited.
	// NOTE(review): if the goroutine is blocked inside broker.Fetch, this
	// waits for that request to finish first — confirm that is acceptable.
	<-c.done
}
  55. // helper function for safely sending an error on the errors channel
  56. // if it returns true, the error was sent (or was nil)
  57. // if it returns false, the stopper channel signaled that your goroutine should return!
  58. func (c *Consumer) sendError(err error) bool {
  59. if err == nil {
  60. return true
  61. }
  62. select {
  63. case <-c.stopper:
  64. close(c.messages)
  65. close(c.errors)
  66. close(c.done)
  67. return false
  68. case c.errors <- err:
  69. return true
  70. }
  71. }
  72. func (c *Consumer) fetchMessages() {
  73. var fetchSize int32 = 1024
  74. for {
  75. request := new(k.FetchRequest)
  76. request.MinBytes = 1
  77. request.MaxWaitTime = 1000
  78. request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
  79. response, err := c.broker.Fetch(c.client.id, request)
  80. switch err.(type) {
  81. case k.EncodingError:
  82. if c.sendError(err) {
  83. continue
  84. } else {
  85. return
  86. }
  87. case nil:
  88. break
  89. default:
  90. c.client.disconnectBroker(c.broker)
  91. c.broker, err = c.client.leader(c.topic, c.partition)
  92. if c.sendError(err) {
  93. continue
  94. } else {
  95. return
  96. }
  97. }
  98. block := response.GetBlock(c.topic, c.partition)
  99. if block == nil {
  100. if c.sendError(IncompleteResponse) {
  101. continue
  102. } else {
  103. return
  104. }
  105. }
  106. switch block.Err {
  107. case k.NO_ERROR:
  108. break
  109. case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION, k.LEADER_NOT_AVAILABLE:
  110. err = c.client.refreshTopic(c.topic)
  111. if c.sendError(err) {
  112. continue
  113. } else {
  114. return
  115. }
  116. default:
  117. if c.sendError(block.Err) {
  118. continue
  119. } else {
  120. return
  121. }
  122. }
  123. if len(block.MsgSet.Messages) == 0 {
  124. // We got no messages. If we got a trailing one then we need to ask for more data.
  125. // Otherwise we just poll again and wait for one to be produced...
  126. if block.MsgSet.PartialTrailingMessage {
  127. fetchSize *= 2
  128. }
  129. select {
  130. case <-c.stopper:
  131. close(c.messages)
  132. close(c.errors)
  133. close(c.done)
  134. return
  135. default:
  136. continue
  137. }
  138. }
  139. for _, msgBlock := range block.MsgSet.Messages {
  140. // smoosh the kafka return data into a more useful single struct
  141. msg := new(Message)
  142. msg.Offset = msgBlock.Offset
  143. msg.Key = msgBlock.Msg.Key
  144. msg.Value = msgBlock.Msg.Value
  145. // and send it
  146. select {
  147. case <-c.stopper:
  148. close(c.messages)
  149. close(c.errors)
  150. close(c.done)
  151. return
  152. case c.messages <- msg:
  153. c.offset++
  154. }
  155. }
  156. }
  157. }