consumer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. package sarama
  2. import (
  3. "time"
  4. )
  // OffsetMethod tells the consumer, through ConsumerConfig, how to choose the
  // offset it starts reading from.
  type OffsetMethod int

  // The supported OffsetMethod values. The zero value is OffsetMethodManual.
  const (
  	// OffsetMethodManual starts at ConsumerConfig.OffsetValue, an offset the
  	// user supplies directly.
  	OffsetMethodManual OffsetMethod = 0
  	// OffsetMethodNewest starts at the most recent available offset, as
  	// reported by the broker.
  	OffsetMethodNewest OffsetMethod = 1
  	// OffsetMethodOldest starts at the oldest available offset, as reported by
  	// the broker.
  	OffsetMethodOldest OffsetMethod = 2
  )
  18. // ConsumerConfig is used to pass multiple configuration options to NewConsumer.
  19. type ConsumerConfig struct {
  20. // The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
  21. DefaultFetchSize int32
  22. // The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
  23. // The default is 1, as 0 causes the consumer to spin when no messages are available.
  24. MinFetchSize int32
  25. // The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
  26. // treated as no limit.
  27. MaxMessageSize int32
  28. // The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
  29. // returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
  30. // 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
  31. MaxWaitTime time.Duration
  32. // The method used to determine at which offset to begin consuming messages.
  33. OffsetMethod OffsetMethod
  34. // Interpreted differently according to the value of OffsetMethod.
  35. OffsetValue int64
  36. // The number of events to buffer in the Events channel. Having this non-zero permits the
  37. // consumer to continue fetching messages in the background while client code consumes events,
  38. // greatly improving throughput. The default is 16.
  39. EventBufferSize int
  40. }
  // ConsumerEvent is a single item delivered on the consumer's Events channel.
  // Topic and Partition are always populated. When Err is non-nil the event
  // reports a failure; when Err is nil it carries the message found at Offset,
  // together with that message's Key and Value.
  type ConsumerEvent struct {
  	// Key is the message key; left unset on error events.
  	Key []byte
  	// Value is the message payload; left unset on error events.
  	Value []byte
  	// Topic and Partition identify where the event came from.
  	Topic     string
  	Partition int32
  	// Offset is the message's position in the partition; left unset on error events.
  	Offset int64
  	// Err is non-nil when this event reports an error instead of a message.
  	Err error
  }
  50. // Consumer processes Kafka messages from a given topic and partition.
  51. // You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
  52. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  53. type Consumer struct {
  54. client *Client
  55. topic string
  56. partition int32
  57. group string
  58. config ConsumerConfig
  59. offset int64
  60. broker *Broker
  61. stopper, done chan bool
  62. events chan *ConsumerEvent
  63. }
  64. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  65. // part of the named consumer group.
  66. func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
  67. // Check that we are not dealing with a closed Client before processing
  68. // any other arguments
  69. if client.Closed() {
  70. return nil, ClosedClient
  71. }
  72. if config == nil {
  73. config = NewConsumerConfig()
  74. }
  75. if err := config.Validate(); err != nil {
  76. return nil, err
  77. }
  78. if topic == "" {
  79. return nil, ConfigurationError("Empty topic")
  80. }
  81. broker, err := client.Leader(topic, partition)
  82. if err != nil {
  83. return nil, err
  84. }
  85. c := &Consumer{
  86. client: client,
  87. topic: topic,
  88. partition: partition,
  89. group: group,
  90. config: *config,
  91. broker: broker,
  92. stopper: make(chan bool),
  93. done: make(chan bool),
  94. events: make(chan *ConsumerEvent, config.EventBufferSize),
  95. }
  96. switch config.OffsetMethod {
  97. case OffsetMethodManual:
  98. if config.OffsetValue < 0 {
  99. return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
  100. }
  101. c.offset = config.OffsetValue
  102. case OffsetMethodNewest:
  103. c.offset, err = c.getOffset(LatestOffsets, true)
  104. if err != nil {
  105. return nil, err
  106. }
  107. case OffsetMethodOldest:
  108. c.offset, err = c.getOffset(EarliestOffset, true)
  109. if err != nil {
  110. return nil, err
  111. }
  112. default:
  113. return nil, ConfigurationError("Invalid OffsetMethod")
  114. }
  115. go withRecover(c.fetchMessages)
  116. return c, nil
  117. }
  118. // Events returns the read channel for any events (messages or errors) that might be returned by the broker.
  119. func (c *Consumer) Events() <-chan *ConsumerEvent {
  120. return c.events
  121. }
  122. // Close stops the consumer from fetching messages. It is required to call this function before
  123. // a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
  124. // calling Close on the underlying client.
  125. func (c *Consumer) Close() error {
  126. close(c.stopper)
  127. <-c.done
  128. return nil
  129. }
  130. // helper function for safely sending an error on the errors channel
  131. // if it returns true, the error was sent (or was nil)
  132. // if it returns false, the stopper channel signaled that your goroutine should return!
  133. func (c *Consumer) sendError(err error) bool {
  134. if err == nil {
  135. return true
  136. }
  137. select {
  138. case <-c.stopper:
  139. close(c.events)
  140. close(c.done)
  141. return false
  142. case c.events <- &ConsumerEvent{Err: err, Topic: c.topic, Partition: c.partition}:
  143. return true
  144. }
  145. }
  146. func (c *Consumer) fetchMessages() {
  147. fetchSize := c.config.DefaultFetchSize
  148. for {
  149. request := new(FetchRequest)
  150. request.MinBytes = c.config.MinFetchSize
  151. request.MaxWaitTime = int32(c.config.MaxWaitTime / time.Millisecond)
  152. request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
  153. response, err := c.broker.Fetch(c.client.id, request)
  154. switch {
  155. case err == nil:
  156. break
  157. case err == EncodingError:
  158. if c.sendError(err) {
  159. continue
  160. } else {
  161. return
  162. }
  163. default:
  164. Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
  165. c.client.disconnectBroker(c.broker)
  166. for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
  167. if !c.sendError(err) {
  168. return
  169. }
  170. }
  171. continue
  172. }
  173. block := response.GetBlock(c.topic, c.partition)
  174. if block == nil {
  175. if c.sendError(IncompleteResponse) {
  176. continue
  177. } else {
  178. return
  179. }
  180. }
  181. switch block.Err {
  182. case NoError:
  183. break
  184. case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
  185. err = c.client.RefreshTopicMetadata(c.topic)
  186. if c.sendError(err) {
  187. for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
  188. if !c.sendError(err) {
  189. return
  190. }
  191. }
  192. continue
  193. } else {
  194. return
  195. }
  196. default:
  197. if c.sendError(block.Err) {
  198. continue
  199. } else {
  200. return
  201. }
  202. }
  203. if len(block.MsgSet.Messages) == 0 {
  204. // We got no messages. If we got a trailing one then we need to ask for more data.
  205. // Otherwise we just poll again and wait for one to be produced...
  206. if block.MsgSet.PartialTrailingMessage {
  207. if c.config.MaxMessageSize == 0 {
  208. fetchSize *= 2
  209. } else {
  210. if fetchSize == c.config.MaxMessageSize {
  211. if c.sendError(MessageTooLarge) {
  212. continue
  213. } else {
  214. return
  215. }
  216. } else {
  217. fetchSize *= 2
  218. if fetchSize > c.config.MaxMessageSize {
  219. fetchSize = c.config.MaxMessageSize
  220. }
  221. }
  222. }
  223. }
  224. select {
  225. case <-c.stopper:
  226. close(c.events)
  227. close(c.done)
  228. return
  229. default:
  230. continue
  231. }
  232. } else {
  233. fetchSize = c.config.DefaultFetchSize
  234. }
  235. atLeastOne := false
  236. for _, msgBlock := range block.MsgSet.Messages {
  237. prelude := true
  238. for _, msg := range msgBlock.Messages() {
  239. if prelude && msg.Offset < c.offset {
  240. continue
  241. }
  242. prelude = false
  243. event := &ConsumerEvent{Topic: c.topic, Partition: c.partition}
  244. if msg.Offset == c.offset {
  245. atLeastOne = true
  246. event.Key = msg.Msg.Key
  247. event.Value = msg.Msg.Value
  248. event.Offset = msg.Offset
  249. c.offset++
  250. } else {
  251. event.Err = IncompleteResponse
  252. }
  253. select {
  254. case <-c.stopper:
  255. close(c.events)
  256. close(c.done)
  257. return
  258. case c.events <- event:
  259. }
  260. }
  261. }
  262. if !atLeastOne {
  263. select {
  264. case <-c.stopper:
  265. close(c.events)
  266. close(c.done)
  267. return
  268. case c.events <- &ConsumerEvent{Topic: c.topic, Partition: c.partition, Err: IncompleteResponse}:
  269. }
  270. }
  271. }
  272. }
  273. func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
  274. offset, err := c.client.GetOffset(c.topic, c.partition, where)
  275. switch err {
  276. case nil:
  277. break
  278. case EncodingError:
  279. return -1, err
  280. default:
  281. if !retry {
  282. return -1, err
  283. }
  284. switch err.(type) {
  285. case KError:
  286. switch err {
  287. case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
  288. err = c.client.RefreshTopicMetadata(c.topic)
  289. if err != nil {
  290. return -1, err
  291. }
  292. default:
  293. Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
  294. c.client.disconnectBroker(c.broker)
  295. broker, brokerErr := c.client.Leader(c.topic, c.partition)
  296. if brokerErr != nil {
  297. return -1, brokerErr
  298. }
  299. c.broker = broker
  300. }
  301. return c.getOffset(where, false)
  302. }
  303. return -1, err
  304. }
  305. return offset, nil
  306. }
  307. // NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
  308. func NewConsumerConfig() *ConsumerConfig {
  309. return &ConsumerConfig{
  310. DefaultFetchSize: 32768,
  311. MinFetchSize: 1,
  312. MaxWaitTime: 250 * time.Millisecond,
  313. EventBufferSize: 16,
  314. }
  315. }
  316. // Validate checks a ConsumerConfig instance. It will return a
  317. // ConfigurationError if the specified value doesn't make sense.
  318. func (config *ConsumerConfig) Validate() error {
  319. if config.DefaultFetchSize <= 0 {
  320. return ConfigurationError("Invalid DefaultFetchSize")
  321. }
  322. if config.MinFetchSize <= 0 {
  323. return ConfigurationError("Invalid MinFetchSize")
  324. }
  325. if config.MaxMessageSize < 0 {
  326. return ConfigurationError("Invalid MaxMessageSize")
  327. }
  328. if config.MaxWaitTime < 1*time.Millisecond {
  329. return ConfigurationError("Invalid MaxWaitTime, it needs to be at least 1ms")
  330. } else if config.MaxWaitTime < 100*time.Millisecond {
  331. Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
  332. } else if config.MaxWaitTime%time.Millisecond != 0 {
  333. Logger.Println("ConsumerConfig.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
  334. }
  335. if config.EventBufferSize < 0 {
  336. return ConfigurationError("Invalid EventBufferSize")
  337. }
  338. return nil
  339. }