consumer.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package sarama
  2. // OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
  3. type OffsetMethod int
  4. const (
  5. // OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
  6. // offset at which to start, allowing the user to manually specify their desired starting offset.
  7. OffsetMethodManual OffsetMethod = iota
  8. // OffsetMethodNewest causes the consumer to start at the most recent available offset, as
  9. // determined by querying the broker.
  10. OffsetMethodNewest
  11. // OffsetMethodOldest causes the consumer to start at the oldest available offset, as
  12. // determined by querying the broker.
  13. OffsetMethodOldest
  14. )
// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
	MinFetchSize int32
	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. The default of 0 causes Kafka to return immediately, which is rarely desirable
	// as it causes the Consumer to spin when no events are available. 100-500ms is a reasonable range for most cases.
	// NOTE(review): NewConsumer currently rejects values <= 0 with a ConfigurationError, so the zero
	// default described above is not actually accepted — the doc and the validation disagree.
	MaxWaitTime int32
	// The method used to determine at which offset to begin consuming messages.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	OffsetValue int64
	// The number of events to buffer in the Events channel. Setting this can let the
	// consumer continue fetching messages in the background while local code consumes events,
	// greatly improving throughput.
	EventBufferSize int
}
// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
// a message (in which case Err is nil and the other fields are all set).
type ConsumerEvent struct {
	// Key and Value are the raw bytes of the consumed Kafka message.
	Key, Value []byte
	// Topic and Partition identify where the message came from.
	Topic     string
	Partition int32
	// Offset is the offset of this message within the partition.
	Offset int64
	// Err is non-nil when the event represents an error rather than a message.
	Err error
}
// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client    *Client
	topic     string
	partition int32
	group     string
	config    ConsumerConfig // private copy of the user's configuration, with defaults applied
	offset    int64          // next offset to request from the broker; advanced as messages are delivered
	broker    *Broker        // current leader for this topic/partition; re-resolved on broker errors
	// stopper is closed by Close() to signal shutdown; done is closed by the
	// fetch goroutine once it has finished, and Close() blocks on it.
	stopper, done chan bool
	// events is the channel returned by Events(); buffered per EventBufferSize.
	events chan *ConsumerEvent
}
  61. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  62. // part of the named consumer group.
  63. func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
  64. if config == nil {
  65. config = new(ConsumerConfig)
  66. }
  67. if config.DefaultFetchSize < 0 {
  68. return nil, ConfigurationError("Invalid DefaultFetchSize")
  69. } else if config.DefaultFetchSize == 0 {
  70. config.DefaultFetchSize = 1024
  71. }
  72. if config.MinFetchSize < 0 {
  73. return nil, ConfigurationError("Invalid MinFetchSize")
  74. } else if config.MinFetchSize == 0 {
  75. config.MinFetchSize = 1
  76. }
  77. if config.MaxMessageSize < 0 {
  78. return nil, ConfigurationError("Invalid MaxMessageSize")
  79. }
  80. if config.MaxWaitTime <= 0 {
  81. return nil, ConfigurationError("Invalid MaxWaitTime")
  82. } else if config.MaxWaitTime < 100 {
  83. Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
  84. }
  85. if config.EventBufferSize < 0 {
  86. return nil, ConfigurationError("Invalid EventBufferSize")
  87. }
  88. if topic == "" {
  89. return nil, ConfigurationError("Empty topic")
  90. }
  91. broker, err := client.Leader(topic, partition)
  92. if err != nil {
  93. return nil, err
  94. }
  95. c := &Consumer{
  96. client: client,
  97. topic: topic,
  98. partition: partition,
  99. group: group,
  100. config: *config,
  101. broker: broker,
  102. stopper: make(chan bool),
  103. done: make(chan bool),
  104. events: make(chan *ConsumerEvent, config.EventBufferSize),
  105. }
  106. switch config.OffsetMethod {
  107. case OffsetMethodManual:
  108. if config.OffsetValue < 0 {
  109. return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
  110. }
  111. c.offset = config.OffsetValue
  112. case OffsetMethodNewest:
  113. c.offset, err = c.getOffset(LatestOffsets, true)
  114. if err != nil {
  115. return nil, err
  116. }
  117. case OffsetMethodOldest:
  118. c.offset, err = c.getOffset(EarliestOffset, true)
  119. if err != nil {
  120. return nil, err
  121. }
  122. default:
  123. return nil, ConfigurationError("Invalid OffsetMethod")
  124. }
  125. go withRecover(c.fetchMessages)
  126. return c, nil
  127. }
  128. // Events returns the read channel for any events (messages or errors) that might be returned by the broker.
  129. func (c *Consumer) Events() <-chan *ConsumerEvent {
  130. return c.events
  131. }
// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (c *Consumer) Close() error {
	// Signal the fetch goroutine to stop...
	close(c.stopper)
	// ...then block until it acknowledges by closing `done` (it also closes
	// the events channel on its way out). Order matters: the goroutine only
	// observes c.stopper at its select points.
	<-c.done
	return nil
}
  140. // helper function for safely sending an error on the errors channel
  141. // if it returns true, the error was sent (or was nil)
  142. // if it returns false, the stopper channel signaled that your goroutine should return!
  143. func (c *Consumer) sendError(err error) bool {
  144. if err == nil {
  145. return true
  146. }
  147. select {
  148. case <-c.stopper:
  149. close(c.events)
  150. close(c.done)
  151. return false
  152. case c.events <- &ConsumerEvent{Err: err}:
  153. return true
  154. }
  155. }
// fetchMessages is the consumer's background loop: it repeatedly issues
// FetchRequests to the current leader, converts the returned messages into
// ConsumerEvents, and handles broker failover, shutdown, and fetch-size
// growth for oversized messages. It runs until Close() fires the stopper,
// at which point it closes the events and done channels and returns.
func (c *Consumer) fetchMessages() {
	// fetchSize starts at the configured default and is doubled (up to
	// MaxMessageSize, if set) when a message is too large to fit.
	fetchSize := c.config.DefaultFetchSize

	for {
		request := new(FetchRequest)
		request.MinBytes = c.config.MinFetchSize
		request.MaxWaitTime = c.config.MaxWaitTime
		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)

		response, err := c.broker.Fetch(c.client.id, request)
		switch {
		case err == nil:
			break
		case err == EncodingError:
			// Our own request was malformed; report it and try again
			// (disconnecting the broker would not help).
			if c.sendError(err) {
				continue
			} else {
				return
			}
		default:
			// Transport-level failure: drop this broker and poll the client
			// for a new leader until one is available (reporting each
			// failure, and bailing out if the consumer is being closed).
			Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
			c.client.disconnectBroker(c.broker)
			for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
				if !c.sendError(err) {
					return
				}
			}
			continue
		}

		block := response.GetBlock(c.topic, c.partition)
		if block == nil {
			// The broker answered but omitted the partition we asked for.
			if c.sendError(IncompleteResponse) {
				continue
			} else {
				return
			}
		}

		switch block.Err {
		case NoError:
			break
		case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
			// Leadership moved: refresh metadata, then poll for the new
			// leader just like in the transport-failure case above.
			err = c.client.RefreshTopicMetadata(c.topic)
			if c.sendError(err) {
				for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
					if !c.sendError(err) {
						return
					}
				}
				continue
			} else {
				return
			}
		default:
			if c.sendError(block.Err) {
				continue
			} else {
				return
			}
		}

		if len(block.MsgSet.Messages) == 0 {
			// We got no messages. If we got a trailing one then we need to ask for more data.
			// Otherwise we just poll again and wait for one to be produced...
			if block.MsgSet.PartialTrailingMessage {
				if c.config.MaxMessageSize == 0 {
					// No limit configured; keep doubling until it fits.
					fetchSize *= 2
				} else {
					if fetchSize == c.config.MaxMessageSize {
						// Already at the cap and still truncated: the message
						// can never be fetched whole.
						if c.sendError(MessageTooLarge) {
							continue
						} else {
							return
						}
					} else {
						fetchSize *= 2
						if fetchSize > c.config.MaxMessageSize {
							fetchSize = c.config.MaxMessageSize
						}
					}
				}
			}

			// Check for shutdown before looping again, since the empty-fetch
			// path never blocks on the events channel.
			select {
			case <-c.stopper:
				close(c.events)
				close(c.done)
				return
			default:
				continue
			}
		} else {
			// Got real data; reset the fetch size for the next request.
			fetchSize = c.config.DefaultFetchSize
		}

		for _, msgBlock := range block.MsgSet.Messages {
			for _, msg := range msgBlock.Messages() {
				select {
				case <-c.stopper:
					close(c.events)
					close(c.done)
					return
				case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, Topic: c.topic, Partition: c.partition}:
					// Only advance the offset once the event is actually
					// delivered, so nothing is skipped on shutdown.
					c.offset++
				}
			}
		}
	}
}
  259. func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
  260. request := &OffsetRequest{}
  261. request.AddBlock(c.topic, c.partition, where, 1)
  262. response, err := c.broker.GetAvailableOffsets(c.client.id, request)
  263. switch err {
  264. case nil:
  265. break
  266. case EncodingError:
  267. return -1, err
  268. default:
  269. if !retry {
  270. return -1, err
  271. }
  272. Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
  273. c.client.disconnectBroker(c.broker)
  274. c.broker, err = c.client.Leader(c.topic, c.partition)
  275. if err != nil {
  276. return -1, err
  277. }
  278. return c.getOffset(where, false)
  279. }
  280. block := response.GetBlock(c.topic, c.partition)
  281. if block == nil {
  282. return -1, IncompleteResponse
  283. }
  284. switch block.Err {
  285. case NoError:
  286. if len(block.Offsets) < 1 {
  287. return -1, IncompleteResponse
  288. }
  289. return block.Offsets[0], nil
  290. case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
  291. if !retry {
  292. return -1, block.Err
  293. }
  294. err = c.client.RefreshTopicMetadata(c.topic)
  295. if err != nil {
  296. return -1, err
  297. }
  298. c.broker, err = c.client.Leader(c.topic, c.partition)
  299. if err != nil {
  300. return -1, err
  301. }
  302. return c.getOffset(where, false)
  303. }
  304. return -1, block.Err
  305. }