async_producer.go

package sarama

import (
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}

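// A minimal usage sketch (added for illustration, not part of the original file).
// The broker address and topic name are assumptions. It sends and reads results in
// a single select statement, as the doc comments above recommend:
//
//	config := NewConfig()
//	config.Producer.Return.Successes = true
//
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer func() {
//		// Close flushes buffered messages and drains the remaining results
//		if err := producer.Close(); err != nil {
//			Logger.Println("failed to close producer:", err)
//		}
//	}()
//
//	sent := 0
//	for sent < 100 {
//		select {
//		case producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}:
//			sent++
//		case msg := <-producer.Successes():
//			Logger.Println("delivered to partition", msg.Partition, "at offset", msg.Offset)
//		case err := <-producer.Errors():
//			Logger.Println("failed to deliver message:", err)
//		}
//	}
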
type asyncProducer struct {
	client    Client
	conf      *Config
	ownClient bool

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]chan<- *ProducerMessage
	brokerRefs map[chan<- *ProducerMessage]int
	brokerLock sync.Mutex
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}

	p, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}
	p.(*asyncProducer).ownClient = true
	return p, nil
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]chan<- *ProducerMessage),
		brokerRefs: make(map[chan<- *ProducerMessage]int),
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

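// A sketch of sharing one client between a producer and other components (added
// for illustration; the broker address is an assumption). Per the note above,
// the producer must be closed before the underlying client:
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // deferred first, so it runs last
//
//	producer, err := NewAsyncProducerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close() // deferred second, so it runs before client.Close
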
type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// The fields below are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	retries int
	flags   flagSet
}

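// An illustrative sketch of constructing a message (added; the field values and
// the requestID variable are hypothetical). Metadata travels through the producer
// untouched and comes back on the Successes or Errors channel:
//
//	msg := &ProducerMessage{
//		Topic:    "my_topic",
//		Key:      StringEncoder("user-42"), // messages with the same key hash to the same partition by default
//		Value:    ByteEncoder([]byte(`{"event":"signup"}`)),
//		Metadata: requestID, // opaque to Sarama; read it back from msg.Metadata on the result channels
//	}
//	producer.Input() <- msg
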
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize() int {
	size := producerMessageOverhead
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

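// A sketch of inspecting per-message failures after Close (added for illustration;
// the producer variable is assumed to be an AsyncProducer created as shown earlier):
//
//	if err := producer.Close(); err != nil {
//		if errs, ok := err.(ProducerErrors); ok {
//			for _, pe := range errs {
//				Logger.Println("undelivered:", pe.Msg.Topic, pe.Err)
//			}
//		}
//	}
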
func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		if tp.partitioner.RequiresConsistency() {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}

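// A sketch of a custom Partitioner to illustrate the two hooks used above (added
// for illustration; keyLengthPartitioner is hypothetical, and the method signatures
// are inferred from the calls in partitionMessage). Returning true from
// RequiresConsistency makes the choice map over all partitions rather than only
// the currently writable ones:
//
//	type keyLengthPartitioner struct{}
//
//	func (keyLengthPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
//		if msg.Key == nil {
//			return 0, nil
//		}
//		key, err := msg.Key.Encode()
//		if err != nil {
//			return -1, err
//		}
//		return int32(len(key)) % numPartitions, nil
//	}
//
//	func (keyLengthPartitioner) RequiresConsistency() bool { return true }
//
//	// wired into the producer configuration:
//	//	config.Producer.Partitioner = func(topic string) Partitioner { return keyLengthPartitioner{} }
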
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader  *Broker
	breaker *breaker.Breaker
	output  chan<- *ProducerMessage

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.output = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	for msg := range pp.input {
		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees
		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.output <- msg
	}

	if pp.output != nil {
		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	pp.output = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.output <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.output = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	return input
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     <-chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg := <-bp.input:
			if msg == nil {
				bp.shutdown()
				return
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response := <-bp.responses:
			bp.handleResponse(response)
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}

	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
				bp.broker.ID(), topic, partition, block.Err)
			bp.currentRetries[topic][partition] = block.Err
			bp.parent.retryMessages(msgs, block.Err)
			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
		// Other non-retriable errors
		default:
			bp.parent.returnErrors(msgs, block.Err)
		}
	})
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.returnErrors(msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.retryMessages(msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
			bp.parent.retryMessages(msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	delete(p.brokers, broker)
}