async_producer.go 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948
  1. package sarama
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/eapache/go-resiliency/breaker"
  7. "github.com/eapache/queue"
  8. )
  9. // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
  10. // to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
  11. // and parses responses for errors. You must read from the Errors() channel or the
  12. // producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
  13. // leaks: it will not be garbage-collected automatically when it passes out of
  14. // scope.
  15. type AsyncProducer interface {
  16. // AsyncClose triggers a shutdown of the producer, flushing any messages it may
  17. // have buffered. The shutdown has completed when both the Errors and Successes
  18. // channels have been closed. When calling AsyncClose, you *must* continue to
  19. // read from those channels in order to drain the results of any messages in
  20. // flight.
  21. AsyncClose()
  22. // Close shuts down the producer and flushes any messages it may have buffered.
  23. // You must call this function before a producer object passes out of scope, as
  24. // it may otherwise leak memory. You must call this before calling Close on the
  25. // underlying client.
  26. Close() error
  27. // Input is the input channel for the user to write messages to that they
  28. // wish to send.
  29. Input() chan<- *ProducerMessage
  30. // Successes is the success output channel back to the user when AckSuccesses is
  31. // enabled. If Return.Successes is true, you MUST read from this channel or the
  32. // Producer will deadlock. It is suggested that you send and read messages
  33. // together in a single select statement.
  34. Successes() <-chan *ProducerMessage
  35. // Errors is the error output channel back to the user. You MUST read from this
  36. // channel or the Producer will deadlock when the channel is full. Alternatively,
  37. // you can set Producer.Return.Errors in your config to false, which prevents
  38. // errors to be returned.
  39. Errors() <-chan *ProducerError
  40. }
  41. type asyncProducer struct {
  42. client Client
  43. conf *Config
  44. ownClient bool
  45. errors chan *ProducerError
  46. input, successes, retries chan *ProducerMessage
  47. inFlight sync.WaitGroup
  48. brokers map[*Broker]chan<- *ProducerMessage
  49. brokerRefs map[chan<- *ProducerMessage]int
  50. brokerLock sync.Mutex
  51. }
  52. // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
  53. func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  54. client, err := NewClient(addrs, conf)
  55. if err != nil {
  56. return nil, err
  57. }
  58. p, err := NewAsyncProducerFromClient(client)
  59. if err != nil {
  60. return nil, err
  61. }
  62. p.(*asyncProducer).ownClient = true
  63. return p, nil
  64. }
  65. // NewAsyncProducerFromClient creates a new Producer using the given client. It is still
  66. // necessary to call Close() on the underlying client when shutting down this producer.
  67. func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
  68. // Check that we are not dealing with a closed Client before processing any other arguments
  69. if client.Closed() {
  70. return nil, ErrClosedClient
  71. }
  72. p := &asyncProducer{
  73. client: client,
  74. conf: client.Config(),
  75. errors: make(chan *ProducerError),
  76. input: make(chan *ProducerMessage),
  77. successes: make(chan *ProducerMessage),
  78. retries: make(chan *ProducerMessage),
  79. brokers: make(map[*Broker]chan<- *ProducerMessage),
  80. brokerRefs: make(map[chan<- *ProducerMessage]int),
  81. }
  82. // launch our singleton dispatchers
  83. go withRecover(p.dispatcher)
  84. go withRecover(p.retryHandler)
  85. return p, nil
  86. }
  // flagSet is a small bit set of internal control flags carried on a ProducerMessage.
  type flagSet int8

  const (
  	chaser   flagSet = 1 << 0 // message is the last in a group that failed
  	shutdown flagSet = 1 << 1 // start the shutdown process
  )
  92. // ProducerMessage is the collection of elements passed to the Producer in order to send a message.
  93. type ProducerMessage struct {
  94. Topic string // The Kafka topic for this message.
  95. // The partitioning key for this message. Pre-existing Encoders include
  96. // StringEncoder and ByteEncoder.
  97. Key Encoder
  98. // The actual message to store in Kafka. Pre-existing Encoders include
  99. // StringEncoder and ByteEncoder.
  100. Value Encoder
  101. // This field is used to hold arbitrary data you wish to include so it
  102. // will be available when receiving on the Successes and Errors channels.
  103. // Sarama completely ignores this field and is only to be used for
  104. // pass-through data.
  105. Metadata interface{}
  106. // Below this point are filled in by the producer as the message is processed
  107. // Offset is the offset of the message stored on the broker. This is only
  108. // guaranteed to be defined if the message was successfully delivered and
  109. // RequiredAcks is not NoResponse.
  110. Offset int64
  111. // Partition is the partition that the message was sent to. This is only
  112. // guaranteed to be defined if the message was successfully delivered.
  113. Partition int32
  114. retries int
  115. flags flagSet
  116. keyCache, valueCache []byte
  117. }
  118. func (m *ProducerMessage) byteSize() int {
  119. size := 26 // the metadata overhead of CRC, flags, etc.
  120. if m.Key != nil {
  121. size += m.Key.Length()
  122. }
  123. if m.Value != nil {
  124. size += m.Value.Length()
  125. }
  126. return size
  127. }
  128. func (m *ProducerMessage) clear() {
  129. m.flags = 0
  130. m.retries = 0
  131. m.keyCache = nil
  132. m.valueCache = nil
  133. }
  134. // ProducerError is the type of error generated when the producer fails to deliver a message.
  135. // It contains the original ProducerMessage as well as the actual error value.
  136. type ProducerError struct {
  137. Msg *ProducerMessage
  138. Err error
  139. }
  140. func (pe ProducerError) Error() string {
  141. return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
  142. }
  143. // ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
  144. // It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
  145. // when closing a producer.
  146. type ProducerErrors []*ProducerError
  147. func (pe ProducerErrors) Error() string {
  148. return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
  149. }
  150. func (p *asyncProducer) Errors() <-chan *ProducerError {
  151. return p.errors
  152. }
  153. func (p *asyncProducer) Successes() <-chan *ProducerMessage {
  154. return p.successes
  155. }
  156. func (p *asyncProducer) Input() chan<- *ProducerMessage {
  157. return p.input
  158. }
  159. func (p *asyncProducer) Close() error {
  160. p.AsyncClose()
  161. if p.conf.Producer.Return.Successes {
  162. go withRecover(func() {
  163. for _ = range p.successes {
  164. }
  165. })
  166. }
  167. var errors ProducerErrors
  168. if p.conf.Producer.Return.Errors {
  169. for event := range p.errors {
  170. errors = append(errors, event)
  171. }
  172. }
  173. if len(errors) > 0 {
  174. return errors
  175. }
  176. return nil
  177. }
  178. func (p *asyncProducer) AsyncClose() {
  179. go withRecover(p.shutdown)
  180. }
  181. // singleton
  182. // dispatches messages by topic
  183. func (p *asyncProducer) dispatcher() {
  184. handlers := make(map[string]chan<- *ProducerMessage)
  185. shuttingDown := false
  186. for msg := range p.input {
  187. if msg == nil {
  188. Logger.Println("Something tried to send a nil message, it was ignored.")
  189. continue
  190. }
  191. if msg.flags&shutdown != 0 {
  192. shuttingDown = true
  193. p.inFlight.Done()
  194. continue
  195. } else if msg.retries == 0 {
  196. if shuttingDown {
  197. // we can't just call returnError here because that decrements the wait group,
  198. // which hasn't been incremented yet for this message, and shouldn't be
  199. pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
  200. if p.conf.Producer.Return.Errors {
  201. p.errors <- pErr
  202. } else {
  203. Logger.Println(pErr)
  204. }
  205. continue
  206. }
  207. p.inFlight.Add(1)
  208. }
  209. if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
  210. (msg.byteSize() > p.conf.Producer.MaxMessageBytes) {
  211. p.returnError(msg, ErrMessageSizeTooLarge)
  212. continue
  213. }
  214. handler := handlers[msg.Topic]
  215. if handler == nil {
  216. handler = p.newTopicProducer(msg.Topic)
  217. handlers[msg.Topic] = handler
  218. }
  219. handler <- msg
  220. }
  221. for _, handler := range handlers {
  222. close(handler)
  223. }
  224. }
  225. // one per topic
  226. // partitions messages, then dispatches them by partition
  227. type topicProducer struct {
  228. parent *asyncProducer
  229. topic string
  230. input <-chan *ProducerMessage
  231. breaker *breaker.Breaker
  232. handlers map[int32]chan<- *ProducerMessage
  233. partitioner Partitioner
  234. }
  235. func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
  236. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  237. tp := &topicProducer{
  238. parent: p,
  239. topic: topic,
  240. input: input,
  241. breaker: breaker.New(3, 1, 10*time.Second),
  242. handlers: make(map[int32]chan<- *ProducerMessage),
  243. partitioner: p.conf.Producer.Partitioner(topic),
  244. }
  245. go withRecover(tp.dispatch)
  246. return input
  247. }
  248. func (tp *topicProducer) dispatch() {
  249. for msg := range tp.input {
  250. if msg.retries == 0 {
  251. if err := tp.partitionMessage(msg); err != nil {
  252. tp.parent.returnError(msg, err)
  253. continue
  254. }
  255. }
  256. handler := tp.handlers[msg.Partition]
  257. if handler == nil {
  258. handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
  259. tp.handlers[msg.Partition] = handler
  260. }
  261. handler <- msg
  262. }
  263. for _, handler := range tp.handlers {
  264. close(handler)
  265. }
  266. }
  267. func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
  268. var partitions []int32
  269. err := tp.breaker.Run(func() (err error) {
  270. if tp.partitioner.RequiresConsistency() {
  271. partitions, err = tp.parent.client.Partitions(msg.Topic)
  272. } else {
  273. partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
  274. }
  275. return
  276. })
  277. if err != nil {
  278. return err
  279. }
  280. numPartitions := int32(len(partitions))
  281. if numPartitions == 0 {
  282. return ErrLeaderNotAvailable
  283. }
  284. choice, err := tp.partitioner.Partition(msg, numPartitions)
  285. if err != nil {
  286. return err
  287. } else if choice < 0 || choice >= numPartitions {
  288. return ErrInvalidPartition
  289. }
  290. msg.Partition = partitions[choice]
  291. return nil
  292. }
  293. // one per partition per topic
  294. // dispatches messages to the appropriate broker
  295. // also responsible for maintaining message order during retries
  296. type partitionProducer struct {
  297. parent *asyncProducer
  298. topic string
  299. partition int32
  300. input <-chan *ProducerMessage
  301. leader *Broker
  302. breaker *breaker.Breaker
  303. output chan<- *ProducerMessage
  304. // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
  305. // all other messages get buffered in retryState[msg.retries].buf to preserve ordering
  306. // retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
  307. // therefore whether our buffer is complete and safe to flush)
  308. highWatermark int
  309. retryState []partitionRetryState
  310. }
  311. type partitionRetryState struct {
  312. buf []*ProducerMessage
  313. expectChaser bool
  314. }
  315. func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
  316. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  317. pp := &partitionProducer{
  318. parent: p,
  319. topic: topic,
  320. partition: partition,
  321. input: input,
  322. breaker: breaker.New(3, 1, 10*time.Second),
  323. retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
  324. }
  325. go withRecover(pp.dispatch)
  326. return input
  327. }
  // dispatch is the main loop of a partitionProducer. It first tries to prefetch
  // the partition leader, then handles each message from pp.input:
  //   - A message at a higher retry level than highWatermark starts a new retry
  //     level (newHighWatermark). The loop backs off for Retry.Backoff and then
  //     forwards that message as normal.
  //   - While retrying, messages from a lower level are buffered in retryState;
  //     chasers for such a level only clear its expectChaser flag.
  //   - A chaser at the current level means that level is complete, so
  //     flushRetryBuffers is called.
  //   - Every other message is forwarded to the leader's broker producer. If no
  //     broker producer is selected, updateLeader picks one first; on failure the
  //     message is returned as an error and the loop backs off.
  // When pp.input is closed, the reference on the current broker producer is released.
  func (pp *partitionProducer) dispatch() {
  	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
  	// on the first message
  	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
  	if pp.leader != nil {
  		pp.output = pp.parent.getBrokerProducer(pp.leader)
  	}
  	for msg := range pp.input {
  		if msg.retries > pp.highWatermark {
  			// a new, higher, retry level; handle it and then back off
  			pp.newHighWatermark(msg.retries)
  			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
  		} else if pp.highWatermark > 0 {
  			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
  			if msg.retries < pp.highWatermark {
  				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
  				if msg.flags&chaser == chaser {
  					pp.retryState[msg.retries].expectChaser = false
  					pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
  				} else {
  					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
  				}
  				continue
  			} else if msg.flags&chaser == chaser {
  				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
  				// meaning this retry level is done and we can go down (at least) one level and flush that
  				pp.retryState[pp.highWatermark].expectChaser = false
  				pp.flushRetryBuffers()
  				pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
  				continue
  			}
  		}
  		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
  		// without breaking any of our ordering guarantees
  		if pp.output == nil {
  			if err := pp.updateLeader(); err != nil {
  				// no broker could be selected: fail this message and back off before the next one
  				pp.parent.returnError(msg, err)
  				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
  				continue
  			}
  			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
  		}
  		pp.output <- msg
  	}
  	// input closed: drop our reference on the broker producer, if we hold one
  	if pp.output != nil {
  		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
  	}
  }
  // newHighWatermark moves the partitionProducer to retry level hwm. It sends a
  // chaser, tagged with the previous level (hwm-1), down the current broker
  // producer and records that a chaser is expected for level hwm. It then
  // releases the current broker producer, so the next real message selects a
  // leader afresh.
  // NOTE(review): this sends on pp.output and calls pp.leader.ID() without a nil
  // check. It appears to rely on a broker producer always being selected when a
  // higher retry level arrives; a nil pp.output would block this send forever.
  // Confirm that invariant holds.
  func (pp *partitionProducer) newHighWatermark(hwm int) {
  	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
  	pp.highWatermark = hwm
  	// send off a chaser so that we know when everything "in between" has made it
  	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
  	pp.retryState[pp.highWatermark].expectChaser = true
  	pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
  	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}
  	// a new HWM means that our current broker selection is out of date
  	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
  	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
  	pp.output = nil
  }
  // flushRetryBuffers runs after the chaser for the current retry level has
  // arrived. It steps highWatermark down one level at a time. At each level it
  // sends that level's buffered messages to the broker producer, selecting a
  // leader first via updateLeader if none is selected. If no leader can be
  // selected, the level's buffered messages are all returned as errors instead.
  // It stops at the first level that is still waiting for its chaser, or at
  // level 0 (normal operation).
  func (pp *partitionProducer) flushRetryBuffers() {
  	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
  	for {
  		pp.highWatermark--
  		if pp.output == nil {
  			if err := pp.updateLeader(); err != nil {
  				// no leader: fail everything buffered at this level, then skip the send loop
  				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
  				goto flushDone
  			}
  			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
  		}
  		for _, msg := range pp.retryState[pp.highWatermark].buf {
  			pp.output <- msg
  		}
  	flushDone:
  		// this level's buffer has been either sent or failed; drop it
  		pp.retryState[pp.highWatermark].buf = nil
  		if pp.retryState[pp.highWatermark].expectChaser {
  			// this level still has a chaser outstanding, so it becomes the current retry level
  			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
  			break
  		} else if pp.highWatermark == 0 {
  			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
  			break
  		}
  	}
  }
  414. func (pp *partitionProducer) updateLeader() error {
  415. return pp.breaker.Run(func() (err error) {
  416. if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
  417. return err
  418. }
  419. if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
  420. return err
  421. }
  422. pp.output = pp.parent.getBrokerProducer(pp.leader)
  423. return nil
  424. })
  425. }
  426. // one per broker, constructs both an aggregator and a flusher
  427. func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
  428. input := make(chan *ProducerMessage)
  429. bridge := make(chan []*ProducerMessage)
  430. a := &aggregator{
  431. parent: p,
  432. broker: broker,
  433. input: input,
  434. output: bridge,
  435. }
  436. go withRecover(a.run)
  437. f := &flusher{
  438. parent: p,
  439. broker: broker,
  440. input: bridge,
  441. currentRetries: make(map[string]map[int32]error),
  442. }
  443. go withRecover(f.run)
  444. return input
  445. }
  446. // groups messages together into appropriately-sized batches for sending to the broker
  447. // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
  448. type aggregator struct {
  449. parent *asyncProducer
  450. broker *Broker
  451. input <-chan *ProducerMessage
  452. output chan<- []*ProducerMessage
  453. buffer []*ProducerMessage
  454. bufferBytes int
  455. timer <-chan time.Time
  456. }
  // run is the aggregator's main loop. Each incoming message is appended to the
  // buffer. If appending it would overflow a hard limit (wouldOverflow), the
  // current buffer is flushed first with a blocking send. Once readyToFlush
  // reports true, or the Flush.Frequency timer fires, the buffer becomes
  // eligible to be sent. The local `output` is then set to a.output, which
  // enables the send case of the select; while `output` is nil that case can
  // never proceed.
  // When a nil message is received (which is what a closed input yields), any
  // remaining buffer is sent and a.output is closed. Closing a.output ends the
  // flusher's receive loop.
  func (a *aggregator) run() {
  	var output chan<- []*ProducerMessage
  	for {
  		select {
  		case msg := <-a.input:
  			if msg == nil {
  				goto shutdown
  			}
  			if a.wouldOverflow(msg) {
  				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
  				a.output <- a.buffer
  				a.reset()
  				output = nil
  			}
  			a.buffer = append(a.buffer, msg)
  			a.bufferBytes += msg.byteSize()
  			if a.readyToFlush(msg) {
  				output = a.output
  			} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
  				// arm the timer once per batch; reset() disarms it
  				a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
  			}
  		case <-a.timer:
  			output = a.output
  		case output <- a.buffer:
  			a.reset()
  			output = nil
  		}
  	}
  shutdown:
  	if len(a.buffer) > 0 {
  		a.output <- a.buffer
  	}
  	close(a.output)
  }
  491. func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
  492. switch {
  493. // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
  494. case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
  495. return true
  496. // Would we overflow the size-limit of a compressed message-batch?
  497. case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
  498. return true
  499. // Would we overflow simply in number of messages?
  500. case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
  501. return true
  502. default:
  503. return false
  504. }
  505. }
  506. func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
  507. switch {
  508. // If all three config values are 0, we always flush as-fast-as-possible
  509. case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
  510. return true
  511. // If the messages is a chaser we must flush to maintain the state-machine
  512. case msg.flags&chaser == chaser:
  513. return true
  514. // If we've passed the message trigger-point
  515. case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
  516. return true
  517. // If we've passed the byte trigger-point
  518. case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
  519. return true
  520. default:
  521. return false
  522. }
  523. }
  524. func (a *aggregator) reset() {
  525. a.timer = nil
  526. a.buffer = nil
  527. a.bufferBytes = 0
  528. }
  529. // takes a batch at a time from the aggregator and sends to the broker
  530. type flusher struct {
  531. parent *asyncProducer
  532. broker *Broker
  533. input <-chan []*ProducerMessage
  534. currentRetries map[string]map[int32]error
  535. }
  536. func (f *flusher) run() {
  537. var closing error
  538. Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())
  539. for batch := range f.input {
  540. if closing != nil {
  541. f.parent.retryMessages(batch, closing)
  542. continue
  543. }
  544. msgSets := f.groupAndFilter(batch)
  545. request := f.parent.buildRequest(msgSets)
  546. if request == nil {
  547. continue
  548. }
  549. response, err := f.broker.Produce(request)
  550. switch err.(type) {
  551. case nil:
  552. break
  553. case PacketEncodingError:
  554. f.parent.returnErrors(batch, err)
  555. continue
  556. default:
  557. Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
  558. f.parent.abandonBrokerConnection(f.broker)
  559. _ = f.broker.Close()
  560. closing = err
  561. f.parent.retryMessages(batch, err)
  562. continue
  563. }
  564. if response == nil {
  565. // this only happens when RequiredAcks is NoResponse, so we have to assume success
  566. f.parent.returnSuccesses(batch)
  567. continue
  568. }
  569. f.parseResponse(msgSets, response)
  570. }
  571. Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
  572. }
  573. func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
  574. var err error
  575. msgSets := make(map[string]map[int32][]*ProducerMessage)
  576. for i, msg := range batch {
  577. if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
  578. // we're currently retrying this partition so we need to filter out this message
  579. f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
  580. batch[i] = nil
  581. if msg.flags&chaser == chaser {
  582. // ...but now we can start processing future messages again
  583. Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
  584. f.broker.ID(), msg.Topic, msg.Partition)
  585. delete(f.currentRetries[msg.Topic], msg.Partition)
  586. }
  587. continue
  588. }
  589. if msg.Key != nil {
  590. if msg.keyCache, err = msg.Key.Encode(); err != nil {
  591. f.parent.returnError(msg, err)
  592. batch[i] = nil
  593. continue
  594. }
  595. }
  596. if msg.Value != nil {
  597. if msg.valueCache, err = msg.Value.Encode(); err != nil {
  598. f.parent.returnError(msg, err)
  599. batch[i] = nil
  600. continue
  601. }
  602. }
  603. partitionSet := msgSets[msg.Topic]
  604. if partitionSet == nil {
  605. partitionSet = make(map[int32][]*ProducerMessage)
  606. msgSets[msg.Topic] = partitionSet
  607. }
  608. partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
  609. }
  610. return msgSets
  611. }
  612. func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
  613. // we iterate through the blocks in the request set, not the response, so that we notice
  614. // if the response is missing a block completely
  615. for topic, partitionSet := range msgSets {
  616. for partition, msgs := range partitionSet {
  617. block := response.GetBlock(topic, partition)
  618. if block == nil {
  619. f.parent.returnErrors(msgs, ErrIncompleteResponse)
  620. continue
  621. }
  622. switch block.Err {
  623. // Success
  624. case ErrNoError:
  625. for i := range msgs {
  626. msgs[i].Offset = block.Offset + int64(i)
  627. }
  628. f.parent.returnSuccesses(msgs)
  629. // Retriable errors
  630. case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
  631. ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
  632. Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
  633. f.broker.ID(), topic, partition, block.Err)
  634. if f.currentRetries[topic] == nil {
  635. f.currentRetries[topic] = make(map[int32]error)
  636. }
  637. f.currentRetries[topic][partition] = block.Err
  638. f.parent.retryMessages(msgs, block.Err)
  639. // Other non-retriable errors
  640. default:
  641. f.parent.returnErrors(msgs, block.Err)
  642. }
  643. }
  644. }
  645. }
  646. // singleton
  647. // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
  648. // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
  649. func (p *asyncProducer) retryHandler() {
  650. var msg *ProducerMessage
  651. buf := queue.New()
  652. for {
  653. if buf.Length() == 0 {
  654. msg = <-p.retries
  655. } else {
  656. select {
  657. case msg = <-p.retries:
  658. case p.input <- buf.Peek().(*ProducerMessage):
  659. buf.Remove()
  660. continue
  661. }
  662. }
  663. if msg == nil {
  664. return
  665. }
  666. buf.Add(msg)
  667. }
  668. }
  // utility functions

  // shutdown performs the full producer shutdown:
  //  1. It adds a shutdown marker to inFlight and sends the marker through
  //     p.input, which makes the dispatcher reject new messages from then on.
  //  2. It waits until every in-flight message has been returned as a success
  //     or an error.
  //  3. It closes the client if the producer owns it.
  //  4. It closes the input, retries, errors and successes channels.
  // Closing p.input ends the dispatcher, which closes the topic producers, and
  // so on downstream. Closing p.retries stops retryHandler.
  func (p *asyncProducer) shutdown() {
  	Logger.Println("Producer shutting down.")
  	p.inFlight.Add(1)
  	p.input <- &ProducerMessage{flags: shutdown}
  	p.inFlight.Wait()
  	if p.ownClient {
  		err := p.client.Close()
  		if err != nil {
  			Logger.Println("producer/shutdown failed to close the embedded client:", err)
  		}
  	}
  	close(p.input)
  	close(p.retries)
  	close(p.errors)
  	close(p.successes)
  }
  686. func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
  687. req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
  688. empty := true
  689. for topic, partitionSet := range batch {
  690. for partition, msgSet := range partitionSet {
  691. setToSend := new(MessageSet)
  692. setSize := 0
  693. for _, msg := range msgSet {
  694. if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
  695. // compression causes message-sets to be wrapped as single messages, which have tighter
  696. // size requirements, so we have to respect those limits
  697. valBytes, err := encode(setToSend)
  698. if err != nil {
  699. Logger.Println(err) // if this happens, it's basically our fault.
  700. panic(err)
  701. }
  702. req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
  703. setToSend = new(MessageSet)
  704. setSize = 0
  705. }
  706. setSize += msg.byteSize()
  707. setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache})
  708. empty = false
  709. }
  710. if p.conf.Producer.Compression == CompressionNone {
  711. req.AddSet(topic, partition, setToSend)
  712. } else {
  713. valBytes, err := encode(setToSend)
  714. if err != nil {
  715. Logger.Println(err) // if this happens, it's basically our fault.
  716. panic(err)
  717. }
  718. req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
  719. }
  720. }
  721. }
  722. if empty {
  723. return nil
  724. }
  725. return req
  726. }
  727. func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
  728. msg.clear()
  729. pErr := &ProducerError{Msg: msg, Err: err}
  730. if p.conf.Producer.Return.Errors {
  731. p.errors <- pErr
  732. } else {
  733. Logger.Println(pErr)
  734. }
  735. p.inFlight.Done()
  736. }
  737. func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
  738. for _, msg := range batch {
  739. if msg != nil {
  740. p.returnError(msg, err)
  741. }
  742. }
  743. }
  744. func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
  745. for _, msg := range batch {
  746. if msg == nil {
  747. continue
  748. }
  749. if p.conf.Producer.Return.Successes {
  750. msg.clear()
  751. p.successes <- msg
  752. }
  753. p.inFlight.Done()
  754. }
  755. }
  756. func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
  757. for _, msg := range batch {
  758. if msg == nil {
  759. continue
  760. }
  761. if msg.retries >= p.conf.Producer.Retry.Max {
  762. p.returnError(msg, err)
  763. } else {
  764. msg.retries++
  765. p.retries <- msg
  766. }
  767. }
  768. }
  769. func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
  770. p.brokerLock.Lock()
  771. defer p.brokerLock.Unlock()
  772. bp := p.brokers[broker]
  773. if bp == nil {
  774. bp = p.newBrokerProducer(broker)
  775. p.brokers[broker] = bp
  776. p.brokerRefs[bp] = 0
  777. }
  778. p.brokerRefs[bp]++
  779. return bp
  780. }
  781. func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
  782. p.brokerLock.Lock()
  783. defer p.brokerLock.Unlock()
  784. p.brokerRefs[bp]--
  785. if p.brokerRefs[bp] == 0 {
  786. close(bp)
  787. delete(p.brokerRefs, bp)
  788. if p.brokers[broker] == bp {
  789. delete(p.brokers, broker)
  790. }
  791. }
  792. }
  793. func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
  794. p.brokerLock.Lock()
  795. defer p.brokerLock.Unlock()
  796. delete(p.brokers, broker)
  797. }