package sarama

import (
	"fmt"
	"sync"
	"time"
)

// ProducerConfig is used to pass multiple configuration options to NewProducer.
//
// If MaxBufferTime=MaxBufferedBytes=0, messages will be delivered immediately
// and constantly, but if multiple messages are received while a roundtrip to
// Kafka is in progress, they will all be combined into the next request. In
// this mode, errors are not returned from SendMessage, but over the Errors()
// channel.
//
// With MaxBufferTime and/or MaxBufferedBytes set to values > 0, sarama will
// buffer messages before sending, to reduce traffic.
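//
// A hedged sketch of a buffering config (the specific numbers are illustrative
// only, not recommendations):
//
//	conf := NewProducerConfig()
//	conf.RequiredAcks = WaitForAll    // wait for all in-sync replicas to ack
//	conf.MaxBufferedBytes = 64 * 1024 // flush a broker's buffer once ~64KB accumulates...
//	conf.MaxBufferTime = 500          // ...or after 500ms, whichever comes first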
type ProducerConfig struct {
	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
	Timeout          int32            // The maximum time in ms the broker will wait for the receipt of the number of RequiredAcks.
	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
	MaxBufferTime    uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
}

// Producer publishes Kafka messages. It routes messages to the correct broker
// for the provided topic-partition, refreshing metadata as appropriate, and
// parses responses for errors. You must call Close() on a producer to avoid
// leaks: it may not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying client, which
// is still necessary).
//
// The default values for MaxBufferedBytes and MaxBufferTime cause sarama to
// deliver messages immediately, but to buffer subsequent messages while a
// previous request is in-flight. This is often the correct behaviour.
//
// If synchronous operation is desired, you can use SendMessage. This will cause
// sarama to block until the broker has returned a value. Normally, you will
// want to use QueueMessage instead, and read the error back from the Errors()
// channel. Note that when using QueueMessage, you *must* read the values from
// the Errors() channel, or sarama will block indefinitely after a few requests.
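//
// A minimal asynchronous sketch (assumes an existing *Client named client; the
// error-draining goroutine is mandatory, per the note above):
//
//	producer, err := NewProducer(client, nil)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close()
//	go func() {
//		for err := range producer.Errors() {
//			if err != nil {
//				Logger.Println("produce error:", err)
//			}
//		}
//	}()
//	producer.QueueMessage("my_topic", nil, StringEncoder("hello"))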
type Producer struct {
	client          *Client
	config          ProducerConfig
	brokerProducers map[*Broker]*brokerProducer
	m               sync.RWMutex
	errors          chan error
	deliveryLocks   map[topicPartition]chan bool
	dm              sync.RWMutex
}

type brokerProducer struct {
	mapM          sync.Mutex
	messages      map[topicPartition][]*produceMessage
	bufferedBytes uint32
	flushNow      chan bool
	broker        *Broker
	stopper       chan bool
	done          chan bool
	hasMessages   chan bool
}

type topicPartition struct {
	topic     string
	partition int32
}

// NewProducer creates a new Producer using the given client.
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
	if config == nil {
		config = NewProducerConfig()
	}
	if err := config.Validate(); err != nil {
		return nil, err
	}
	return &Producer{
		client:          client,
		config:          *config,
		errors:          make(chan error, 16),
		deliveryLocks:   make(map[topicPartition]chan bool),
		brokerProducers: make(map[*Broker]*brokerProducer),
	}, nil
}

// Errors provides access, when operating in asynchronous mode, to the errors
// generated while parsing ProduceResponses from Kafka. It should never be
// called in synchronous mode.
func (p *Producer) Errors() chan error {
	return p.errors
}

// Close shuts down the producer and flushes any messages it may have buffered.
// You must call this function before a producer object passes out of scope, as
// it may otherwise leak memory. You must call this before calling Close on the
// underlying client.
func (p *Producer) Close() error {
	// Snapshot under the read lock: each brokerProducer deletes itself from the
	// map (under the write lock) as it shuts down.
	p.m.RLock()
	bps := make([]*brokerProducer, 0, len(p.brokerProducers))
	for _, bp := range p.brokerProducers {
		bps = append(bps, bp)
	}
	p.m.RUnlock()
	for _, bp := range bps {
		bp.Close()
	}
	return nil
}

// QueueMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// QueueMessage uses buffering semantics to reduce the number of requests to the
// broker. The buffer logic is tunable with config.MaxBufferedBytes and
// config.MaxBufferTime.
//
// QueueMessage will return an error if it's unable to construct the message
// (unlikely), but network and response errors must be read from Errors(), since
// QueueMessage uses asynchronous delivery. Note that you MUST read back from
// Errors(), otherwise the producer will stall after some number of errors.
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer. Either one, used alone, preserves ordering.
func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
	return p.genericSendMessage(topic, key, value, false)
}

// SendMessage sends a message with the given key and value to the given topic.
// The partition to send to is selected by the Producer's Partitioner. To send
// strings as either key or value, see the StringEncoder type.
//
// Unlike QueueMessage, SendMessage operates synchronously, and will block until
// the response is received from the broker, returning any error generated in
// the process. Reading from Errors() may interfere with the operation of
// SendMessage().
//
// If you care about message ordering, you should not call QueueMessage and
// SendMessage on the same Producer.
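//
// A synchronous sketch (assumes a producer built as in the Producer example
// above):
//
//	if err := producer.SendMessage("my_topic", nil, StringEncoder("hi")); err != nil {
//		Logger.Println("send failed:", err)
//	}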
func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
	return p.genericSendMessage(topic, key, value, true)
}

func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
	var keyBytes, valBytes []byte
	if key != nil {
		if keyBytes, err = key.Encode(); err != nil {
			return err
		}
	}
	if value != nil {
		if valBytes, err = value.Encode(); err != nil {
			return err
		}
	}
	partition, err := p.choosePartition(topic, key)
	if err != nil {
		return err
	}
	// produceMessage and its enqueue method are defined in produce_message.go.
	msg := &produceMessage{
		tp:    topicPartition{topic, partition},
		key:   keyBytes,
		value: valBytes,
		sync:  synchronous,
	}
	return msg.enqueue(p)
}

func (p *Producer) addMessage(msg *produceMessage) error {
	bp, err := p.brokerProducerFor(msg.tp)
	if err != nil {
		return err
	}
	bp.addMessage(msg, p.config.MaxBufferedBytes)
	return nil
}

func (p *Producer) brokerProducerFor(tp topicPartition) (*brokerProducer, error) {
	broker, err := p.client.Leader(tp.topic, tp.partition)
	if err != nil {
		return nil, err
	}
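	// Double-checked locking: take the cheap read lock for the common case, and
	// only re-check under the write lock before creating a new brokerProducer.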
	p.m.RLock()
	bp, ok := p.brokerProducers[broker]
	p.m.RUnlock()
	if !ok {
		p.m.Lock()
		bp, ok = p.brokerProducers[broker]
		if !ok {
			bp = p.newBrokerProducer(broker)
			p.brokerProducers[broker] = bp
		}
		p.m.Unlock()
	}
	return bp, nil
}

func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
	bp := &brokerProducer{
		messages:    make(map[topicPartition][]*produceMessage),
		flushNow:    make(chan bool, 1),
		broker:      broker,
		stopper:     make(chan bool),
		done:        make(chan bool),
		hasMessages: make(chan bool, 1),
	}
	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
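	// One long-lived goroutine per broker: it flushes on demand (flushNow), on a
	// timer (maxBufferTime), and shuts down when stopper is closed.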
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		timer := time.NewTimer(maxBufferTime)
		var shutdownRequired bool
		wg.Done()
		for {
			select {
			case <-bp.flushNow:
				if shutdownRequired = bp.flush(p); shutdownRequired {
					goto shutdown
				}
			case <-timer.C:
				if shutdownRequired = bp.flushIfAnyMessages(p); shutdownRequired {
					goto shutdown
				}
			case <-bp.stopper:
				goto shutdown
			}
			timer.Reset(maxBufferTime)
		}
	shutdown:
		// Remove ourselves under the producer lock; brokerProducerFor accesses
		// this map concurrently.
		p.m.Lock()
		delete(p.brokerProducers, bp.broker)
		p.m.Unlock()
		bp.flushIfAnyMessages(p)
		p.client.disconnectBroker(bp.broker)
		close(bp.flushNow)
		close(bp.hasMessages)
		close(bp.done)
	}()
	wg.Wait() // don't return until the goroutine has started
	return bp
}

func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
	bp.mapM.Lock()
	if msg.retried {
		// Prepend: deliver first, before any more recently-added messages.
		bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
	} else {
		// Append.
		bp.messages[msg.tp] = append(bp.messages[msg.tp], msg)
	}
	bp.bufferedBytes += msg.byteSize()
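	// Signal, without blocking, that at least one message is now buffered.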
	select {
	case bp.hasMessages <- true:
	default:
	}
	bp.mapM.Unlock()
	bp.flushIfOverCapacity(maxBufferBytes)
}

func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
	if bp.bufferedBytes > maxBufferBytes {
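		// Non-blocking send: if a flush is already pending, this trigger is
		// coalesced into it rather than queueing another.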
		select {
		case bp.flushNow <- true:
		default:
		}
	}
}

func (bp *brokerProducer) flushIfAnyMessages(p *Producer) (shutdownRequired bool) {
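	// Consume the hasMessages token if present (meaning messages are buffered),
	// then immediately restore it, since flush may not drain every partition
	// (some may be locked for delivery); only flush if the token was there.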
	select {
	case <-bp.hasMessages:
		select {
		case bp.hasMessages <- true:
		default:
		}
		return bp.flush(p)
	default:
	}
	return false
}

func (bp *brokerProducer) flush(p *Producer) (shutdownRequired bool) {
	var prb produceRequestBuilder

	// Only deliver messages for topic-partitions that are not currently being
	// delivered; hold each acquired lock until this flush (including the produce
	// request) completes, so concurrent flushes can't reorder in-flight messages.
	bp.mapM.Lock()
	for tp, messages := range bp.messages {
		if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
			prb = append(prb, messages...)
			delete(bp.messages, tp)
			defer p.releaseDeliveryLock(tp)
		}
	}
	bp.mapM.Unlock()

	if len(prb) > 0 {
		bp.mapM.Lock()
		bp.bufferedBytes -= prb.byteSize()
		bp.mapM.Unlock()
		return bp.flushRequest(p, prb, func(err error) {
			if err != nil {
				Logger.Println(err)
			}
			p.errors <- err
		})
	}
	return false
}
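
// flushRequest synchronously sends the built produce request to the broker.
// On a transport-level failure it re-enqueues every message (newest first, so
// prepending preserves order) and reports that this brokerProducer must shut
// down; on retriable per-partition errors it refreshes metadata and re-enqueues
// only the affected messages.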
func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
	// toRequest is defined in produce_message.go.
	req := prb.toRequest(&p.config)
	response, err := bp.broker.Produce(p.client.id, req)

	switch err {
	case nil:
		break
	case EncodingError:
		// No sense in retrying; it'll just fail again. But what about all the other
		// messages that weren't invalid? Really, this is a "shit's broke real good"
		// scenario, so logging it and moving on is probably acceptable.
		errorCb(err)
		return false
	default:
		// Broker or transport failure: re-enqueue everything and ask for this
		// brokerProducer to shut down.
		overlimit := 0
		prb.reverseEach(func(msg *produceMessage) {
			if err := msg.reenqueue(p); err != nil {
				overlimit++
			}
		})
		if overlimit > 0 {
			errorCb(DroppedMessagesError{overlimit, nil})
		}
		return true
	}

	// When does this ever actually happen, and why don't we explode when it does?
	// This seems bad.
	if response == nil {
		errorCb(nil)
		return false
	}

	for topic, d := range response.Blocks {
		for partition, block := range d {
			if block == nil {
				// IncompleteResponse. Here we just drop all the messages; we don't know
				// whether they were successfully sent or not. Non-ideal, but how often
				// does it happen?
				errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
				continue
			}
			switch block.Err {
			case NoError:
				// All the messages for this topic-partition were delivered successfully!
				// Unlock delivery for this topic-partition and discard the produceMessage objects.
				errorCb(nil)
			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
				// The partition has moved: refresh metadata and re-enqueue only the
				// messages that belong to this topic-partition.
				p.client.RefreshTopicMetadata(topic)
				overlimit := 0
				prb.reverseEach(func(msg *produceMessage) {
					if msg.hasTopicPartition(topic, partition) {
						if err := msg.reenqueue(p); err != nil {
							overlimit++
						}
					}
				})
				if overlimit > 0 {
					errorCb(DroppedMessagesError{overlimit, nil})
				}
			default:
				errorCb(DroppedMessagesError{len(prb), block.Err})
			}
		}
	}
	return false
}

func (bp *brokerProducer) Close() error {
	select {
	case <-bp.stopper:
		return fmt.Errorf("already closed or closing")
	default:
		close(bp.stopper)
		<-bp.done
	}
	return nil
}
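
// tryAcquireDeliveryLock makes a non-blocking attempt to acquire the delivery
// lock for a topic-partition. The lock is a 1-buffered channel used as a
// try-lock: a successful send acquires it, and releaseDeliveryLock frees it by
// draining the channel.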
func (p *Producer) tryAcquireDeliveryLock(tp topicPartition) bool {
	p.dm.RLock()
	ch, ok := p.deliveryLocks[tp]
	p.dm.RUnlock()
	if !ok {
		p.dm.Lock()
		ch, ok = p.deliveryLocks[tp]
		if !ok {
			ch = make(chan bool, 1)
			p.deliveryLocks[tp] = ch
		}
		p.dm.Unlock()
	}
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

// releaseDeliveryLock frees a delivery lock previously acquired via
// tryAcquireDeliveryLock.
func (p *Producer) releaseDeliveryLock(tp topicPartition) {
	p.dm.RLock()
	ch := p.deliveryLocks[tp]
	p.dm.RUnlock()
	select {
	case <-ch:
	default:
		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
	}
}

func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
	partitions, err := p.client.Partitions(topic)
	if err != nil {
		return -1, err
	}
	numPartitions := int32(len(partitions))
	if numPartitions == 0 {
		return -1, LeaderNotAvailable
	}
	// The partitioner returns an index into the list of available partitions.
	choice := p.config.Partitioner.Partition(key, numPartitions)
	if choice < 0 || choice >= numPartitions {
		return -1, InvalidPartition
	}
	return partitions[choice], nil
}

// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
func NewProducerConfig() *ProducerConfig {
	return &ProducerConfig{
		Partitioner:  NewRandomPartitioner(),
		RequiredAcks: WaitForLocal,
		// Minimal nonzero buffering values: they satisfy Validate while still
		// approximating immediate delivery, as the Producer doc describes.
		MaxBufferedBytes: 1,
		MaxBufferTime:    1,
	}
}

// Validate checks a ProducerConfig instance. It will return a
// ConfigurationError if the specified value doesn't make sense.
func (config *ProducerConfig) Validate() error {
	if config.RequiredAcks < -1 {
		return ConfigurationError("Invalid RequiredAcks")
	}
	if config.Timeout < 0 {
		return ConfigurationError("Invalid Timeout")
	}
	if config.MaxBufferedBytes == 0 {
		return ConfigurationError("Invalid MaxBufferedBytes")
	}
	if config.MaxBufferTime == 0 {
		return ConfigurationError("Invalid MaxBufferTime")
	}
	if config.Partitioner == nil {
		return ConfigurationError("No partitioner set")
	}
	return nil
}
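
// End-to-end lifecycle, as a hedged sketch (assumes a reachable broker at
// localhost:9092 and an existing topic "my_topic"; NewClient is defined in
// client.go):
//
//	client, err := NewClient("example-client", []string{"localhost:9092"}, nil)
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close()
//
//	producer, err := NewProducer(client, nil)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close() // defers run LIFO: producer closes before the client
//
//	go func() {
//		for err := range producer.Errors() {
//			if err != nil {
//				Logger.Println(err)
//			}
//		}
//	}()
//
//	for i := 0; i < 10; i++ {
//		producer.QueueMessage("my_topic", nil, StringEncoder("message"))
//	}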