consumer.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package sarama
// OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
type OffsetMethod int

const (
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual OffsetMethod = iota
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
)
// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
//
// The zero value is usable: NewConsumer substitutes sane defaults for any
// field left at zero (see the individual field comments).
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default of 0 is treated as 1024 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default of 0 is treated as 'at least one' to prevent the consumer from spinning when no messages are available.
	MinFetchSize int32
	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. The default of 0 is treated as no limit.
	MaxWaitTime int32
	// The method used to determine at which offset to begin consuming messages.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	// For OffsetMethodManual it is the exact starting offset (must be >= 0);
	// the other methods ignore it.
	OffsetValue int64
	// The number of events to buffer in the Events channel. Setting this can let the
	// consumer continue fetching messages in the background while local code consumes events,
	// greatly improving throughput.
	EventBufferSize int
}
// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
// a message (in which case Err is nil and the other fields are all set).
type ConsumerEvent struct {
	// Key and Value are the raw message payload as fetched from the broker.
	Key, Value []byte
	// Offset is the message's offset within its partition.
	Offset int64
	// Err, when non-nil, indicates this event reports an error rather than a message.
	Err error
}
// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client *Client

	topic     string
	partition int32
	group     string
	config    ConsumerConfig // private copy; defaults already applied by NewConsumer

	// offset is the next offset to request; mutated only by the fetchMessages goroutine.
	offset int64
	broker *Broker // current partition leader; re-resolved on fetch errors

	// stopper is closed by Close() to request shutdown; done is closed by the
	// fetchMessages goroutine once it has exited and closed the events channel.
	stopper, done chan bool
	events        chan *ConsumerEvent
}
  58. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  59. // part of the named consumer group.
  60. func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
  61. if config == nil {
  62. config = new(ConsumerConfig)
  63. }
  64. if config.DefaultFetchSize < 0 {
  65. return nil, ConfigurationError("Invalid DefaultFetchSize")
  66. } else if config.DefaultFetchSize == 0 {
  67. config.DefaultFetchSize = 1024
  68. }
  69. if config.MinFetchSize < 0 {
  70. return nil, ConfigurationError("Invalid MinFetchSize")
  71. } else if config.MinFetchSize == 0 {
  72. config.MinFetchSize = 1
  73. }
  74. if config.MaxMessageSize < 0 {
  75. return nil, ConfigurationError("Invalid MaxMessageSize")
  76. }
  77. if config.MaxWaitTime < 0 {
  78. return nil, ConfigurationError("Invalid MaxWaitTime")
  79. }
  80. if config.EventBufferSize < 0 {
  81. return nil, ConfigurationError("Invalid EventBufferSize")
  82. }
  83. if topic == "" {
  84. return nil, ConfigurationError("Empty topic")
  85. }
  86. broker, err := client.Leader(topic, partition)
  87. if err != nil {
  88. return nil, err
  89. }
  90. c := &Consumer{
  91. client: client,
  92. topic: topic,
  93. partition: partition,
  94. group: group,
  95. config: *config,
  96. broker: broker,
  97. stopper: make(chan bool),
  98. done: make(chan bool),
  99. events: make(chan *ConsumerEvent, config.EventBufferSize),
  100. }
  101. switch config.OffsetMethod {
  102. case OffsetMethodManual:
  103. if config.OffsetValue < 0 {
  104. return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
  105. }
  106. c.offset = config.OffsetValue
  107. case OffsetMethodNewest:
  108. c.offset, err = c.getOffset(LatestOffsets, true)
  109. if err != nil {
  110. return nil, err
  111. }
  112. case OffsetMethodOldest:
  113. c.offset, err = c.getOffset(EarliestOffset, true)
  114. if err != nil {
  115. return nil, err
  116. }
  117. default:
  118. return nil, ConfigurationError("Invalid OffsetMethod")
  119. }
  120. go c.fetchMessages()
  121. return c, nil
  122. }
// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
// The channel is closed by the consumer's background goroutine during shutdown.
func (c *Consumer) Events() <-chan *ConsumerEvent {
	return c.events
}
// Close stops the consumer from fetching messages. It is required to call this function before
// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
// calling Close on the underlying client.
func (c *Consumer) Close() error {
	// Signal the fetchMessages goroutine to stop, then wait for it to
	// acknowledge by closing the done channel.
	close(c.stopper)
	<-c.done
	return nil
}
  135. // helper function for safely sending an error on the errors channel
  136. // if it returns true, the error was sent (or was nil)
  137. // if it returns false, the stopper channel signaled that your goroutine should return!
  138. func (c *Consumer) sendError(err error) bool {
  139. if err == nil {
  140. return true
  141. }
  142. select {
  143. case <-c.stopper:
  144. close(c.events)
  145. close(c.done)
  146. return false
  147. case c.events <- &ConsumerEvent{Err: err}:
  148. return true
  149. }
  150. // For backward compatibility with go1.0
  151. return true
  152. }
// fetchMessages is the consumer's background loop: it repeatedly fetches from
// the current partition leader, delivers messages (or errors) on the events
// channel, and handles leader changes and fetch-size growth for oversized
// messages. It exits only when the stopper channel is closed, at which point
// it closes the events and done channels.
func (c *Consumer) fetchMessages() {
	fetchSize := c.config.DefaultFetchSize
	for {
		request := new(FetchRequest)
		request.MinBytes = c.config.MinFetchSize
		request.MaxWaitTime = c.config.MaxWaitTime
		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)

		response, err := c.broker.Fetch(c.client.id, request)
		switch {
		case err == nil:
			break
		case err == EncodingError:
			// Encoding errors are not transport failures; report and retry
			// without reconnecting.
			if c.sendError(err) {
				continue
			} else {
				return
			}
		default:
			// Any other error: assume the broker connection is bad and keep
			// re-resolving the partition leader until we get one (reporting
			// each failure) or we are told to stop.
			c.client.disconnectBroker(c.broker)
			for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
				if !c.sendError(err) {
					return
				}
			}
			continue
		}

		block := response.GetBlock(c.topic, c.partition)
		if block == nil {
			if c.sendError(IncompleteResponse) {
				continue
			} else {
				return
			}
		}

		switch block.Err {
		case NoError:
			break
		case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
			// Our metadata is stale: refresh it and re-resolve the leader,
			// retrying the resolution until it succeeds or we are stopped.
			err = c.client.RefreshTopicMetadata(c.topic)
			if c.sendError(err) {
				for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
					if !c.sendError(err) {
						return
					}
				}
				continue
			} else {
				return
			}
		default:
			if c.sendError(block.Err) {
				continue
			} else {
				return
			}
		}

		if len(block.MsgSet.Messages) == 0 {
			// We got no messages. If we got a trailing one then we need to ask for more data.
			// Otherwise we just poll again and wait for one to be produced...
			if block.MsgSet.PartialTrailingMessage {
				if c.config.MaxMessageSize == 0 {
					// No cap configured: double the fetch size until the
					// partial message fits.
					// NOTE(review): fetchSize is int32 and repeated doubling
					// with no cap can overflow — confirm whether an upper
					// bound should be enforced here.
					fetchSize *= 2
				} else {
					if fetchSize == c.config.MaxMessageSize {
						// Already at the cap and the message still does not
						// fit: it is too large to ever deliver.
						if c.sendError(MessageTooLarge) {
							continue
						} else {
							return
						}
					} else {
						fetchSize *= 2
						if fetchSize > c.config.MaxMessageSize {
							fetchSize = c.config.MaxMessageSize
						}
					}
				}
			}
			// Poll again, but first give a pending Close() a chance to win.
			select {
			case <-c.stopper:
				close(c.events)
				close(c.done)
				return
			default:
				continue
			}
		} else {
			// We got data at the current size again; reset any growth from a
			// previous oversized message.
			fetchSize = c.config.DefaultFetchSize
		}

		for _, msgBlock := range block.MsgSet.Messages {
			select {
			case <-c.stopper:
				close(c.events)
				close(c.done)
				return
			case c.events <- &ConsumerEvent{Key: msgBlock.Msg.Key, Value: msgBlock.Msg.Value, Offset: msgBlock.Offset}:
				// NOTE(review): this assumes delivered offsets are
				// consecutive; presumably the next fetch should start at
				// msgBlock.Offset+1 — verify against compacted/compressed
				// message sets.
				c.offset++
			}
		}
	}
}
  253. func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
  254. request := &OffsetRequest{}
  255. request.AddBlock(c.topic, c.partition, where, 1)
  256. response, err := c.broker.GetAvailableOffsets(c.client.id, request)
  257. switch err {
  258. case nil:
  259. break
  260. case EncodingError:
  261. return -1, err
  262. default:
  263. if !retry {
  264. return -1, err
  265. }
  266. c.client.disconnectBroker(c.broker)
  267. c.broker, err = c.client.Leader(c.topic, c.partition)
  268. if err != nil {
  269. return -1, err
  270. }
  271. return c.getOffset(where, false)
  272. }
  273. block := response.GetBlock(c.topic, c.partition)
  274. if block == nil {
  275. return -1, IncompleteResponse
  276. }
  277. switch block.Err {
  278. case NoError:
  279. if len(block.Offsets) < 1 {
  280. return -1, IncompleteResponse
  281. }
  282. return block.Offsets[0], nil
  283. case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
  284. if !retry {
  285. return -1, block.Err
  286. }
  287. err = c.client.RefreshTopicMetadata(c.topic)
  288. if err != nil {
  289. return -1, err
  290. }
  291. c.broker, err = c.client.Leader(c.topic, c.partition)
  292. if err != nil {
  293. return -1, err
  294. }
  295. return c.getOffset(where, false)
  296. }
  297. return -1, block.Err
  298. }