consumer.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package sarama
  2. // Consumer processes Kafka messages from a given topic and partition.
  3. // You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
  4. // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
  5. type Consumer struct {
  6. client *Client
  7. topic string
  8. partition int32
  9. group string
  10. offset int64
  11. broker *Broker
  12. stopper, done chan bool
  13. messages chan *MessageBlock
  14. errors chan error
  15. }
  16. // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
  17. // part of the named consumer group.
  18. func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) {
  19. broker, err := client.leader(topic, partition)
  20. if err != nil {
  21. return nil, err
  22. }
  23. c := new(Consumer)
  24. c.client = client
  25. c.topic = topic
  26. c.partition = partition
  27. c.group = group
  28. // We should really be sending an OffsetFetchRequest, but that doesn't seem to
  29. // work in kafka yet. Hopefully will in beta 2...
  30. c.offset = 0
  31. c.broker = broker
  32. c.stopper = make(chan bool)
  33. c.done = make(chan bool)
  34. c.messages = make(chan *MessageBlock)
  35. c.errors = make(chan error)
  36. go c.fetchMessages()
  37. return c, nil
  38. }
  39. // Errors returns the read channel for any errors that might be returned by the broker.
  40. func (c *Consumer) Errors() <-chan error {
  41. return c.errors
  42. }
  43. // Messages returns the read channel for all messages that will be returned by the broker.
  44. func (c *Consumer) Messages() <-chan *MessageBlock {
  45. return c.messages
  46. }
  47. // Close stops the consumer from fetching messages. It is required to call this function before
  48. // a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
  49. // calling Close on the underlying client.
  50. func (c *Consumer) Close() {
  51. close(c.stopper)
  52. <-c.done
  53. }
  54. // helper function for safely sending an error on the errors channel
  55. // if it returns true, the error was sent (or was nil)
  56. // if it returns false, the stopper channel signaled that your goroutine should return!
  57. func (c *Consumer) sendError(err error) bool {
  58. if err == nil {
  59. return true
  60. }
  61. select {
  62. case <-c.stopper:
  63. close(c.messages)
  64. close(c.errors)
  65. close(c.done)
  66. return false
  67. case c.errors <- err:
  68. return true
  69. }
  70. return true
  71. }
  72. func (c *Consumer) fetchMessages() {
  73. var fetchSize int32 = 1024
  74. for {
  75. request := new(FetchRequest)
  76. request.MinBytes = 1
  77. request.MaxWaitTime = 1000
  78. request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
  79. response, err := c.broker.Fetch(c.client.id, request)
  80. switch {
  81. case err == nil:
  82. break
  83. case err == EncodingError:
  84. if c.sendError(err) {
  85. continue
  86. } else {
  87. return
  88. }
  89. default:
  90. c.client.disconnectBroker(c.broker)
  91. for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
  92. if !c.sendError(err) {
  93. return
  94. }
  95. }
  96. }
  97. block := response.GetBlock(c.topic, c.partition)
  98. if block == nil {
  99. if c.sendError(IncompleteResponse) {
  100. continue
  101. } else {
  102. return
  103. }
  104. }
  105. switch block.Err {
  106. case NO_ERROR:
  107. break
  108. case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
  109. err = c.client.refreshTopic(c.topic)
  110. if c.sendError(err) {
  111. for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
  112. if !c.sendError(err) {
  113. return
  114. }
  115. }
  116. continue
  117. } else {
  118. return
  119. }
  120. default:
  121. if c.sendError(block.Err) {
  122. continue
  123. } else {
  124. return
  125. }
  126. }
  127. if len(block.MsgSet.Messages) == 0 {
  128. // We got no messages. If we got a trailing one then we need to ask for more data.
  129. // Otherwise we just poll again and wait for one to be produced...
  130. if block.MsgSet.PartialTrailingMessage {
  131. fetchSize *= 2
  132. }
  133. select {
  134. case <-c.stopper:
  135. close(c.messages)
  136. close(c.errors)
  137. close(c.done)
  138. return
  139. default:
  140. continue
  141. }
  142. }
  143. for _, msgBlock := range block.MsgSet.Messages {
  144. select {
  145. case <-c.stopper:
  146. close(c.messages)
  147. close(c.errors)
  148. close(c.done)
  149. return
  150. case c.messages <- msgBlock:
  151. c.offset++
  152. }
  153. }
  154. }
  155. }