producer.go

package sarama

import (
    "fmt"
    "sync"
    "time"
)

// ProducerConfig is used to pass multiple configuration options to NewProducer.
//
// If MaxBufferTime=MaxBufferedBytes=0, messages will be delivered immediately and
// constantly, but if multiple messages are received while a roundtrip to kafka
// is in progress, they will all be combined into the next request. In this
// mode, errors are not returned from SendMessage, but over the Errors()
// channel.
//
// With MaxBufferTime and/or MaxBufferedBytes set to values > 0, sarama will
// buffer messages before sending, to reduce traffic.
type ProducerConfig struct {
    Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
    RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
    Timeout          int32            // The maximum time in ms the broker will wait for receipt of the number of RequiredAcks.
    Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
    MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
    MaxBufferTime    uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
}
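
// The sketch below is not part of the original file; it is an illustrative
// example of how the buffering knobs above fit together. The helper name and
// the specific values are assumptions, not recommended defaults: flush once
// roughly 64 KB is buffered for a broker, or after 500 ms, whichever comes
// first. RequiredAcks and Compression are left at their zero values, which the
// field comments above describe as "no acknowledgement" and "no compression".
func newBufferedProducer(client *Client) (*Producer, error) {
    config := &ProducerConfig{
        Partitioner:      NewRandomPartitioner(), // explicit, though NewProducer defaults to this when nil
        MaxBufferedBytes: 64 * 1024,
        MaxBufferTime:    500,
    }
    return NewProducer(client, config)
}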

// Producer publishes Kafka messages. It routes messages to the correct broker
// for the provided topic-partition, refreshing metadata as appropriate, and
// parses responses for errors. You must call Close() on a producer to avoid
// leaks: it may not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying client, which
// is still necessary).
//
// The default values for MaxBufferedBytes and MaxBufferTime cause sarama to
// deliver messages immediately, but to buffer subsequent messages while a
// previous request is in-flight. This is often the correct behaviour.
//
// If synchronous operation is desired, you can use SendMessage. This will cause
// sarama to block until the broker has returned a value. Normally, you will
// want to use QueueMessage instead, and read the error back from the Errors()
// channel. Note that when using QueueMessage, you *must* read the values from
// the Errors() channel, or sarama will block indefinitely after a few requests.
type Producer struct {
    client          *Client
    config          ProducerConfig
    brokerProducers map[*Broker]*brokerProducer
    m               sync.RWMutex
    errors          chan error
    deliveryLocks   map[topicPartition]chan bool
    dm              sync.RWMutex
}

type brokerProducer struct {
    mapM          sync.Mutex
    messages      map[topicPartition][]*produceMessage
    bufferedBytes uint32
    flushNow      chan bool
    broker        *Broker
    stopper       chan bool
    done          chan bool
    hasMessages   chan bool
}

type topicPartition struct {
    topic     string
    partition int32
}

// NewProducer creates a new Producer using the given client.
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
    if config == nil {
        config = new(ProducerConfig)
    }

    if config.RequiredAcks < -1 {
        return nil, ConfigurationError("Invalid RequiredAcks")
    }

    if config.Timeout < 0 {
        return nil, ConfigurationError("Invalid Timeout")
    }

    if config.Partitioner == nil {
        config.Partitioner = NewRandomPartitioner()
    }

    if config.MaxBufferedBytes == 0 {
        // A floor of one byte means the default behaviour is to flush as soon
        // as any message is buffered.
        config.MaxBufferedBytes = 1
    }

    return &Producer{
        client:          client,
        config:          *config,
        errors:          make(chan error, 16),
        deliveryLocks:   make(map[topicPartition]chan bool),
        brokerProducers: make(map[*Broker]*brokerProducer),
    }, nil
}

// Errors, when operating in asynchronous mode, provides access to errors
// generated while parsing ProduceResponses from kafka. It should never be
// called in synchronous mode.
func (p *Producer) Errors() chan error {
    return p.errors
}

// Close shuts down the producer and flushes any messages it may have buffered.
// You must call this function before a producer object passes out of scope, as
// it may otherwise leak memory. You must call this before calling Close on the
// underlying client.
func (p *Producer) Close() error {
    for _, bp := range p.brokerProducers {
        bp.Close()
    }
    return nil
}

// QueueMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// QueueMessage uses buffering semantics to reduce the number of requests to the
// broker. The buffer logic is tunable with config.MaxBufferedBytes and
// config.MaxBufferTime.
//
// QueueMessage will return an error if it's unable to construct the message
// (unlikely), but network and response errors must be read from Errors(), since
// QueueMessage uses asynchronous delivery. Note that you MUST read back from
// Errors(), otherwise the producer will stall after some number of errors.
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer. Either one, used alone, preserves ordering.
func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
    return p.genericSendMessage(topic, key, value, false)
}
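
// Illustrative sketch, not part of the original file: the asynchronous usage
// pattern the comment above calls for. Errors() is drained in a background
// goroutine so the producer never stalls waiting for its error channel to be
// read. The function name, topic, and payload are assumptions for the example.
func queueManyExample(p *Producer) error {
    // Drain delivery errors; with QueueMessage this is mandatory.
    go func() {
        for err := range p.Errors() {
            if err != nil {
                Logger.Printf("async delivery error: %s\n", err)
            }
        }
    }()

    for i := 0; i < 1000; i++ {
        // A nil key lets the Partitioner pick partitions without key affinity.
        if err := p.QueueMessage("example-topic", nil, StringEncoder("hello")); err != nil {
            return err // construction error; network/response errors arrive on Errors()
        }
    }
    return nil
}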

// SendMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// Unlike QueueMessage, SendMessage operates synchronously, and will block until
// the response is received from the broker, returning any error generated in
// the process. Reading from Errors() may interfere with the operation of
// SendMessage().
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer.
func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
    return p.genericSendMessage(topic, key, value, true)
}
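
// Illustrative sketch, not part of the original file: synchronous use of
// SendMessage together with the Close ordering that the Producer and Close
// comments require (close the producer before the client). The helper name,
// topic, and key/value strings are assumptions for the example.
func sendOneExample(client *Client) error {
    producer, err := NewProducer(client, nil)
    if err != nil {
        return err
    }
    defer producer.Close() // must run before the caller closes the client

    // Blocks until the broker responds (or an error occurs).
    return producer.SendMessage("example-topic", StringEncoder("key"), StringEncoder("value"))
}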

func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
    var keyBytes, valBytes []byte

    if key != nil {
        if keyBytes, err = key.Encode(); err != nil {
            return err
        }
    }
    if value != nil {
        if valBytes, err = value.Encode(); err != nil {
            return err
        }
    }

    partition, err := p.choosePartition(topic, key)
    if err != nil {
        return err
    }

    // produceMessage is defined in produce_message.go.
    msg := &produceMessage{
        tp:       topicPartition{topic, partition},
        key:      keyBytes,
        value:    valBytes,
        failures: 0,
        sync:     synchronous,
    }

    // enqueue is defined in produce_message.go.
    return msg.enqueue(p)
}

func (p *Producer) addMessage(msg *produceMessage) error {
    bp, err := p.brokerProducerFor(msg.tp)
    if err != nil {
        return err
    }
    bp.addMessage(msg, p.config.MaxBufferedBytes)
    return nil
}

func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
    broker, err := p.client.Leader(tp.topic, tp.partition)
    if err != nil {
        return nil, err
    }
    p.m.RLock()
    bp, ok := p.brokerProducers[broker]
    p.m.RUnlock()
    if !ok {
        p.m.Lock()
        // Re-check under the write lock in case another goroutine created the
        // brokerProducer while we were upgrading from the read lock.
        bp, ok = p.brokerProducers[broker]
        if !ok {
            bp = p.newBrokerProducer(broker)
            p.brokerProducers[broker] = bp
        }
        p.m.Unlock()
    }
    return bp, nil
}

func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
    bp := &brokerProducer{
        messages:    make(map[topicPartition][]*produceMessage),
        flushNow:    make(chan bool, 1),
        broker:      broker,
        stopper:     make(chan bool),
        done:        make(chan bool),
        hasMessages: make(chan bool, 1),
    }

    maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        timer := time.NewTimer(maxBufferTime)
        wg.Done()
        for {
            select {
            case <-bp.flushNow:
                bp.flush(p)
            case <-timer.C:
                bp.flushIfAnyMessages(p)
            case <-bp.stopper:
                delete(p.brokerProducers, bp.broker)
                bp.flushIfAnyMessages(p)
                p.client.disconnectBroker(bp.broker)
                close(bp.flushNow)
                close(bp.hasMessages)
                close(bp.done)
                return
            }
            timer.Reset(maxBufferTime)
        }
    }()
    wg.Wait() // don't return until the goroutine has started

    return bp
}

func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
    bp.mapM.Lock()
    if msg.failures > 0 {
        // Prepend: deliver first, before any more recently-added messages.
        bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
    } else {
        // Append.
        bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
    }
    bp.bufferedBytes += msg.byteSize()
    // Non-blocking send: set the "has messages" flag if it isn't already set.
    select {
    case bp.hasMessages <- true:
    default:
    }
    bp.mapM.Unlock()

    bp.flushIfOverCapacity(maxBufferBytes)
}

func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
    if bp.bufferedBytes > maxBufferBytes {
        // Non-blocking send: a flush is already pending if the channel is full.
        select {
        case bp.flushNow <- true:
        default:
        }
    }
}

func (bp *brokerProducer) flushIfAnyMessages(p *Producer) {
    select {
    case <-bp.hasMessages:
        // There are buffered messages. Put the flag back (non-blocking) so
        // other checks still see them, then flush.
        select {
        case bp.hasMessages <- true:
        default:
        }
        bp.flush(p)
    default:
    }
}

func (bp *brokerProducer) flush(p *Producer) {
    var prb produceRequestBuilder

    // Only deliver messages for topic-partitions that are not currently being delivered.
    bp.mapM.Lock()
    for tp, messages := range bp.messages {
        if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
            defer p.releaseDeliveryLock(tp)
            prb = append(prb, messages...)
            delete(bp.messages, tp)
        }
    }
    bp.mapM.Unlock()

    if len(prb) > 0 {
        bp.mapM.Lock()
        bp.bufferedBytes -= prb.byteSize()
        bp.mapM.Unlock()

        bp.flushRequest(p, prb, func(err error) {
            p.errors <- err
        })
    }
}

func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) {
    // toRequest is defined in produce_message.go.
    req := prb.toRequest(&p.config)
    response, err := bp.broker.Produce(p.client.id, req)

    switch err {
    case nil:
        break
    case EncodingError:
        // No sense in retrying; it'll just fail again. But what about all the other
        // messages that weren't invalid? Really, this is a "shit's broke real good"
        // scenario, so logging it and moving on is probably acceptable.
        Logger.Printf("[DATA LOSS] EncodingError! Dropped %d messages.\n", len(prb))
        errorCb(err)
        return
    default:
        bp.Close()

        overlimit := 0
        prb.reverseEach(func(msg *produceMessage) {
            if err := msg.reenqueue(p); err != nil {
                overlimit++
            }
        })
        if overlimit > 0 {
            Logger.Printf("[DATA LOSS] Dropped %d messages that exceeded the retry limit.\n", overlimit)
            errorCb(fmt.Errorf("Dropped %d messages that exceeded the retry limit", overlimit))
        }
        return
    }

    // When does this ever actually happen, and why don't we explode when it does?
    // This seems bad.
    if response == nil {
        errorCb(nil)
        return
    }

    for topic, d := range response.Blocks {
        for partition, block := range d {
            if block == nil {
                // IncompleteResponse. Here we just drop all the messages; we don't know whether
                // they were successfully sent or not. Non-ideal, but how often does it happen?
                Logger.Printf("[DATA LOSS] IncompleteResponse: up to %d messages for %s:%d are in an unknown state\n",
                    len(prb), topic, partition)
                continue // skip the switch below rather than dereferencing a nil block
            }

            switch block.Err {
            case NoError:
                // All the messages for this topic-partition were delivered successfully!
                // Unlock delivery for this topic-partition and discard the produceMessage objects.
                errorCb(nil)
            case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
                p.client.RefreshTopicMetadata(topic)

                overlimit := 0
                prb.reverseEach(func(msg *produceMessage) {
                    if msg.hasTopicPartition(topic, partition) {
                        if err := msg.reenqueue(p); err != nil {
                            overlimit++
                        }
                    }
                })
                if overlimit > 0 {
                    Logger.Printf("[DATA LOSS] Dropped %d messages that exceeded the retry limit.\n", overlimit)
                }
            default:
                Logger.Printf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.\n",
                    len(prb), topic, partition)
            }
        }
    }
}

func (bp *brokerProducer) Close() error {
    select {
    case <-bp.stopper:
        return fmt.Errorf("already closed or closing")
    default:
        close(bp.stopper)
        <-bp.done
    }
    return nil
}

// tryAcquireDeliveryLock reports whether delivery for the given topic-partition
// could be locked. The lock is a one-slot buffered channel, so acquisition
// never blocks.
func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
    p.dm.RLock()
    ch, ok := p.deliveryLocks[tp]
    p.dm.RUnlock()
    if !ok {
        p.dm.Lock()
        ch, ok = p.deliveryLocks[tp]
        if !ok {
            ch = make(chan bool, 1)
            p.deliveryLocks[tp] = ch
        }
        p.dm.Unlock()
    }

    select {
    case ch <- true:
        return true
    default:
        return false
    }
}

func (p *Producer) releaseDeliveryLock(tp topicPartition) {
    p.dm.RLock()
    ch := p.deliveryLocks[tp]
    p.dm.RUnlock()
    select {
    case <-ch:
    default:
        panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
    }
}
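
// Illustrative sketch, not part of the original file: the one-slot buffered
// channel try-lock idiom used by the two functions above, shown in isolation.
// All names here are made up for the example.
func tryLockIdiomExample() {
    lock := make(chan bool, 1)

    tryLock := func() bool {
        select {
        case lock <- true:
            return true // slot was empty; we now hold the lock
        default:
            return false // slot already full; someone else holds the lock
        }
    }
    unlock := func() { <-lock }

    if tryLock() {
        // ... critical section ...
        unlock()
    }
}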

func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
    partitions, err := p.client.Partitions(topic)
    if err != nil {
        return -1, err
    }

    numPartitions := int32(len(partitions))

    choice := p.config.Partitioner.Partition(key, numPartitions)
    if choice < 0 || choice >= numPartitions {
        return -1, InvalidPartition
    }

    return partitions[choice], nil
}