consumer.go

package sarama

// OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
type OffsetMethod int

const (
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual OffsetMethod = iota
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
)

// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
	MinFetchSize int32
	// The maximum permitted message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyway. The default of 0 causes Kafka to return immediately, which is rarely desirable
	// as it causes the Consumer to spin when no events are available. 100-500ms is a reasonable range for most cases.
	MaxWaitTime int32
	// The method used to determine at which offset to begin consuming messages.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	OffsetValue int64
	// The number of events to buffer in the Events channel. Setting this can let the
	// consumer continue fetching messages in the background while local code consumes events,
	// greatly improving throughput.
	EventBufferSize int
}
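
// A minimal configuration sketch: start from NewConsumerConfig (defined at the
// bottom of this file) and override individual fields. The values below are
// illustrative only.
//
//	config := NewConsumerConfig()
//	config.MaxWaitTime = 500     // wait up to 500ms for MinFetchSize bytes
//	config.EventBufferSize = 16  // let the consumer fetch ahead of local code
//	config.OffsetMethod = OffsetMethodOldest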

// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
// a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
type ConsumerEvent struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Err        error
}

// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks; it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client        *Client
	topic         string
	partition     int32
	group         string
	config        ConsumerConfig
	offset        int64
	broker        *Broker
	stopper, done chan bool
	events        chan *ConsumerEvent
}

// NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
// part of the named consumer group.
func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
	if config == nil {
		config = NewConsumerConfig()
	}

	if err := config.Validate(); err != nil {
		return nil, err
	}

	if topic == "" {
		return nil, ConfigurationError("Empty topic")
	}

	broker, err := client.Leader(topic, partition)
	if err != nil {
		return nil, err
	}

	c := &Consumer{
		client:    client,
		topic:     topic,
		partition: partition,
		group:     group,
		config:    *config,
		broker:    broker,
		stopper:   make(chan bool),
		done:      make(chan bool),
		events:    make(chan *ConsumerEvent, config.EventBufferSize),
	}

	switch config.OffsetMethod {
	case OffsetMethodManual:
		if config.OffsetValue < 0 {
			return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
		}
		c.offset = config.OffsetValue
	case OffsetMethodNewest:
		c.offset, err = c.getOffset(LatestOffsets, true)
		if err != nil {
			return nil, err
		}
	case OffsetMethodOldest:
		c.offset, err = c.getOffset(EarliestOffset, true)
		if err != nil {
			return nil, err
		}
	default:
		return nil, ConfigurationError("Invalid OffsetMethod")
	}

	go withRecover(c.fetchMessages)

	return c, nil
}
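
// A minimal usage sketch, assuming a *Client has already been constructed
// elsewhere (for example via this package's NewClient); the topic, partition
// and group below are illustrative only:
//
//	consumer, err := NewConsumer(client, "my-topic", 0, "my-group", nil)
//	if err != nil {
//		panic(err) // handle the error appropriately
//	}
//
//	go func() {
//		for event := range consumer.Events() {
//			if event.Err != nil {
//				Logger.Println(event.Err)
//				continue
//			}
//			Logger.Println(string(event.Value))
//		}
//	}()
//
//	// ... later, before closing the underlying client:
//	consumer.Close()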

// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
func (c *Consumer) Events() <-chan *ConsumerEvent {
	return c.events
}

// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (c *Consumer) Close() error {
	close(c.stopper)
	<-c.done
	return nil
}

// helper function for safely sending an error on the events channel
// if it returns true, the error was sent (or was nil)
// if it returns false, the stopper channel signaled that your goroutine should return!
func (c *Consumer) sendError(err error) bool {
	if err == nil {
		return true
	}

	select {
	case <-c.stopper:
		close(c.events)
		close(c.done)
		return false
	case c.events <- &ConsumerEvent{Err: err, Topic: c.topic, Partition: c.partition}:
		return true
	}
}

// fetchMessages is the run loop started by NewConsumer. It fetches batches of messages from
// the broker and delivers them (or any errors encountered) on the events channel until the
// stopper channel is closed by Close.
func (c *Consumer) fetchMessages() {
	fetchSize := c.config.DefaultFetchSize

	for {
		request := new(FetchRequest)
		request.MinBytes = c.config.MinFetchSize
		request.MaxWaitTime = c.config.MaxWaitTime
		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)

		response, err := c.broker.Fetch(c.client.id, request)
		switch {
		case err == nil:
			break
		case err == EncodingError:
			if c.sendError(err) {
				continue
			} else {
				return
			}
		default:
			Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
			c.client.disconnectBroker(c.broker)
			for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
				if !c.sendError(err) {
					return
				}
			}
			continue
		}

		block := response.GetBlock(c.topic, c.partition)
		if block == nil {
			if c.sendError(IncompleteResponse) {
				continue
			} else {
				return
			}
		}

		switch block.Err {
		case NoError:
			break
		case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
			err = c.client.RefreshTopicMetadata(c.topic)
			if c.sendError(err) {
				for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
					if !c.sendError(err) {
						return
					}
				}
				continue
			} else {
				return
			}
		default:
			if c.sendError(block.Err) {
				continue
			} else {
				return
			}
		}

		if len(block.MsgSet.Messages) == 0 {
			// We got no messages. If we got a partial trailing message then we need to ask for more data.
			// Otherwise we just poll again and wait for one to be produced...
			if block.MsgSet.PartialTrailingMessage {
				if c.config.MaxMessageSize == 0 {
					fetchSize *= 2
				} else {
					if fetchSize == c.config.MaxMessageSize {
						if c.sendError(MessageTooLarge) {
							continue
						} else {
							return
						}
					} else {
						fetchSize *= 2
						if fetchSize > c.config.MaxMessageSize {
							fetchSize = c.config.MaxMessageSize
						}
					}
				}
			}
			select {
			case <-c.stopper:
				close(c.events)
				close(c.done)
				return
			default:
				continue
			}
		} else {
			fetchSize = c.config.DefaultFetchSize
		}

		for _, msgBlock := range block.MsgSet.Messages {
			for _, msg := range msgBlock.Messages() {
				select {
				case <-c.stopper:
					close(c.events)
					close(c.done)
					return
				case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, Topic: c.topic, Partition: c.partition}:
					c.offset++
				}
			}
		}
	}
}

// getOffset queries the broker for the offset indicated by 'where' (earliest or latest available),
// optionally retrying once against a freshly looked-up leader if the first attempt fails.
func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
	request := &OffsetRequest{}
	request.AddBlock(c.topic, c.partition, where, 1)

	response, err := c.broker.GetAvailableOffsets(c.client.id, request)
	switch err {
	case nil:
		break
	case EncodingError:
		return -1, err
	default:
		if !retry {
			return -1, err
		}
		Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
		c.client.disconnectBroker(c.broker)
		c.broker, err = c.client.Leader(c.topic, c.partition)
		if err != nil {
			return -1, err
		}
		return c.getOffset(where, false)
	}

	block := response.GetBlock(c.topic, c.partition)
	if block == nil {
		return -1, IncompleteResponse
	}

	switch block.Err {
	case NoError:
		if len(block.Offsets) < 1 {
			return -1, IncompleteResponse
		}
		return block.Offsets[0], nil
	case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
		if !retry {
			return -1, block.Err
		}
		err = c.client.RefreshTopicMetadata(c.topic)
		if err != nil {
			return -1, err
		}
		c.broker, err = c.client.Leader(c.topic, c.partition)
		if err != nil {
			return -1, err
		}
		return c.getOffset(where, false)
	}

	return -1, block.Err
}

// NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
func NewConsumerConfig() *ConsumerConfig {
	return &ConsumerConfig{
		DefaultFetchSize: 1024,
		MinFetchSize:     1,
		MaxWaitTime:      250,
	}
}

// Validate checks a ConsumerConfig instance. It will return a
// ConfigurationError if the specified value doesn't make sense.
func (config *ConsumerConfig) Validate() error {
	if config.DefaultFetchSize <= 0 {
		return ConfigurationError("Invalid DefaultFetchSize")
	}

	if config.MinFetchSize <= 0 {
		return ConfigurationError("Invalid MinFetchSize")
	}

	if config.MaxMessageSize < 0 {
		return ConfigurationError("Invalid MaxMessageSize")
	}

	if config.MaxWaitTime <= 0 {
		return ConfigurationError("Invalid MaxWaitTime")
	} else if config.MaxWaitTime < 100 {
		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
	}

	if config.EventBufferSize < 0 {
		return ConfigurationError("Invalid EventBufferSize")
	}

	return nil
}
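
// A small validation sketch: a hand-built config with a nonsensical value is
// rejected by Validate before it ever reaches NewConsumer. The field value
// here is illustrative only.
//
//	config := NewConsumerConfig()
//	config.EventBufferSize = -1
//	if err := config.Validate(); err != nil {
//		Logger.Println(err) // reports the invalid EventBufferSize
//	}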