package sarama
// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
// The zero value is usable: NewConsumer treats zero fields as the documented defaults.
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
	MinFetchSize int32
	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. The default of 0 is treated as no limit.
	MaxWaitTime int32
	// The offset to start fetching messages from. Must be non-negative.
	StartingOffset int64
}
// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
// a message (in which case Err is nil and the other fields are all set).
type ConsumerEvent struct {
	// Key and Value are the raw message key and payload as fetched from the broker.
	Key, Value []byte
	// Offset is the message's offset within the partition.
	Offset int64
	// Err, when non-nil, indicates this event carries an error rather than a message.
	Err error
}
// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client *Client // client used to look up partition leaders and broker connections

	topic     string
	partition int32
	group     string
	config    ConsumerConfig // validated, defaulted copy of the user's config

	offset int64   // next offset to fetch; advanced as messages are delivered
	broker *Broker // current leader for topic/partition; replaced on leader change

	// stopper is closed by Close() to signal shutdown; done is closed by the
	// fetch goroutine once it has fully exited, and Close() blocks on it.
	stopper, done chan bool
	// events delivers messages and errors to the user; closed on shutdown.
	events chan *ConsumerEvent
}
  39. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  40. // part of the named consumer group.
  41. func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
  42. if config == nil {
  43. config = new(ConsumerConfig)
  44. }
  45. if config.DefaultFetchSize < 0 {
  46. return nil, ConfigurationError("Invalid DefaultFetchSize")
  47. } else if config.DefaultFetchSize == 0 {
  48. config.DefaultFetchSize = 1024
  49. }
  50. if config.MinFetchSize < 0 {
  51. return nil, ConfigurationError("Invalid MinFetchSize")
  52. } else if config.MinFetchSize == 0 {
  53. config.MinFetchSize = 1
  54. }
  55. if config.MaxMessageSize < 0 {
  56. return nil, ConfigurationError("Invalid MaxMessageSize")
  57. }
  58. if config.MaxWaitTime < 0 {
  59. return nil, ConfigurationError("Invalid MaxWaitTime")
  60. }
  61. if config.StartingOffset < 0 {
  62. return nil, ConfigurationError("Invalid StartingOffset")
  63. }
  64. broker, err := client.leader(topic, partition)
  65. if err != nil {
  66. return nil, err
  67. }
  68. c := new(Consumer)
  69. c.client = client
  70. c.topic = topic
  71. c.partition = partition
  72. c.group = group
  73. c.config = *config
  74. c.offset = config.StartingOffset
  75. c.broker = broker
  76. c.stopper = make(chan bool)
  77. c.done = make(chan bool)
  78. c.events = make(chan *ConsumerEvent)
  79. go c.fetchMessages()
  80. return c, nil
  81. }
// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
// The channel is closed when the consumer shuts down, so a range over it terminates after Close.
func (c *Consumer) Events() <-chan *ConsumerEvent {
	return c.events
}
// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
//
// Close signals shutdown by closing the stopper channel, then blocks until the
// fetch goroutine has closed the done channel, i.e. has fully exited.
// NOTE(review): Close is not idempotent — a second call would close an
// already-closed channel and panic; callers must call it exactly once.
func (c *Consumer) Close() error {
	close(c.stopper)
	<-c.done
	return nil
}
  94. // helper function for safely sending an error on the errors channel
  95. // if it returns true, the error was sent (or was nil)
  96. // if it returns false, the stopper channel signaled that your goroutine should return!
  97. func (c *Consumer) sendError(err error) bool {
  98. if err == nil {
  99. return true
  100. }
  101. select {
  102. case <-c.stopper:
  103. close(c.events)
  104. close(c.done)
  105. return false
  106. case c.events <- &ConsumerEvent{Err: err}:
  107. return true
  108. }
  109. return true
  110. }
// fetchMessages is the consumer's background goroutine (started by NewConsumer).
// It loops forever: fetch a block from the current leader, deliver its messages
// on c.events, and recover from errors and leader changes, until the stopper
// channel is closed. On every exit path it closes c.events and c.done (either
// directly or via sendError) so that Close() unblocks and ranges over Events()
// terminate.
func (c *Consumer) fetchMessages() {
	// Current request size; grows (see below) when a message is too large to fit.
	var fetchSize int32 = c.config.DefaultFetchSize

	for {
		request := new(FetchRequest)
		request.MinBytes = c.config.MinFetchSize
		request.MaxWaitTime = c.config.MaxWaitTime
		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)

		response, err := c.broker.Fetch(c.client.id, request)
		switch {
		case err == nil:
			break
		case err == EncodingError:
			// Encoding errors won't be fixed by switching brokers; just report and retry.
			if c.sendError(err) {
				continue
			} else {
				return
			}
		default:
			// Transport-level failure: drop the connection, then poll the client
			// for a new leader, reporting each failed attempt as we go.
			c.client.disconnectBroker(c.broker)
			for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
				if !c.sendError(err) {
					return
				}
			}
			continue
		}

		block := response.GetBlock(c.topic, c.partition)
		if block == nil {
			// Broker replied but omitted our topic/partition entirely.
			if c.sendError(IncompleteResponse) {
				continue
			} else {
				return
			}
		}

		switch block.Err {
		case NO_ERROR:
			break
		case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
			// Leadership may have moved: refresh metadata, then re-resolve the
			// leader in the same report-and-retry loop as the transport case.
			err = c.client.refreshTopic(c.topic)
			if c.sendError(err) {
				for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
					if !c.sendError(err) {
						return
					}
				}
				continue
			} else {
				return
			}
		default:
			if c.sendError(block.Err) {
				continue
			} else {
				return
			}
		}

		if len(block.MsgSet.Messages) == 0 {
			// We got no messages. If we got a trailing one then we need to ask for more data.
			// Otherwise we just poll again and wait for one to be produced...
			if block.MsgSet.PartialTrailingMessage {
				if c.config.MaxMessageSize == 0 {
					// No configured cap: keep doubling until the message fits.
					fetchSize *= 2
				} else {
					if fetchSize == c.config.MaxMessageSize {
						// Already at the cap and it still doesn't fit: the message
						// is too large by configuration.
						if c.sendError(MessageTooLarge) {
							continue
						} else {
							return
						}
					} else {
						// Double, but clamp to the configured maximum.
						fetchSize *= 2
						if fetchSize > c.config.MaxMessageSize {
							fetchSize = c.config.MaxMessageSize
						}
					}
				}
			}
			// Check for shutdown between empty polls so an idle consumer can exit.
			select {
			case <-c.stopper:
				close(c.events)
				close(c.done)
				return
			default:
				continue
			}
		} else {
			// Got real messages: reset the fetch size back to its default.
			fetchSize = c.config.DefaultFetchSize
		}

		// Deliver each message, racing delivery against shutdown so we never
		// block forever on a reader that has gone away.
		for _, msgBlock := range block.MsgSet.Messages {
			select {
			case <-c.stopper:
				close(c.events)
				close(c.done)
				return
			case c.events <- &ConsumerEvent{Key: msgBlock.Msg.Key, Value: msgBlock.Msg.Value, Offset: msgBlock.Offset}:
				c.offset++
			}
		}
	}
}