consumer.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package sarama
// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
// The zero value of every field is valid; NewConsumer substitutes the defaults
// described on each field.
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
	MinFetchSize int32
	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. The default of 0 is treated as no limit.
	MaxWaitTime int32
}
// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client *Client // used to locate the partition leader and to reconnect on broker failure

	topic     string
	partition int32
	group     string
	config    ConsumerConfig // private copy of the caller's configuration, defaults applied

	offset int64   // next offset to request; incremented as each message is delivered
	broker *Broker // current leader for topic/partition; re-resolved when fetches fail

	// stopper is closed by Close(); done is closed by the fetch goroutine on exit
	// so Close can wait for it.
	stopper, done chan bool
	messages      chan *MessageBlock // unbuffered delivery channel returned by Messages()
	errors        chan error         // unbuffered delivery channel returned by Errors()
}
  31. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  32. // part of the named consumer group.
  33. func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
  34. if config == nil {
  35. config = new(ConsumerConfig)
  36. }
  37. if config.DefaultFetchSize < 0 {
  38. return nil, ConfigurationError("Invalid DefaultFetchSize")
  39. } else if config.DefaultFetchSize == 0 {
  40. config.DefaultFetchSize = 1024
  41. }
  42. if config.MinFetchSize < 0 {
  43. return nil, ConfigurationError("Invalid MinFetchSize")
  44. } else if config.MinFetchSize == 0 {
  45. config.MinFetchSize = 1
  46. }
  47. if config.MaxMessageSize < 0 {
  48. return nil, ConfigurationError("Invalid MaxMessageSize")
  49. }
  50. if config.MaxWaitTime < 0 {
  51. return nil, ConfigurationError("Invalid MaxWaitTime")
  52. }
  53. broker, err := client.leader(topic, partition)
  54. if err != nil {
  55. return nil, err
  56. }
  57. c := new(Consumer)
  58. c.client = client
  59. c.topic = topic
  60. c.partition = partition
  61. c.group = group
  62. c.config = *config
  63. // We should really be sending an OffsetFetchRequest, but that doesn't seem to
  64. // work in kafka yet. Hopefully will in beta 2...
  65. c.offset = 0
  66. c.broker = broker
  67. c.stopper = make(chan bool)
  68. c.done = make(chan bool)
  69. c.messages = make(chan *MessageBlock)
  70. c.errors = make(chan error)
  71. go c.fetchMessages()
  72. return c, nil
  73. }
// Errors returns the read channel for any errors that might be returned by the broker.
// The channel is unbuffered: the fetch goroutine blocks in sendError until the
// error is received here or the consumer is closed.
func (c *Consumer) Errors() <-chan error {
	return c.errors
}
  78. // Messages returns the read channel for all messages that will be returned by the broker.
  79. func (c *Consumer) Messages() <-chan *MessageBlock {
  80. return c.messages
  81. }
// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (c *Consumer) Close() error {
	// Signal the fetch goroutine to stop, then wait for it to acknowledge by
	// closing c.done (it also closes the messages and errors channels on the
	// way out).
	close(c.stopper)
	<-c.done
	return nil
}
  90. // helper function for safely sending an error on the errors channel
  91. // if it returns true, the error was sent (or was nil)
  92. // if it returns false, the stopper channel signaled that your goroutine should return!
  93. func (c *Consumer) sendError(err error) bool {
  94. if err == nil {
  95. return true
  96. }
  97. select {
  98. case <-c.stopper:
  99. close(c.messages)
  100. close(c.errors)
  101. close(c.done)
  102. return false
  103. case c.errors <- err:
  104. return true
  105. }
  106. return true
  107. }
  108. func (c *Consumer) fetchMessages() {
  109. var fetchSize int32 = c.config.DefaultFetchSize
  110. for {
  111. request := new(FetchRequest)
  112. request.MinBytes = c.config.MinFetchSize
  113. request.MaxWaitTime = c.config.MaxWaitTime
  114. request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
  115. response, err := c.broker.Fetch(c.client.id, request)
  116. switch {
  117. case err == nil:
  118. break
  119. case err == EncodingError:
  120. if c.sendError(err) {
  121. continue
  122. } else {
  123. return
  124. }
  125. default:
  126. c.client.disconnectBroker(c.broker)
  127. for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
  128. if !c.sendError(err) {
  129. return
  130. }
  131. }
  132. }
  133. block := response.GetBlock(c.topic, c.partition)
  134. if block == nil {
  135. if c.sendError(IncompleteResponse) {
  136. continue
  137. } else {
  138. return
  139. }
  140. }
  141. switch block.Err {
  142. case NO_ERROR:
  143. break
  144. case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
  145. err = c.client.refreshTopic(c.topic)
  146. if c.sendError(err) {
  147. for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
  148. if !c.sendError(err) {
  149. return
  150. }
  151. }
  152. continue
  153. } else {
  154. return
  155. }
  156. default:
  157. if c.sendError(block.Err) {
  158. continue
  159. } else {
  160. return
  161. }
  162. }
  163. if len(block.MsgSet.Messages) == 0 {
  164. // We got no messages. If we got a trailing one then we need to ask for more data.
  165. // Otherwise we just poll again and wait for one to be produced...
  166. if block.MsgSet.PartialTrailingMessage {
  167. if c.config.MaxMessageSize == 0 {
  168. fetchSize *= 2
  169. } else {
  170. if fetchSize == c.config.MaxMessageSize {
  171. if c.sendError(MessageTooLarge) {
  172. continue
  173. } else {
  174. return
  175. }
  176. } else {
  177. fetchSize *= 2
  178. if fetchSize > c.config.MaxMessageSize {
  179. fetchSize = c.config.MaxMessageSize
  180. }
  181. }
  182. }
  183. }
  184. select {
  185. case <-c.stopper:
  186. close(c.messages)
  187. close(c.errors)
  188. close(c.done)
  189. return
  190. default:
  191. continue
  192. }
  193. } else {
  194. fetchSize = c.config.DefaultFetchSize
  195. }
  196. for _, msgBlock := range block.MsgSet.Messages {
  197. select {
  198. case <-c.stopper:
  199. close(c.messages)
  200. close(c.errors)
  201. close(c.done)
  202. return
  203. case c.messages <- msgBlock:
  204. c.offset++
  205. }
  206. }
  207. }
  208. }