consumer.go

package kafka

import k "sarama/protocol"

// Consumer processes Kafka messages from a given topic and partition. You MUST call
// Close() on a consumer to avoid leaks; it will not be garbage-collected automatically
// when it passes out of scope (this is in addition to calling Close on the underlying
// client, which is still necessary).
type Consumer struct {
	client        *Client
	topic         string
	partition     int32
	group         string
	offset        int64
	broker        *k.Broker
	stopper, done chan bool
	messages      chan *Message
	errors        chan error
}

// NewConsumer creates a new consumer attached to the given client. It will read messages
// from the given topic and partition, as part of the named consumer group.
func NewConsumer(client *Client, topic string, partition int32, group string) (*Consumer, error) {
	broker, err := client.leader(topic, partition)
	if err != nil {
		return nil, err
	}

	c := new(Consumer)
	c.client = client
	c.topic = topic
	c.partition = partition
	c.group = group

	// We should really be sending an OffsetFetchRequest, but that doesn't seem to
	// work in kafka yet. Hopefully will in beta 2...
	c.offset = 0
	c.broker = broker

	c.stopper = make(chan bool)
	c.done = make(chan bool)
	c.messages = make(chan *Message)
	c.errors = make(chan error)

	go c.fetchMessages()

	return c, nil
}
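
// A minimal usage sketch. It assumes a *Client built elsewhere (no client constructor
// appears in this file) and, for simplicity, ignores the Errors() channel; see the note
// below:
//
//	consumer, err := NewConsumer(client, "my_topic", 0, "my_group")
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()
//
//	for msg := range consumer.Messages() {
//		fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
//	}
//
// Because every send in fetchMessages selects on the stopper channel, an unread error
// will stall fetching until Close is called; real callers should drain Errors() as well,
// typically in a separate goroutine.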

// Errors returns the read channel for any errors that might be returned by the broker.
func (c *Consumer) Errors() <-chan error {
	return c.errors
}

// Messages returns the read channel for all messages that will be returned by the broker.
func (c *Consumer) Messages() <-chan *Message {
	return c.messages
}

// Close stops the consumer from fetching messages. It is required to call this function
// before a consumer object passes out of scope, as it will otherwise leak memory. You
// must call this before calling Close on the underlying client.
func (c *Consumer) Close() {
	close(c.stopper)
	<-c.done
}
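
// Shutdown order matters: stop the consumer before tearing down the client it was built
// on. A sketch (assuming the underlying client exposes a Close method, as the comments
// above imply; it is not defined in this file):
//
//	consumer.Close() // stops the fetch goroutine and waits for it to exit
//	client.Close()   // only safe once no consumer is using the connection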

// fetchMessages runs in its own goroutine for the lifetime of the consumer, looping on
// fetch requests to the broker and delivering the results. Every send on c.messages or
// c.errors is wrapped in a select on c.stopper, so the goroutine always shuts down
// promptly when Close is called, even if nobody is reading the other end.
func (c *Consumer) fetchMessages() {
	var fetchSize int32 = 1024

	for {
		request := new(k.FetchRequest)
		request.MinBytes = 1
		request.MaxWaitTime = 1000 // milliseconds
		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)

		response, err := c.broker.Fetch(c.client.id, request)
		switch err.(type) {
		case k.EncodingError:
			// The request couldn't be encoded; report it and retry.
			select {
			case <-c.stopper:
				close(c.messages)
				close(c.errors)
				close(c.done)
				return
			case c.errors <- err:
				continue
			}
		case nil:
			// Success; process the response below.
		default:
			// Transport or broker failure: drop the connection, re-resolve the
			// partition leader, and retry.
			c.client.disconnectBroker(c.broker)
			c.broker, err = c.client.leader(c.topic, c.partition)
			if err != nil {
				select {
				case <-c.stopper:
					close(c.messages)
					close(c.errors)
					close(c.done)
					return
				case c.errors <- err:
					continue
				}
			}
			continue
		}

		block := response.GetBlock(c.topic, c.partition)
		if block == nil {
			select {
			case <-c.stopper:
				close(c.messages)
				close(c.errors)
				close(c.done)
				return
			case c.errors <- IncompleteResponse:
				continue
			}
		}

		if block.Err != k.NO_ERROR {
			select {
			case <-c.stopper:
				close(c.messages)
				close(c.errors)
				close(c.done)
				return
			case c.errors <- block.Err:
				continue
			}
		}

		if len(block.MsgSet.Messages) == 0 {
			// We got no messages. If we got a trailing one then we need to ask for
			// more data. Otherwise we just poll again and wait for one to be
			// produced... (Note that fetchSize is never reset, so it grows until a
			// complete message fits.)
			if block.MsgSet.PartialTrailingMessage {
				fetchSize *= 2
			}
			select {
			case <-c.stopper:
				close(c.messages)
				close(c.errors)
				close(c.done)
				return
			default:
				continue
			}
		}

		for _, msgBlock := range block.MsgSet.Messages {
			// Smoosh the kafka return data into a more useful single struct.
			msg := new(Message)
			msg.Offset = msgBlock.Offset
			msg.Key = msgBlock.Msg.Key
			msg.Value = msgBlock.Msg.Value

			// ...and send it, bumping our offset only once the send succeeds.
			select {
			case <-c.stopper:
				close(c.messages)
				close(c.errors)
				close(c.done)
				return
			case c.messages <- msg:
				c.offset++
			}
		}
	}
}