async_producer.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881
  1. package sarama
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/eapache/go-resiliency/breaker"
  7. "github.com/eapache/queue"
  8. )
  9. // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
  10. // to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
  11. // and parses responses for errors. You must read from the Errors() channel or the
  12. // producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
  13. // leaks: it will not be garbage-collected automatically when it passes out of
  14. // scope.
  15. type AsyncProducer interface {
  16. // AsyncClose triggers a shutdown of the producer, flushing any messages it may
  17. // have buffered. The shutdown has completed when both the Errors and Successes
  18. // channels have been closed. When calling AsyncClose, you *must* continue to
  19. // read from those channels in order to drain the results of any messages in
  20. // flight.
  21. AsyncClose()
  22. // Close shuts down the producer and flushes any messages it may have buffered.
  23. // You must call this function before a producer object passes out of scope, as
  24. // it may otherwise leak memory. You must call this before calling Close on the
  25. // underlying client.
  26. Close() error
  27. // Input is the input channel for the user to write messages to that they
  28. // wish to send.
  29. Input() chan<- *ProducerMessage
  30. // Successes is the success output channel back to the user when AckSuccesses is
  31. // enabled. If Return.Successes is true, you MUST read from this channel or the
  32. // Producer will deadlock. It is suggested that you send and read messages
  33. // together in a single select statement.
  34. Successes() <-chan *ProducerMessage
  35. // Errors is the error output channel back to the user. You MUST read from this
  36. // channel or the Producer will deadlock when the channel is full. Alternatively,
  37. // you can set Producer.Return.Errors in your config to false, which prevents
  38. // errors to be returned.
  39. Errors() <-chan *ProducerError
  40. }
  41. type asyncProducer struct {
  42. client Client
  43. conf *Config
  44. ownClient bool
  45. errors chan *ProducerError
  46. input, successes, retries chan *ProducerMessage
  47. inFlight sync.WaitGroup
  48. brokers map[*Broker]chan<- *ProducerMessage
  49. brokerRefs map[chan<- *ProducerMessage]int
  50. brokerLock sync.Mutex
  51. }
  52. // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
  53. func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  54. client, err := NewClient(addrs, conf)
  55. if err != nil {
  56. return nil, err
  57. }
  58. p, err := NewAsyncProducerFromClient(client)
  59. if err != nil {
  60. return nil, err
  61. }
  62. p.(*asyncProducer).ownClient = true
  63. return p, nil
  64. }
  65. // NewAsyncProducerFromClient creates a new Producer using the given client. It is still
  66. // necessary to call Close() on the underlying client when shutting down this producer.
  67. func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
  68. // Check that we are not dealing with a closed Client before processing any other arguments
  69. if client.Closed() {
  70. return nil, ErrClosedClient
  71. }
  72. p := &asyncProducer{
  73. client: client,
  74. conf: client.Config(),
  75. errors: make(chan *ProducerError),
  76. input: make(chan *ProducerMessage),
  77. successes: make(chan *ProducerMessage),
  78. retries: make(chan *ProducerMessage),
  79. brokers: make(map[*Broker]chan<- *ProducerMessage),
  80. brokerRefs: make(map[chan<- *ProducerMessage]int),
  81. }
  82. // launch our singleton dispatchers
  83. go withRecover(p.dispatcher)
  84. go withRecover(p.retryHandler)
  85. return p, nil
  86. }
  // flagSet is a bitmask of internal control flags carried by a ProducerMessage.
type flagSet int8

const (
	// chaser marks the last message in a group that failed.
	chaser flagSet = 1 << 0
	// shutdown starts the shutdown process.
	shutdown flagSet = 1 << 1
)
  92. // ProducerMessage is the collection of elements passed to the Producer in order to send a message.
  93. type ProducerMessage struct {
  94. Topic string // The Kafka topic for this message.
  95. // The partitioning key for this message. Pre-existing Encoders include
  96. // StringEncoder and ByteEncoder.
  97. Key Encoder
  98. // The actual message to store in Kafka. Pre-existing Encoders include
  99. // StringEncoder and ByteEncoder.
  100. Value Encoder
  101. // This field is used to hold arbitrary data you wish to include so it
  102. // will be available when receiving on the Successes and Errors channels.
  103. // Sarama completely ignores this field and is only to be used for
  104. // pass-through data.
  105. Metadata interface{}
  106. // Below this point are filled in by the producer as the message is processed
  107. // Offset is the offset of the message stored on the broker. This is only
  108. // guaranteed to be defined if the message was successfully delivered and
  109. // RequiredAcks is not NoResponse.
  110. Offset int64
  111. // Partition is the partition that the message was sent to. This is only
  112. // guaranteed to be defined if the message was successfully delivered.
  113. Partition int32
  114. retries int
  115. flags flagSet
  116. }
  117. const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
  118. func (m *ProducerMessage) byteSize() int {
  119. size := producerMessageOverhead
  120. if m.Key != nil {
  121. size += m.Key.Length()
  122. }
  123. if m.Value != nil {
  124. size += m.Value.Length()
  125. }
  126. return size
  127. }
  128. func (m *ProducerMessage) clear() {
  129. m.flags = 0
  130. m.retries = 0
  131. }
  132. // ProducerError is the type of error generated when the producer fails to deliver a message.
  133. // It contains the original ProducerMessage as well as the actual error value.
  134. type ProducerError struct {
  135. Msg *ProducerMessage
  136. Err error
  137. }
  138. func (pe ProducerError) Error() string {
  139. return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
  140. }
  141. // ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
  142. // It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
  143. // when closing a producer.
  144. type ProducerErrors []*ProducerError
  145. func (pe ProducerErrors) Error() string {
  146. return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
  147. }
  148. func (p *asyncProducer) Errors() <-chan *ProducerError {
  149. return p.errors
  150. }
  151. func (p *asyncProducer) Successes() <-chan *ProducerMessage {
  152. return p.successes
  153. }
  154. func (p *asyncProducer) Input() chan<- *ProducerMessage {
  155. return p.input
  156. }
  157. func (p *asyncProducer) Close() error {
  158. p.AsyncClose()
  159. if p.conf.Producer.Return.Successes {
  160. go withRecover(func() {
  161. for _ = range p.successes {
  162. }
  163. })
  164. }
  165. var errors ProducerErrors
  166. if p.conf.Producer.Return.Errors {
  167. for event := range p.errors {
  168. errors = append(errors, event)
  169. }
  170. }
  171. if len(errors) > 0 {
  172. return errors
  173. }
  174. return nil
  175. }
  176. func (p *asyncProducer) AsyncClose() {
  177. go withRecover(p.shutdown)
  178. }
  179. // singleton
  180. // dispatches messages by topic
  181. func (p *asyncProducer) dispatcher() {
  182. handlers := make(map[string]chan<- *ProducerMessage)
  183. shuttingDown := false
  184. for msg := range p.input {
  185. if msg == nil {
  186. Logger.Println("Something tried to send a nil message, it was ignored.")
  187. continue
  188. }
  189. if msg.flags&shutdown != 0 {
  190. shuttingDown = true
  191. p.inFlight.Done()
  192. continue
  193. } else if msg.retries == 0 {
  194. if shuttingDown {
  195. // we can't just call returnError here because that decrements the wait group,
  196. // which hasn't been incremented yet for this message, and shouldn't be
  197. pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
  198. if p.conf.Producer.Return.Errors {
  199. p.errors <- pErr
  200. } else {
  201. Logger.Println(pErr)
  202. }
  203. continue
  204. }
  205. p.inFlight.Add(1)
  206. }
  207. if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
  208. p.returnError(msg, ErrMessageSizeTooLarge)
  209. continue
  210. }
  211. handler := handlers[msg.Topic]
  212. if handler == nil {
  213. handler = p.newTopicProducer(msg.Topic)
  214. handlers[msg.Topic] = handler
  215. }
  216. handler <- msg
  217. }
  218. for _, handler := range handlers {
  219. close(handler)
  220. }
  221. }
  222. // one per topic
  223. // partitions messages, then dispatches them by partition
  224. type topicProducer struct {
  225. parent *asyncProducer
  226. topic string
  227. input <-chan *ProducerMessage
  228. breaker *breaker.Breaker
  229. handlers map[int32]chan<- *ProducerMessage
  230. partitioner Partitioner
  231. }
  232. func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
  233. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  234. tp := &topicProducer{
  235. parent: p,
  236. topic: topic,
  237. input: input,
  238. breaker: breaker.New(3, 1, 10*time.Second),
  239. handlers: make(map[int32]chan<- *ProducerMessage),
  240. partitioner: p.conf.Producer.Partitioner(topic),
  241. }
  242. go withRecover(tp.dispatch)
  243. return input
  244. }
  245. func (tp *topicProducer) dispatch() {
  246. for msg := range tp.input {
  247. if msg.retries == 0 {
  248. if err := tp.partitionMessage(msg); err != nil {
  249. tp.parent.returnError(msg, err)
  250. continue
  251. }
  252. }
  253. handler := tp.handlers[msg.Partition]
  254. if handler == nil {
  255. handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
  256. tp.handlers[msg.Partition] = handler
  257. }
  258. handler <- msg
  259. }
  260. for _, handler := range tp.handlers {
  261. close(handler)
  262. }
  263. }
  264. func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
  265. var partitions []int32
  266. err := tp.breaker.Run(func() (err error) {
  267. if tp.partitioner.RequiresConsistency() {
  268. partitions, err = tp.parent.client.Partitions(msg.Topic)
  269. } else {
  270. partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
  271. }
  272. return
  273. })
  274. if err != nil {
  275. return err
  276. }
  277. numPartitions := int32(len(partitions))
  278. if numPartitions == 0 {
  279. return ErrLeaderNotAvailable
  280. }
  281. choice, err := tp.partitioner.Partition(msg, numPartitions)
  282. if err != nil {
  283. return err
  284. } else if choice < 0 || choice >= numPartitions {
  285. return ErrInvalidPartition
  286. }
  287. msg.Partition = partitions[choice]
  288. return nil
  289. }
  290. // one per partition per topic
  291. // dispatches messages to the appropriate broker
  292. // also responsible for maintaining message order during retries
  293. type partitionProducer struct {
  294. parent *asyncProducer
  295. topic string
  296. partition int32
  297. input <-chan *ProducerMessage
  298. leader *Broker
  299. breaker *breaker.Breaker
  300. output chan<- *ProducerMessage
  301. // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
  302. // all other messages get buffered in retryState[msg.retries].buf to preserve ordering
  303. // retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
  304. // therefore whether our buffer is complete and safe to flush)
  305. highWatermark int
  306. retryState []partitionRetryState
  307. }
  308. type partitionRetryState struct {
  309. buf []*ProducerMessage
  310. expectChaser bool
  311. }
  312. func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
  313. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  314. pp := &partitionProducer{
  315. parent: p,
  316. topic: topic,
  317. partition: partition,
  318. input: input,
  319. breaker: breaker.New(3, 1, 10*time.Second),
  320. retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
  321. }
  322. go withRecover(pp.dispatch)
  323. return input
  324. }
  325. func (pp *partitionProducer) dispatch() {
  326. // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
  327. // on the first message
  328. pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
  329. if pp.leader != nil {
  330. pp.output = pp.parent.getBrokerProducer(pp.leader)
  331. }
  332. for msg := range pp.input {
  333. if msg.retries > pp.highWatermark {
  334. // a new, higher, retry level; handle it and then back off
  335. pp.newHighWatermark(msg.retries)
  336. time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
  337. } else if pp.highWatermark > 0 {
  338. // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
  339. if msg.retries < pp.highWatermark {
  340. // in fact this message is not even the current retry level, so buffer it for now (unless it's a just a chaser)
  341. if msg.flags&chaser == chaser {
  342. pp.retryState[msg.retries].expectChaser = false
  343. pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
  344. } else {
  345. pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
  346. }
  347. continue
  348. } else if msg.flags&chaser == chaser {
  349. // this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
  350. // meaning this retry level is done and we can go down (at least) one level and flush that
  351. pp.retryState[pp.highWatermark].expectChaser = false
  352. pp.flushRetryBuffers()
  353. pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
  354. continue
  355. }
  356. }
  357. // if we made it this far then the current msg contains real data, and can be sent to the next goroutine
  358. // without breaking any of our ordering guarantees
  359. if pp.output == nil {
  360. if err := pp.updateLeader(); err != nil {
  361. pp.parent.returnError(msg, err)
  362. time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
  363. continue
  364. }
  365. Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
  366. }
  367. pp.output <- msg
  368. }
  369. if pp.output != nil {
  370. pp.parent.unrefBrokerProducer(pp.leader, pp.output)
  371. }
  372. }
  373. func (pp *partitionProducer) newHighWatermark(hwm int) {
  374. Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
  375. pp.highWatermark = hwm
  376. // send off a chaser so that we know when everything "in between" has made it
  377. // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
  378. pp.retryState[pp.highWatermark].expectChaser = true
  379. pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
  380. pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}
  381. // a new HWM means that our current broker selection is out of date
  382. Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
  383. pp.parent.unrefBrokerProducer(pp.leader, pp.output)
  384. pp.output = nil
  385. }
  386. func (pp *partitionProducer) flushRetryBuffers() {
  387. Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
  388. for {
  389. pp.highWatermark--
  390. if pp.output == nil {
  391. if err := pp.updateLeader(); err != nil {
  392. pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
  393. goto flushDone
  394. }
  395. Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
  396. }
  397. for _, msg := range pp.retryState[pp.highWatermark].buf {
  398. pp.output <- msg
  399. }
  400. flushDone:
  401. pp.retryState[pp.highWatermark].buf = nil
  402. if pp.retryState[pp.highWatermark].expectChaser {
  403. Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
  404. break
  405. } else if pp.highWatermark == 0 {
  406. Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
  407. break
  408. }
  409. }
  410. }
  411. func (pp *partitionProducer) updateLeader() error {
  412. return pp.breaker.Run(func() (err error) {
  413. if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
  414. return err
  415. }
  416. if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
  417. return err
  418. }
  419. pp.output = pp.parent.getBrokerProducer(pp.leader)
  420. return nil
  421. })
  422. }
  423. // one per broker; also constructs an associated flusher
  424. func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
  425. var (
  426. input = make(chan *ProducerMessage)
  427. bridge = make(chan *produceSet)
  428. responses = make(chan *brokerProducerResponse)
  429. )
  430. bp := &brokerProducer{
  431. parent: p,
  432. broker: broker,
  433. input: input,
  434. output: bridge,
  435. responses: responses,
  436. buffer: newProduceSet(p),
  437. currentRetries: make(map[string]map[int32]error),
  438. }
  439. go withRecover(bp.run)
  440. // minimal bridge to make the network response `select`able
  441. go withRecover(func() {
  442. for set := range bridge {
  443. request := set.buildRequest()
  444. response, err := broker.Produce(request)
  445. responses <- &brokerProducerResponse{
  446. set: set,
  447. err: err,
  448. res: response,
  449. }
  450. }
  451. close(responses)
  452. })
  453. return input
  454. }
  455. type brokerProducerResponse struct {
  456. set *produceSet
  457. err error
  458. res *ProduceResponse
  459. }
  460. // groups messages together into appropriately-sized batches for sending to the broker
  461. // handles state related to retries etc
  462. type brokerProducer struct {
  463. parent *asyncProducer
  464. broker *Broker
  465. input <-chan *ProducerMessage
  466. output chan<- *produceSet
  467. responses <-chan *brokerProducerResponse
  468. buffer *produceSet
  469. timer <-chan time.Time
  470. timerFired bool
  471. closing error
  472. currentRetries map[string]map[int32]error
  473. }
  474. func (bp *brokerProducer) run() {
  475. var output chan<- *produceSet
  476. Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
  477. for {
  478. select {
  479. case msg := <-bp.input:
  480. if msg == nil {
  481. goto shutdown
  482. }
  483. if reason := bp.needsRetry(msg); reason != nil {
  484. bp.parent.retryMessage(msg, reason)
  485. if bp.closing == nil && msg.flags&chaser == chaser {
  486. // we were retrying this partition but we can start processing again
  487. delete(bp.currentRetries[msg.Topic], msg.Partition)
  488. Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
  489. bp.broker.ID(), msg.Topic, msg.Partition)
  490. }
  491. continue
  492. }
  493. if bp.buffer.wouldOverflow(msg) {
  494. if err := bp.waitForSpace(msg); err != nil {
  495. bp.parent.retryMessage(msg, err)
  496. continue
  497. }
  498. }
  499. if err := bp.buffer.add(msg); err != nil {
  500. bp.parent.returnError(msg, err)
  501. continue
  502. }
  503. if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
  504. bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
  505. }
  506. case <-bp.timer:
  507. bp.timerFired = true
  508. case output <- bp.buffer:
  509. bp.rollOver()
  510. case response := <-bp.responses:
  511. bp.handleResponse(response)
  512. }
  513. if bp.timerFired || bp.buffer.readyToFlush() {
  514. output = bp.output
  515. } else {
  516. output = nil
  517. }
  518. }
  519. shutdown:
  520. for !bp.buffer.empty() {
  521. select {
  522. case response := <-bp.responses:
  523. bp.handleResponse(response)
  524. case bp.output <- bp.buffer:
  525. bp.rollOver()
  526. }
  527. }
  528. close(bp.output)
  529. for response := range bp.responses {
  530. bp.handleResponse(response)
  531. }
  532. Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
  533. }
  534. func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
  535. if bp.closing != nil {
  536. return bp.closing
  537. }
  538. if bp.currentRetries[msg.Topic] == nil {
  539. return nil
  540. }
  541. return bp.currentRetries[msg.Topic][msg.Partition]
  542. }
  543. func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
  544. Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
  545. for {
  546. select {
  547. case response := <-bp.responses:
  548. bp.handleResponse(response)
  549. // handling a response can change our state, so re-check some things
  550. if reason := bp.needsRetry(msg); reason != nil {
  551. return reason
  552. } else if !bp.buffer.wouldOverflow(msg) {
  553. return nil
  554. }
  555. case bp.output <- bp.buffer:
  556. bp.rollOver()
  557. return nil
  558. }
  559. }
  560. }
  561. func (bp *brokerProducer) rollOver() {
  562. bp.timer = nil
  563. bp.timerFired = false
  564. bp.buffer = newProduceSet(bp.parent)
  565. }
  566. func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
  567. if response.err != nil {
  568. bp.handleError(response.set, response.err)
  569. } else {
  570. bp.handleSuccess(response.set, response.res)
  571. }
  572. if bp.buffer.empty() {
  573. bp.rollOver() // this can happen if the response invalidated our buffer
  574. }
  575. }
  576. func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
  577. // we iterate through the blocks in the request set, not the response, so that we notice
  578. // if the response is missing a block completely
  579. sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
  580. if response == nil {
  581. // this only happens when RequiredAcks is NoResponse, so we have to assume success
  582. bp.parent.returnSuccesses(msgs)
  583. return
  584. }
  585. block := response.GetBlock(topic, partition)
  586. if block == nil {
  587. bp.parent.returnErrors(msgs, ErrIncompleteResponse)
  588. return
  589. }
  590. switch block.Err {
  591. // Success
  592. case ErrNoError:
  593. for i, msg := range msgs {
  594. msg.Offset = block.Offset + int64(i)
  595. }
  596. bp.parent.returnSuccesses(msgs)
  597. // Retriable errors
  598. case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
  599. ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
  600. Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
  601. bp.broker.ID(), topic, partition, block.Err)
  602. if bp.currentRetries[topic] == nil {
  603. bp.currentRetries[topic] = make(map[int32]error)
  604. }
  605. bp.currentRetries[topic][partition] = block.Err
  606. bp.parent.retryMessages(msgs, block.Err)
  607. bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
  608. // Other non-retriable errors
  609. default:
  610. bp.parent.returnErrors(msgs, block.Err)
  611. }
  612. })
  613. }
  614. func (bp *brokerProducer) handleError(sent *produceSet, err error) {
  615. switch err.(type) {
  616. case PacketEncodingError:
  617. sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
  618. bp.parent.returnErrors(msgs, err)
  619. })
  620. default:
  621. Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
  622. bp.parent.abandonBrokerConnection(bp.broker)
  623. _ = bp.broker.Close()
  624. bp.closing = err
  625. sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
  626. bp.parent.retryMessages(msgs, err)
  627. })
  628. bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
  629. bp.parent.retryMessages(msgs, err)
  630. })
  631. bp.rollOver()
  632. }
  633. }
  634. // singleton
  635. // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
  636. // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
  637. func (p *asyncProducer) retryHandler() {
  638. var msg *ProducerMessage
  639. buf := queue.New()
  640. for {
  641. if buf.Length() == 0 {
  642. msg = <-p.retries
  643. } else {
  644. select {
  645. case msg = <-p.retries:
  646. case p.input <- buf.Peek().(*ProducerMessage):
  647. buf.Remove()
  648. continue
  649. }
  650. }
  651. if msg == nil {
  652. return
  653. }
  654. buf.Add(msg)
  655. }
  656. }
  657. // utility functions
  658. func (p *asyncProducer) shutdown() {
  659. Logger.Println("Producer shutting down.")
  660. p.inFlight.Add(1)
  661. p.input <- &ProducerMessage{flags: shutdown}
  662. p.inFlight.Wait()
  663. if p.ownClient {
  664. err := p.client.Close()
  665. if err != nil {
  666. Logger.Println("producer/shutdown failed to close the embedded client:", err)
  667. }
  668. }
  669. close(p.input)
  670. close(p.retries)
  671. close(p.errors)
  672. close(p.successes)
  673. }
  674. func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
  675. msg.clear()
  676. pErr := &ProducerError{Msg: msg, Err: err}
  677. if p.conf.Producer.Return.Errors {
  678. p.errors <- pErr
  679. } else {
  680. Logger.Println(pErr)
  681. }
  682. p.inFlight.Done()
  683. }
  684. func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
  685. for _, msg := range batch {
  686. p.returnError(msg, err)
  687. }
  688. }
  689. func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
  690. for _, msg := range batch {
  691. if p.conf.Producer.Return.Successes {
  692. msg.clear()
  693. p.successes <- msg
  694. }
  695. p.inFlight.Done()
  696. }
  697. }
  698. func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
  699. if msg.retries >= p.conf.Producer.Retry.Max {
  700. p.returnError(msg, err)
  701. } else {
  702. msg.retries++
  703. p.retries <- msg
  704. }
  705. }
  706. func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
  707. for _, msg := range batch {
  708. p.retryMessage(msg, err)
  709. }
  710. }
  711. func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
  712. p.brokerLock.Lock()
  713. defer p.brokerLock.Unlock()
  714. bp := p.brokers[broker]
  715. if bp == nil {
  716. bp = p.newBrokerProducer(broker)
  717. p.brokers[broker] = bp
  718. p.brokerRefs[bp] = 0
  719. }
  720. p.brokerRefs[bp]++
  721. return bp
  722. }
  723. func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
  724. p.brokerLock.Lock()
  725. defer p.brokerLock.Unlock()
  726. p.brokerRefs[bp]--
  727. if p.brokerRefs[bp] == 0 {
  728. close(bp)
  729. delete(p.brokerRefs, bp)
  730. if p.brokers[broker] == bp {
  731. delete(p.brokers, broker)
  732. }
  733. }
  734. }
  735. func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
  736. p.brokerLock.Lock()
  737. defer p.brokerLock.Unlock()
  738. delete(p.brokers, broker)
  739. }