producer.go

package sarama

import (
	"fmt"
	"sync"
	"time"
)

// ProducerConfig is used to pass multiple configuration options to NewProducer.
//
// With MaxBufferTime and MaxBufferedBytes at their smallest legal values (both
// must be greater than zero), messages are delivered as soon as possible, but
// any messages that arrive while a roundtrip to kafka is in progress are
// combined into the next request. In this mode, errors are not returned from
// SendMessage, but over the Errors() channel.
//
// With larger values of MaxBufferTime and/or MaxBufferedBytes, sarama will
// buffer messages before sending, to reduce traffic.
type ProducerConfig struct {
	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
	Timeout          int32            // The maximum time in ms the broker will wait for receipt of the number of RequiredAcks.
	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
	MaxBufferTime    uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
}

// Producer publishes Kafka messages. It routes messages to the correct broker
// for the provided topic-partition, refreshing metadata as appropriate, and
// parses responses for errors. You must call Close() on a producer to avoid
// leaks: it may not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying client, which
// is still necessary).
//
// Small values for MaxBufferedBytes and MaxBufferTime cause sarama to deliver
// messages almost immediately, buffering only those messages that arrive while
// a previous request is in-flight. This is often the correct behaviour.
//
// If synchronous operation is desired, you can use SendMessage. This will cause
// sarama to block until the broker has returned a value. Normally, you will
// want to use QueueMessage instead, and read the error back from the Errors()
// channel. Note that when using QueueMessage, you *must* read the values from
// the Errors() channel, or sarama will block indefinitely after a few requests.
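//
// A minimal asynchronous usage sketch (hypothetical topic and value; assumes
// an existing *Client named client, and abbreviates error handling):
//
//	producer, err := NewProducer(client, &ProducerConfig{
//		MaxBufferedBytes: 1,
//		MaxBufferTime:    1,
//	})
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close()
//	go func() {
//		// must be drained when using QueueMessage
//		for err := range producer.Errors() {
//			if err != nil {
//				Logger.Println(err)
//			}
//		}
//	}()
//	err = producer.QueueMessage("my-topic", nil, StringEncoder("hello world"))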
type Producer struct {
	client          *Client
	config          ProducerConfig
	brokerProducers map[*Broker]*brokerProducer
	m               sync.RWMutex
	errors          chan error
	deliveryLocks   map[topicPartition]chan bool
	dm              sync.RWMutex
}

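// brokerProducer buffers messages destined for a single broker, flushing them
// when MaxBufferedBytes or MaxBufferTime is exceeded, or on shutdown.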
type brokerProducer struct {
	mapM          sync.Mutex
	messages      map[topicPartition][]*produceMessage
	bufferedBytes uint32
	flushNow      chan bool
	broker        *Broker
	stopper       chan bool
	done          chan bool
	hasMessages   chan bool
}

type topicPartition struct {
	topic     string
	partition int32
}

// NewProducer creates a new Producer using the given client.
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
	if config == nil {
		config = new(ProducerConfig)
	}
	if config.RequiredAcks < -1 {
		return nil, ConfigurationError("Invalid RequiredAcks")
	}
	if config.Timeout < 0 {
		return nil, ConfigurationError("Invalid Timeout")
	}
	if config.Partitioner == nil {
		config.Partitioner = NewRandomPartitioner()
	}
	if config.MaxBufferedBytes == 0 {
		return nil, ConfigurationError("Invalid MaxBufferedBytes")
	}
	if config.MaxBufferTime == 0 {
		return nil, ConfigurationError("Invalid MaxBufferTime")
	}

	return &Producer{
		client:          client,
		config:          *config,
		errors:          make(chan error, 16),
		deliveryLocks:   make(map[topicPartition]chan bool),
		brokerProducers: make(map[*Broker]*brokerProducer),
	}, nil
}

// Errors provides access, when operating in asynchronous mode, to errors
// generated while parsing ProduceResponses from kafka. It should never be
// called in synchronous mode.
func (p *Producer) Errors() chan error {
	return p.errors
}

// Close shuts down the producer and flushes any messages it may have buffered.
// You must call this function before a producer object passes out of scope, as
// it may otherwise leak memory. You must call this before calling Close on the
// underlying client.
func (p *Producer) Close() error {
	for _, bp := range p.brokerProducers {
		bp.Close()
	}
	return nil
}

// QueueMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// QueueMessage uses buffering semantics to reduce the number of requests to the
// broker. The buffer logic is tunable with config.MaxBufferedBytes and
// config.MaxBufferTime.
//
// QueueMessage will return an error if it's unable to construct the message
// (unlikely), but network and response errors must be read from Errors(), since
// QueueMessage uses asynchronous delivery. Note that you MUST read back from
// Errors(), otherwise the producer will stall after some number of errors.
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer. Either one, used alone, preserves ordering.
func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
	return p.genericSendMessage(topic, key, value, false)
}

// SendMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// Unlike QueueMessage, SendMessage operates synchronously, and will block until
// the response is received from the broker, returning any error generated in
// the process. Reading from Errors() may interfere with the operation of
// SendMessage().
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer.
func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
	return p.genericSendMessage(topic, key, value, true)
}

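// genericSendMessage is the shared implementation behind QueueMessage and
// SendMessage: it encodes the key and value, chooses a partition, and enqueues
// the resulting produceMessage either asynchronously or synchronously.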
func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
	var keyBytes, valBytes []byte

	if key != nil {
		if keyBytes, err = key.Encode(); err != nil {
			return err
		}
	}
	if value != nil {
		if valBytes, err = value.Encode(); err != nil {
			return err
		}
	}

	partition, err := p.choosePartition(topic, key)
	if err != nil {
		return err
	}

	// produceMessage and its enqueue method are defined in produce_message.go.
	msg := &produceMessage{
		tp:    topicPartition{topic, partition},
		key:   keyBytes,
		value: valBytes,
		sync:  synchronous,
	}

	return msg.enqueue(p)
}

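// addMessage hands a produceMessage to the brokerProducer responsible for the
// leader of its topic-partition, creating that brokerProducer if necessary.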
func (p *Producer) addMessage(msg *produceMessage) error {
	bp, err := p.brokerProducerFor(msg.tp)
	if err != nil {
		return err
	}
	bp.addMessage(msg, p.config.MaxBufferedBytes)
	return nil
}

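// brokerProducerFor returns the brokerProducer for the leader of the given
// topic-partition, lazily creating one (under the producer's lock) if it does
// not exist yet.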
func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
	broker, err := p.client.Leader(tp.topic, tp.partition)
	if err != nil {
		return nil, err
	}
	p.m.RLock()
	bp, ok := p.brokerProducers[broker]
	p.m.RUnlock()
	if !ok {
		p.m.Lock()
		bp, ok = p.brokerProducers[broker]
		if !ok {
			bp = p.newBrokerProducer(broker)
			p.brokerProducers[broker] = bp
		}
		p.m.Unlock()
	}
	return bp, nil
}

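// newBrokerProducer constructs a brokerProducer for the given broker and
// starts its background goroutine, which flushes buffered messages when
// explicitly requested, when MaxBufferTime elapses, or on shutdown.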
func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
	bp := &brokerProducer{
		messages:    make(map[topicPartition][]*produceMessage),
		flushNow:    make(chan bool, 1),
		broker:      broker,
		stopper:     make(chan bool),
		done:        make(chan bool),
		hasMessages: make(chan bool, 1),
	}

	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		timer := time.NewTimer(maxBufferTime)
		var shutdownRequired bool
		wg.Done()
		for {
			select {
			case <-bp.flushNow:
				if shutdownRequired = bp.flush(p); shutdownRequired {
					goto shutdown
				}
			case <-timer.C:
				if shutdownRequired = bp.flushIfAnyMessages(p); shutdownRequired {
					goto shutdown
				}
			case <-bp.stopper:
				goto shutdown
			}
			timer.Reset(maxBufferTime)
		}
	shutdown:
		delete(p.brokerProducers, bp.broker)
		bp.flushIfAnyMessages(p)
		p.client.disconnectBroker(bp.broker)
		close(bp.flushNow)
		close(bp.hasMessages)
		close(bp.done)
	}()
	wg.Wait() // don't return until the goroutine has started
	return bp
}

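// addMessage buffers a message for this broker, prepending retried messages so
// they are delivered before newer ones, and triggers a flush if the buffer has
// grown past maxBufferBytes.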
func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
	bp.mapM.Lock()
	if msg.retried {
		// Prepend: Deliver first, before any more recently-added messages.
		bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
	} else {
		// Append
		bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
	}
	bp.bufferedBytes += msg.byteSize()
	select {
	case bp.hasMessages <- true:
	default:
	}
	bp.mapM.Unlock()
	bp.flushIfOverCapacity(maxBufferBytes)
}

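// flushIfOverCapacity requests an asynchronous flush if more than
// maxBufferBytes of messages are currently buffered.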
func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
	if bp.bufferedBytes > maxBufferBytes {
		select {
		case bp.flushNow <- true:
		default:
		}
	}
}

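// flushIfAnyMessages flushes only if at least one message is buffered, using
// the hasMessages channel as a non-blocking signal.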
func (bp *brokerProducer) flushIfAnyMessages(p *Producer) (shutdownRequired bool) {
	select {
	case <-bp.hasMessages:
		select {
		case bp.hasMessages <- true:
		default:
		}
		return bp.flush(p)
	default:
	}
	return false
}

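// flush drains the buffered messages for every topic-partition that is not
// already mid-delivery and sends them to the broker in a single produce
// request.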
func (bp *brokerProducer) flush(p *Producer) (shutdownRequired bool) {
	var prb produceRequestBuilder

	// only deliver messages for topic-partitions that are not currently being delivered.
	bp.mapM.Lock()
	for tp, messages := range bp.messages {
		if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
			prb = append(prb, messages...)
			delete(bp.messages, tp)
			// Hold the delivery lock until this flush completes, so another
			// flush cannot deliver this topic-partition concurrently.
			defer p.releaseDeliveryLock(tp)
		}
	}
	bp.mapM.Unlock()

	if len(prb) > 0 {
		bp.mapM.Lock()
		bp.bufferedBytes -= prb.byteSize()
		bp.mapM.Unlock()
		return bp.flushRequest(p, prb, func(err error) {
			if err != nil {
				Logger.Println(err)
			}
			p.errors <- err
		})
	}

	return false
}

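// flushRequest sends the built produce request to the broker and interprets
// the response: successfully delivered messages are discarded, messages that
// failed because of stale metadata are re-enqueued for retry, and other
// failures are reported through errorCb. It returns true when the
// brokerProducer should shut down (i.e. when the Produce call itself failed
// with anything other than an encoding error).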
func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
	// toRequest is defined in produce_message.go.
	req := prb.toRequest(&p.config)
	response, err := bp.broker.Produce(p.client.id, req)

	switch err {
	case nil:
		break
	case EncodingError:
		// No sense in retrying; it'll just fail again. But what about all the other
		// messages that weren't invalid? Really, this is a "shit's broke real good"
		// scenario, so logging it and moving on is probably acceptable.
		errorCb(err)
		return false
	default:
		overlimit := 0
		prb.reverseEach(func(msg *produceMessage) {
			if err := msg.reenqueue(p); err != nil {
				overlimit++
			}
		})
		if overlimit > 0 {
			errorCb(DroppedMessagesError{overlimit, nil})
		}
		return true
	}

	// When does this ever actually happen, and why don't we explode when it does?
	// This seems bad.
	if response == nil {
		errorCb(nil)
		return false
	}

	for topic, d := range response.Blocks {
		for partition, block := range d {
			if block == nil {
				// IncompleteResponse. Here we just drop all the messages; we don't know whether
				// they were successfully sent or not. Non-ideal, but how often does it happen?
				errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
				continue
			}
			switch block.Err {
			case NoError:
				// All the messages for this topic-partition were delivered successfully!
				// Unlock delivery for this topic-partition and discard the produceMessage objects.
				errorCb(nil)
			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
				p.client.RefreshTopicMetadata(topic)
				overlimit := 0
				prb.reverseEach(func(msg *produceMessage) {
					if msg.hasTopicPartition(topic, partition) {
						if err := msg.reenqueue(p); err != nil {
							overlimit++
						}
					}
				})
				if overlimit > 0 {
					errorCb(DroppedMessagesError{overlimit, nil})
				}
			default:
				errorCb(DroppedMessagesError{len(prb), block.Err})
			}
		}
	}

	return false
}

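// Close signals the brokerProducer's goroutine to flush any remaining messages
// and shut down, then waits for it to finish. Closing twice is an error.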
func (bp *brokerProducer) Close() error {
	select {
	case <-bp.stopper:
		return fmt.Errorf("already closed or closing")
	default:
		close(bp.stopper)
		<-bp.done
	}
	return nil
}

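// tryAcquireDeliveryLock attempts to take the per-topic-partition delivery
// lock without blocking, and reports whether it succeeded.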
func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
	p.dm.RLock()
	ch, ok := p.deliveryLocks[tp]
	p.dm.RUnlock()
	if !ok {
		p.dm.Lock()
		ch, ok = p.deliveryLocks[tp]
		if !ok {
			ch = make(chan bool, 1)
			p.deliveryLocks[tp] = ch
		}
		p.dm.Unlock()
	}

	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

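// releaseDeliveryLock releases the per-topic-partition delivery lock; it
// panics if the lock was not held.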
func (p *Producer) releaseDeliveryLock(tp topicPartition) {
	p.dm.RLock()
	ch := p.deliveryLocks[tp]
	p.dm.RUnlock()
	select {
	case <-ch:
	default:
		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
	}
}

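// choosePartition asks the configured Partitioner to pick one of the topic's
// currently-known partitions for the given key.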
func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
	partitions, err := p.client.Partitions(topic)
	if err != nil {
		return -1, err
	}

	numPartitions := int32(len(partitions))

	choice := p.config.Partitioner.Partition(key, numPartitions)
	if choice < 0 || choice >= numPartitions {
		return -1, InvalidPartition
	}

	return partitions[choice], nil
}