consumer.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package kafka
  2. import k "sarama/protocol"
  3. import (
  4. "sarama/encoding"
  5. "sarama/types"
  6. )
// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client *Client // shared client; used for leader lookup, metadata refresh, and broker disconnect
	topic string // topic this consumer reads from
	partition int32 // partition within the topic
	group string // consumer group name (stored but unused in the visible code — presumably for offset commits; verify)
	offset int64 // next offset to request; incremented only after a message is delivered
	broker *k.Broker // current partition leader; replaced when a fetch fails
	stopper, done chan bool // stopper is closed by Close(); done is closed when the fetch goroutine shuts down
	messages chan *Message // delivery channel exposed via Messages()
	errors chan error // delivery channel exposed via Errors()
}
  21. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  22. // part of the named consumer group.
  23. func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) {
  24. broker, err := client.leader(topic, partition)
  25. if err != nil {
  26. return nil, err
  27. }
  28. c := new(Consumer)
  29. c.client = client
  30. c.topic = topic
  31. c.partition = partition
  32. c.group = group
  33. // We should really be sending an OffsetFetchRequest, but that doesn't seem to
  34. // work in kafka yet. Hopefully will in beta 2...
  35. c.offset = 0
  36. c.broker = broker
  37. c.stopper = make(chan bool)
  38. c.done = make(chan bool)
  39. c.messages = make(chan *Message)
  40. c.errors = make(chan error)
  41. go c.fetchMessages()
  42. return c, nil
  43. }
// Errors returns the read channel for any errors that might be returned by the broker.
// The channel is closed when the consumer shuts down (via Close or a stopper signal).
func (c *Consumer) Errors() <-chan error {
	return c.errors
}
// Messages returns the read channel for all messages that will be returned by the broker.
// The channel is closed when the consumer shuts down (via Close or a stopper signal).
func (c *Consumer) Messages() <-chan *Message {
	return c.messages
}
// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (c *Consumer) Close() {
	// Signal the fetch goroutine to stop, then block until it acknowledges
	// by closing c.done (which happens after it closes the public channels).
	close(c.stopper)
	<-c.done
}
  59. // helper function for safely sending an error on the errors channel
  60. // if it returns true, the error was sent (or was nil)
  61. // if it returns false, the stopper channel signaled that your goroutine should return!
  62. func (c *Consumer) sendError(err error) bool {
  63. if err == nil {
  64. return true
  65. }
  66. select {
  67. case <-c.stopper:
  68. close(c.messages)
  69. close(c.errors)
  70. close(c.done)
  71. return false
  72. case c.errors <- err:
  73. return true
  74. }
  75. }
  76. func (c *Consumer) fetchMessages() {
  77. var fetchSize int32 = 1024
  78. for {
  79. request := new(k.FetchRequest)
  80. request.MinBytes = 1
  81. request.MaxWaitTime = 1000
  82. request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
  83. response, err := c.broker.Fetch(c.client.id, request)
  84. switch {
  85. case err == nil:
  86. break
  87. case err == encoding.EncodingError:
  88. if c.sendError(err) {
  89. continue
  90. } else {
  91. return
  92. }
  93. default:
  94. c.client.disconnectBroker(c.broker)
  95. for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
  96. if !c.sendError(err) {
  97. return
  98. }
  99. }
  100. }
  101. block := response.GetBlock(c.topic, c.partition)
  102. if block == nil {
  103. if c.sendError(IncompleteResponse) {
  104. continue
  105. } else {
  106. return
  107. }
  108. }
  109. switch block.Err {
  110. case types.NO_ERROR:
  111. break
  112. case types.UNKNOWN_TOPIC_OR_PARTITION, types.NOT_LEADER_FOR_PARTITION, types.LEADER_NOT_AVAILABLE:
  113. err = c.client.refreshTopic(c.topic)
  114. if c.sendError(err) {
  115. continue
  116. } else {
  117. return
  118. }
  119. default:
  120. if c.sendError(block.Err) {
  121. continue
  122. } else {
  123. return
  124. }
  125. }
  126. if len(block.MsgSet.Messages) == 0 {
  127. // We got no messages. If we got a trailing one then we need to ask for more data.
  128. // Otherwise we just poll again and wait for one to be produced...
  129. if block.MsgSet.PartialTrailingMessage {
  130. fetchSize *= 2
  131. }
  132. select {
  133. case <-c.stopper:
  134. close(c.messages)
  135. close(c.errors)
  136. close(c.done)
  137. return
  138. default:
  139. continue
  140. }
  141. }
  142. for _, msgBlock := range block.MsgSet.Messages {
  143. // smoosh the kafka return data into a more useful single struct
  144. msg := new(Message)
  145. msg.Offset = msgBlock.Offset
  146. msg.Key = msgBlock.Msg.Key
  147. msg.Value = msgBlock.Msg.Value
  148. // and send it
  149. select {
  150. case <-c.stopper:
  151. close(c.messages)
  152. close(c.errors)
  153. close(c.done)
  154. return
  155. case c.messages <- msg:
  156. c.offset++
  157. }
  158. }
  159. }
  160. }