async_producer.go

package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
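
// A minimal usage sketch (assuming a broker at the placeholder address
// "localhost:9092"): sends and result-reads share a single select loop, as the
// documentation above recommends, so neither the Errors nor the Successes
// channel can back up and deadlock the producer.
//
//	config := NewConfig()
//	config.Producer.Return.Successes = true
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	for i := 0; i < 10; i++ {
//		select {
//		case producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}:
//		case err := <-producer.Errors():
//			fmt.Println("delivery failed:", err)
//		case msg := <-producer.Successes():
//			fmt.Println("delivered to partition", msg.Partition, "at offset", msg.Offset)
//		}
//	}
//	_ = producer.Close()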

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}

const (
	noProducerID    = -1
	noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
	}

	if conf.Producer.Idempotent {
		initProducerIDResponse, err := client.InitProducerID()
		if err != nil {
			return nil, err
		}
		txnmgr.producerID = initProducerIDResponse.ProducerID
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
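
// A hedged configuration sketch for the idempotent path above: the exact
// prerequisites are enforced elsewhere by Config.Validate, but broadly the
// cluster must speak at least the 0.11 protocol, acks must be WaitForAll,
// retries must be enabled, and only one request may be in flight per broker
// connection.
//
//	config := NewConfig()
//	config.Version = V0_11_0_0
//	config.Producer.Idempotent = true
//	config.Producer.RequiredAcks = WaitForAll
//	config.Producer.Retry.Max = 5
//	config.Net.MaxOpenRequests = 1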

type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For a client passed in by the caller, ensure we don't
	// call Close() on it.
	cli := &nopCloserClient{client}
	return newAsyncProducer(cli)
}

func newAsyncProducer(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.

	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder

	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and it is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
}
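
// An illustrative sketch of constructing a message: Key and Value accept any
// Encoder (StringEncoder and ByteEncoder are provided by the package), and
// Metadata is returned untouched on the Successes/Errors channels, which makes
// it useful for correlating acknowledgements with caller-side state. The topic
// name and requestID below are made-up examples.
//
//	msg := &ProducerMessage{
//		Topic:    "audit-log",
//		Key:      StringEncoder("user-42"),
//		Value:    ByteEncoder([]byte(`{"action":"login"}`)),
//		Metadata: requestID,
//	}
//	producer.Input() <- msg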

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
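
// A small sketch of inspecting failures after Close: when Return.Errors is
// enabled, the aggregated error can be type-asserted back into the individual
// ProducerErrors collected while draining.
//
//	if err := producer.Close(); err != nil {
//		if errs, ok := err.(ProducerErrors); ok {
//			for _, pe := range errs {
//				Logger.Println("delivery failed:", pe.Msg.Topic, pe.Err)
//			}
//		}
//	}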

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}
		// All messages being retried (sent or not) have already had their retry count updated
		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		var requiresConsistency = false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}

		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}
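
// A configuration sketch for steering the partitioning step above: the
// partitioner is constructed per topic from Config.Producer.Partitioner. The
// hash partitioner shown here derives the partition from ProducerMessage.Key,
// whereas NewManualPartitioner would honour the Partition field set by the
// caller instead.
//
//	config := NewConfig()
//	config.Producer.Partitioner = NewHashPartitioner
//	// or: config.Producer.Partitioner = NewManualPartitioner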

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) backoff(retries int) {
	var backoff time.Duration
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
		maxRetries := pp.parent.conf.Producer.Retry.Max
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
	} else {
		backoff = pp.parent.conf.Producer.Retry.Backoff
	}
	if backoff > 0 {
		time.Sleep(backoff)
	}
}
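
// A hedged sketch of supplying a custom backoff: when Retry.BackoffFunc is
// set it takes precedence over the fixed Retry.Backoff used above. The
// doubling schedule below is an illustrative choice, not a library default.
//
//	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		return time.Duration(100*(1<<uint(retries))) * time.Millisecond
//	}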

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees
		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.brokerProducer.input <- msg
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg := <-bp.input:
			if msg == nil {
				bp.shutdown()
				return
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response := <-bp.responses:
			bp.handleResponse(response)
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
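
// A configuration sketch for the batching behaviour in the loop above: the
// timer is armed from Flush.Frequency, and readyToFlush also takes the
// Flush.Messages and Flush.Bytes thresholds into account. The values below are
// illustrative rather than recommended defaults.
//
//	config.Producer.Flush.Frequency = 500 * time.Millisecond
//	config.Producer.Flush.Messages = 100
//	config.Producer.Flush.Bytes = 1 << 20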

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}

	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up the new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bc, ok := p.brokers[broker]
	if ok && bc.abandoned != nil {
		close(bc.abandoned)
	}
	delete(p.brokers, broker)
}