package sarama

import (
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
	// buffered. The shutdown has completed when both the Errors and Successes channels
	// have been closed. When calling AsyncClose, you *must* continue to read from those
	// channels in order to drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and flushes any messages it may have buffered.
	// You must call this function before a producer object passes out of scope, as
	// it may otherwise leak memory. You must call this before calling Close on the
	// underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is enabled.
	// If Return.Successes is true, you MUST read from this channel or the Producer will deadlock.
	// It is suggested that you send and read messages together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this channel
	// or the Producer will deadlock when the channel is full. Alternatively, you can set
	// Producer.Return.Errors in your config to false, which prevents errors from being returned.
	Errors() <-chan *ProducerError
}
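
// A minimal usage sketch of the interface above, not part of the library's
// API: Errors() is drained in its own goroutine so the producer can never
// deadlock, and Close() flushes anything still buffered. NewConfig and
// StringEncoder are assumed from elsewhere in this package; the broker
// address is hypothetical.
func exampleProduceLoop() error {
	config := NewConfig()                 // assumed constructor from config.go
	config.Producer.Return.Errors = true // the default, shown for clarity

	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		return err
	}

	go func() {
		for pErr := range producer.Errors() {
			Logger.Println("delivery failed:", pErr)
		}
	}()

	producer.Input() <- &ProducerMessage{Topic: "example", Value: StringEncoder("hello")}
	return producer.Close() // flushes buffered messages, then closes the output channels
}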

type asyncProducer struct {
	client    Client
	conf      *Config
	ownClient bool

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]chan<- *ProducerMessage
	brokerRefs map[chan<- *ProducerMessage]int
	brokerLock sync.Mutex
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}

	p, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}
	p.(*asyncProducer).ownClient = true
	return p, nil
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]chan<- *ProducerMessage),
		brokerRefs: make(map[chan<- *ProducerMessage]int),
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	chaser   flagSet = 1 << iota // message is last in a group that failed
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string  // The Kafka topic for this message.
	Key   Encoder // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Value Encoder // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.

	// These are filled in by the producer as the message is processed
	Offset    int64 // Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.
	Partition int32 // Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.

	Metadata interface{} // Metadata holds arbitrary data you wish to include so that it will be available when receiving on the Successes and Errors channels. Sarama completely ignores this field; it is only for pass-through data.

	retries int
	flags   flagSet
}
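
// A sketch of the Metadata field's pass-through contract described above, not
// part of the library: the value set on Input() comes back untouched on the
// output channels, so it can carry per-message correlation state. Assumes
// Producer.Return.Successes = true; the int ID is purely illustrative.
func exampleMetadataRoundTrip(producer AsyncProducer) {
	producer.Input() <- &ProducerMessage{
		Topic:    "example",
		Value:    StringEncoder("payload"),
		Metadata: 42, // ignored by Sarama, returned as-is
	}

	msg := <-producer.Successes()
	if id, ok := msg.Metadata.(int); ok {
		Logger.Println("message", id, "stored at offset", msg.Offset)
	}
}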

func (m *ProducerMessage) byteSize() int {
	size := 26 // the metadata overhead of CRC, flags, etc.
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
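
// A sketch, not part of the library: since Close can return a ProducerErrors
// batch, a type assertion recovers the individual per-message failures rather
// than just the aggregate count in the error string.
func exampleCloseErrors(producer AsyncProducer) {
	if err := producer.Close(); err != nil {
		if pErrs, ok := err.(ProducerErrors); ok {
			for _, pErr := range pErrs {
				Logger.Println("failed:", pErr.Msg.Topic, pErr.Err)
			}
		}
	}
}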

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
			(msg.byteSize() > p.conf.Producer.MaxMessageBytes) {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		if tp.partitioner.RequiresConsistency() {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))
	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)
	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]
	return nil
}
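
// A sketch of a custom Partitioner as consumed by partitionMessage above, not
// part of the library: it needs exactly the two methods called there --
// Partition to pick an index into the partition slice, and RequiresConsistency
// to choose between Partitions and WritablePartitions. This one pins every
// message to the first partition; it would be installed via
// config.Producer.Partitioner = newExampleFixedPartitioner.
type exampleFixedPartitioner struct{}

func newExampleFixedPartitioner(topic string) Partitioner {
	return exampleFixedPartitioner{}
}

// Partition always chooses index 0; a real implementation might hash msg.Key.
func (exampleFixedPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
	return 0, nil
}

// RequiresConsistency returns true so the chosen index maps over the stable
// list of all partitions rather than the shifting list of writable ones.
func (exampleFixedPartitioner) RequiresConsistency() bool {
	return true
}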

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader  *Broker
	breaker *breaker.Breaker
	output  chan<- *ProducerMessage

	// highWatermark tracks the "current" retry level, which is the only one where we actually let
	// messages through; all other messages get buffered in retryState[msg.retries].buf to preserve
	// ordering. retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser
	// message for a given level (and therefore whether our buffer is complete and safe to flush).
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}
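
// A worked example of the bookkeeping above, inferred from the dispatch logic
// below: suppose the leader fails while messages 1-2 are in flight. They come
// back with retries=1, which raises highWatermark to 1 and emits a chaser
// through the old broker. Messages 3-5, still at retries=0, arrive below the
// watermark and are parked in retryState[0].buf. When the chaser bounces back
// off the failing broker, retryState[0].buf is flushed behind the retried 1-2,
// preserving the original order.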

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.output = pp.parent.getBrokerProducer(pp.leader)
	}

	for msg := range pp.input {
		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
				if msg.flags&chaser == chaser {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&chaser == chaser {
				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees
		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.output <- msg
	}

	if pp.output != nil {
		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a chaser so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still in flight
	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
	pp.output = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)

	for {
		pp.highWatermark--

		if pp.output == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.output <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.output = pp.parent.getBrokerProducer(pp.leader)
		return nil
	})
}
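
// A sketch of the circuit-breaker behaviour updateLeader relies on, as
// documented by github.com/eapache/go-resiliency (not part of this library):
// with breaker.New(3, 1, 10*time.Second), three consecutive failures open the
// breaker, Run then fails fast with breaker.ErrBreakerOpen for ten seconds,
// and one success in the half-open state closes it again.
func exampleBreakerSemantics() {
	b := breaker.New(3, 1, 10*time.Second)

	err := b.Run(func() error {
		return nil // the guarded work, e.g. a metadata refresh
	})
	if err == breaker.ErrBreakerOpen {
		Logger.Println("breaker open: failing fast instead of hammering a dead leader")
	}
}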

// one per broker, constructs both an aggregator and a flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage)
	bridge := make(chan []*ProducerMessage)

	a := &aggregator{
		parent: p,
		broker: broker,
		input:  input,
		output: bridge,
	}
	go withRecover(a.run)

	f := &flusher{
		parent:         p,
		broker:         broker,
		input:          bridge,
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(f.run)

	return input
}

// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
type aggregator struct {
	parent *asyncProducer
	broker *Broker
	input  <-chan *ProducerMessage
	output chan<- []*ProducerMessage

	buffer      []*ProducerMessage
	bufferBytes int
	timer       <-chan time.Time
}

func (a *aggregator) run() {
	var output chan<- []*ProducerMessage

	for {
		select {
		case msg := <-a.input:
			if msg == nil {
				goto shutdown
			}

			if a.wouldOverflow(msg) {
				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
				a.output <- a.buffer
				a.reset()
				output = nil
			}

			a.buffer = append(a.buffer, msg)
			a.bufferBytes += msg.byteSize()

			if a.readyToFlush(msg) {
				output = a.output
			} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
				a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
			}
		case <-a.timer:
			output = a.output
		case output <- a.buffer:
			a.reset()
			output = nil
		}
	}

shutdown:
	if len(a.buffer) > 0 {
		a.output <- a.buffer
	}
	close(a.output)
}
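
// A standalone sketch of the nil-channel idiom run relies on above, not part
// of the library: a send on a nil channel blocks forever, so that select case
// is effectively disabled until output is pointed at a real channel. The
// element type and flush threshold here are hypothetical.
func exampleNilChannelGate(in <-chan int, out chan<- []int) {
	var output chan<- []int // nil: the flush case below cannot fire
	var buffer []int

	for {
		select {
		case v, ok := <-in:
			if !ok {
				return
			}
			buffer = append(buffer, v)
			if len(buffer) >= 3 { // hypothetical flush trigger
				output = out // arm the flush case
			}
		case output <- buffer: // selectable only once armed
			buffer = nil
			output = nil // disarm until the next trigger
		}
	}
}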

func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
	switch {
	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
	case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
		return true
	// Would we overflow the size-limit of a compressed message-batch?
	case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
		return true
	// Would we overflow simply in number of messages?
	case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
		return true
	default:
		return false
	}
}

func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
	switch {
	// If all three config values are 0, we always flush as-fast-as-possible
	case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If the message is a chaser we must flush to maintain the state-machine
	case msg.flags&chaser == chaser:
		return true
	// If we've passed the message trigger-point
	case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}

func (a *aggregator) reset() {
	a.timer = nil
	a.buffer = nil
	a.bufferBytes = 0
}

// takes a batch at a time from the aggregator and sends to the broker
type flusher struct {
	parent *asyncProducer
	broker *Broker
	input  <-chan []*ProducerMessage

	currentRetries map[string]map[int32]error
}

func (f *flusher) run() {
	var closing error

	Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())

	for batch := range f.input {
		if closing != nil {
			f.parent.retryMessages(batch, closing)
			continue
		}

		msgSets := f.groupAndFilter(batch)
		request := f.parent.buildRequest(msgSets)
		if request == nil {
			continue
		}

		response, err := f.broker.Produce(request)

		switch err.(type) {
		case nil:
			break
		case PacketEncodingError:
			f.parent.returnErrors(batch, err)
			continue
		default:
			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
			f.parent.abandonBrokerConnection(f.broker)
			_ = f.broker.Close()
			closing = err
			f.parent.retryMessages(batch, err)
			continue
		}

		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			f.parent.returnSuccesses(batch)
			continue
		}

		f.parseResponse(msgSets, response)
	}
	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
}

func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
	msgSets := make(map[string]map[int32][]*ProducerMessage)

	for i, msg := range batch {
		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
			// we're currently retrying this partition so we need to filter out this message
			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
			batch[i] = nil

			if msg.flags&chaser == chaser {
				// ...but now we can start processing future messages again
				Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
					f.broker.ID(), msg.Topic, msg.Partition)
				delete(f.currentRetries[msg.Topic], msg.Partition)
			}

			continue
		}

		partitionSet := msgSets[msg.Topic]
		if partitionSet == nil {
			partitionSet = make(map[int32][]*ProducerMessage)
			msgSets[msg.Topic] = partitionSet
		}

		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
	}

	return msgSets
}

func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	for topic, partitionSet := range msgSets {
		for partition, msgs := range partitionSet {
			block := response.GetBlock(topic, partition)
			if block == nil {
				f.parent.returnErrors(msgs, ErrIncompleteResponse)
				continue
			}

			switch block.Err {
			// Success
			case ErrNoError:
				for i := range msgs {
					msgs[i].Offset = block.Offset + int64(i)
				}
				f.parent.returnSuccesses(msgs)
			// Retriable errors
			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
					f.broker.ID(), topic, partition, block.Err)
				if f.currentRetries[topic] == nil {
					f.currentRetries[topic] = make(map[int32]error)
				}
				f.currentRetries[topic][partition] = block.Err
				f.parent.retryMessages(msgs, block.Err)
			// Other non-retriable errors
			default:
				f.parent.returnErrors(msgs, block.Err)
			}
		}
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}
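
// The deadlock the bridge above avoids, as implied by the channel topology in
// this file: flusher -> p.retries -> p.input -> dispatcher -> topic/partition
// producers -> aggregator -> flusher. Every hop is an unbuffered or bounded
// channel, so without an unbounded queue somewhere in the cycle a full
// pipeline could wedge while trying to re-enqueue its own failed messages.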

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
	empty := true

	for topic, partitionSet := range batch {
		for partition, msgSet := range partitionSet {
			setToSend := new(MessageSet)
			setSize := 0
			for _, msg := range msgSet {
				var keyBytes, valBytes []byte
				var err error
				if msg.Key != nil {
					if keyBytes, err = msg.Key.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}
				if msg.Value != nil {
					if valBytes, err = msg.Value.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}

				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
					// compression causes message-sets to be wrapped as single messages, which have tighter
					// size requirements, so we have to respect those limits
					valBytes, err := encode(setToSend)
					if err != nil {
						Logger.Println(err) // if this happens, it's basically our fault.
						panic(err)
					}
					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
					setToSend = new(MessageSet)
					setSize = 0
				}
				setSize += msg.byteSize()

				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
				empty = false
			}

			if p.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, setToSend)
			} else {
				valBytes, err := encode(setToSend)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
			}
		}
	}

	if empty {
		return nil
	}
	return req
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		if msg != nil {
			p.returnError(msg, err)
		}
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if msg == nil {
			continue
		}
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		if msg == nil {
			continue
		}
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, err)
		} else {
			msg.retries++
			p.retries <- msg
		}
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	delete(p.brokers, broker)
}