consumer.go

package sarama

// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
	MinFetchSize int32
	// The maximum permissible message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyway. The default of 0 is treated as no limit.
	MaxWaitTime int32
	// The offset to start fetching messages from.
	StartingOffset int64
}
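
// A sketch of constructing a ConsumerConfig. The values below are illustrative only;
// a zero value for any field falls back to the default described in its comment:
//
//	config := &ConsumerConfig{
//		DefaultFetchSize: 32 * 1024,   // fetch up to 32KB per request
//		MinFetchSize:     1,           // return as soon as any data is available
//		MaxMessageSize:   1024 * 1024, // reject messages over 1MB
//		MaxWaitTime:      250,         // let the broker wait up to 250ms
//		StartingOffset:   0,
//	}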

// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks; it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client    *Client
	topic     string
	partition int32
	group     string
	config    ConsumerConfig

	offset        int64
	broker        *Broker
	stopper, done chan bool
	messages      chan *MessageBlock
	errors        chan error
}

// NewConsumer creates a new consumer attached to the given client. It will read messages from the given
// topic and partition, as part of the named consumer group.
func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
	if config == nil {
		config = new(ConsumerConfig)
	}

	if config.DefaultFetchSize < 0 {
		return nil, ConfigurationError("Invalid DefaultFetchSize")
	} else if config.DefaultFetchSize == 0 {
		config.DefaultFetchSize = 1024
	}

	if config.MinFetchSize < 0 {
		return nil, ConfigurationError("Invalid MinFetchSize")
	} else if config.MinFetchSize == 0 {
		config.MinFetchSize = 1
	}

	if config.MaxMessageSize < 0 {
		return nil, ConfigurationError("Invalid MaxMessageSize")
	}

	if config.MaxWaitTime < 0 {
		return nil, ConfigurationError("Invalid MaxWaitTime")
	}

	if config.StartingOffset < 0 {
		return nil, ConfigurationError("Invalid StartingOffset")
	}

	broker, err := client.leader(topic, partition)
	if err != nil {
		return nil, err
	}

	c := new(Consumer)
	c.client = client
	c.topic = topic
	c.partition = partition
	c.group = group
	c.config = *config
	c.offset = config.StartingOffset
	c.broker = broker
	c.stopper = make(chan bool)
	c.done = make(chan bool)
	c.messages = make(chan *MessageBlock)
	c.errors = make(chan error)

	go c.fetchMessages()

	return c, nil
}
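
// A minimal usage sketch. The topic, partition, and group names are hypothetical,
// and construction of the *Client as well as most error handling is elided:
//
//	consumer, err := NewConsumer(client, "my_topic", 0, "my_group", nil)
//	if err != nil {
//		return err
//	}
//	defer consumer.Close() // must happen before the client itself is closed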

// Errors returns the read channel for any errors that might be returned by the broker.
func (c *Consumer) Errors() <-chan error {
	return c.errors
}

// Messages returns the read channel for all messages that will be returned by the broker.
func (c *Consumer) Messages() <-chan *MessageBlock {
	return c.messages
}
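
// A sketch of a consuming loop built on the two channels above. Here handle is a
// hypothetical function taking a *MessageBlock, and deciding when to stop is left
// to the caller:
//
//	for {
//		select {
//		case msg := <-consumer.Messages():
//			handle(msg)
//		case err := <-consumer.Errors():
//			fmt.Println("consumer error:", err)
//		}
//	}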

// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (c *Consumer) Close() error {
	close(c.stopper)
	<-c.done
	return nil
}

// helper function for safely sending an error on the errors channel:
// if it returns true, the error was sent (or was nil);
// if it returns false, the stopper channel signaled that your goroutine should return.
func (c *Consumer) sendError(err error) bool {
	if err == nil {
		return true
	}

	select {
	case <-c.stopper:
		close(c.messages)
		close(c.errors)
		close(c.done)
		return false
	case c.errors <- err:
		return true
	}
}
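
// fetchMessages runs in its own goroutine (started by NewConsumer). It repeatedly fetches
// from the partition leader, delivering messages on c.messages and errors on c.errors,
// until the stopper channel is closed by Close.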
func (c *Consumer) fetchMessages() {
	var fetchSize int32 = c.config.DefaultFetchSize

	for {
		request := new(FetchRequest)
		request.MinBytes = c.config.MinFetchSize
		request.MaxWaitTime = c.config.MaxWaitTime
		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)

		response, err := c.broker.Fetch(c.client.id, request)
		switch {
		case err == nil:
			break
		case err == EncodingError:
			// report the error; an encoding error is not fatal to the consumer
			if c.sendError(err) {
				continue
			} else {
				return
			}
		default:
			// broker error, so discard that broker and block until we can find a new
			// leader for the partition (reporting each failed attempt as we go)
			c.client.disconnectBroker(c.broker)
			for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
				if !c.sendError(err) {
					return
				}
			}
			// the fetch itself failed, so there is no response to process;
			// retry it against the new leader
			continue
		}

		block := response.GetBlock(c.topic, c.partition)
		if block == nil {
			if c.sendError(IncompleteResponse) {
				continue
			} else {
				return
			}
		}

		switch block.Err {
		case NO_ERROR:
			break
		case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
			// our metadata is stale; refresh it, then look up the (possibly new)
			// leader before retrying the fetch
			err = c.client.refreshTopic(c.topic)
			if c.sendError(err) {
				for c.broker, err = c.client.leader(c.topic, c.partition); err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
					if !c.sendError(err) {
						return
					}
				}
				continue
			} else {
				return
			}
		default:
			if c.sendError(block.Err) {
				continue
			} else {
				return
			}
		}

		if len(block.MsgSet.Messages) == 0 {
			// We got no messages. If we got a partial trailing message then our fetch size was
			// too small, so we need to ask for more data. Otherwise we just poll again and wait
			// for one to be produced...
			if block.MsgSet.PartialTrailingMessage {
				if c.config.MaxMessageSize == 0 {
					// no limit configured, so just double the fetch size and retry
					fetchSize *= 2
				} else {
					if fetchSize == c.config.MaxMessageSize {
						// we're already fetching as much as we're allowed to, so the
						// message can never fit
						if c.sendError(MessageTooLarge) {
							continue
						} else {
							return
						}
					} else {
						// double the fetch size, capped at the configured maximum
						fetchSize *= 2
						if fetchSize > c.config.MaxMessageSize {
							fetchSize = c.config.MaxMessageSize
						}
					}
				}
			}

			select {
			case <-c.stopper:
				close(c.messages)
				close(c.errors)
				close(c.done)
				return
			default:
				continue
			}
		} else {
			fetchSize = c.config.DefaultFetchSize
		}

		// deliver the messages we did get, advancing our offset as each one is accepted;
		// bail out cleanly if Close is called while we're blocked on the channel send
		for _, msgBlock := range block.MsgSet.Messages {
			select {
			case <-c.stopper:
				close(c.messages)
				close(c.errors)
				close(c.done)
				return
			case c.messages <- msgBlock:
				c.offset++
			}
		}
	}
}