async_producer.go

package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
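
// The example below is an illustrative sketch (not part of the upstream file) of
// the usage pattern the interface comments above describe: write to Input() while
// the Successes and Errors channels are drained concurrently so the producer can
// never deadlock, then call AsyncClose and wait for both channels to close. The
// broker address and topic are placeholder assumptions.
func exampleAsyncProducerUsage() {
	cfg := NewConfig()
	cfg.Producer.Return.Successes = true // needed to observe per-message acks on Successes()

	producer, err := NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		Logger.Println("failed to start producer:", err)
		return
	}

	// Drain both result channels in the background; AsyncClose will close them
	// once everything in flight has been resolved.
	var drained sync.WaitGroup
	drained.Add(2)
	go func() {
		defer drained.Done()
		for msg := range producer.Successes() {
			Logger.Printf("delivered to %s/%d at offset %d\n", msg.Topic, msg.Partition, msg.Offset)
		}
	}()
	go func() {
		defer drained.Done()
		for perr := range producer.Errors() {
			Logger.Println("delivery failed:", perr.Err)
		}
	}()

	for i := 0; i < 100; i++ {
		producer.Input() <- &ProducerMessage{
			Topic: "example-topic",
			Value: StringEncoder(fmt.Sprintf("message-%d", i)),
		}
	}

	producer.AsyncClose()
	drained.Wait()
}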
// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}

const (
	noProducerID    = -1
	noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence, t.producerEpoch
}

func (t *transactionManager) bumpEpoch() {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	t.producerEpoch++
	for k := range t.sequenceNumbers {
		t.sequenceNumbers[k] = 0
	}
}

func (t *transactionManager) getProducerID() (int64, int16) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.producerID, t.producerEpoch
}
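
// Illustrative sketch (not part of the upstream file): sequence numbers are kept
// per "topic-partition" key and increase monotonically, while bumpEpoch advances
// the epoch and resets every sequence to zero, which is how returnError recovers
// from a gap in the idempotent sequence. The IDs below are arbitrary.
func exampleSequenceNumbers() {
	t := &transactionManager{
		producerID:      1000,
		producerEpoch:   0,
		sequenceNumbers: make(map[string]int32),
	}

	seq0, epoch := t.getAndIncrementSequenceNumber("example-topic", 0)       // 0, epoch 0
	seq1, _ := t.getAndIncrementSequenceNumber("example-topic", 0)           // 1
	otherPartition, _ := t.getAndIncrementSequenceNumber("example-topic", 1) // independent counter: 0

	t.bumpEpoch() // epoch becomes 1, all sequences reset to 0

	seqAfterBump, epochAfterBump := t.getAndIncrementSequenceNumber("example-topic", 0) // 0, epoch 1
	Logger.Println(seq0, seq1, otherPartition, epoch, seqAfterBump, epochAfterBump)
}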
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
	}

	if conf.Producer.Idempotent {
		initProducerIDResponse, err := client.InitProducerID()
		if err != nil {
			return nil, err
		}
		txnmgr.producerID = initProducerIDResponse.ProducerID
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
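
// Illustrative sketch (not part of the upstream file): enabling the idempotent
// producer so that newTransactionManager above requests a producer ID and epoch
// from the cluster. The related settings shown (WaitForAll acks, a single
// in-flight request, a broker version of at least 0.11, a non-zero retry count)
// reflect what Sarama's config validation expects for idempotence to the best of
// my recollection; treat the exact values as assumptions and rely on
// Config.Validate for the final word.
func exampleIdempotentConfig() *Config {
	cfg := NewConfig()
	cfg.Version = V0_11_0_0
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = WaitForAll
	cfg.Producer.Retry.Max = 5
	cfg.Net.MaxOpenRequests = 1
	return cfg
}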
type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For a client passed in by the caller, ensure we don't
	// call Close() on it.
	cli := &nopCloserClient{client}
	return newAsyncProducer(cli)
}
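
// Illustrative sketch (not part of the upstream file): building the producer from
// an existing Client. Because the client is wrapped in nopCloserClient above,
// closing the producer leaves the client usable, and the caller closes it last,
// matching the ordering required by the Close documentation. The broker address
// and topic are placeholder assumptions.
func exampleProducerFromSharedClient() error {
	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
	if err != nil {
		return err
	}
	defer client.Close() // closed last, after the producer

	producer, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}
	defer producer.Close() // runs before client.Close thanks to defer ordering

	producer.Input() <- &ProducerMessage{Topic: "example-topic", Value: StringEncoder("hello")}
	return nil
}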
func newAsyncProducer(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and it is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behaviour depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp is assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
	producerEpoch  int16
	hasSequence    bool
}
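
// Illustrative sketch (not part of the upstream file): populating the caller-set
// fields of a ProducerMessage. The key controls partition assignment under the
// default hash partitioner, headers require brokers of at least 0.11, and the
// Metadata field round-trips untouched to the Successes/Errors channels. Topic
// and values are placeholder assumptions.
func exampleProducerMessage() *ProducerMessage {
	return &ProducerMessage{
		Topic: "example-topic",
		Key:   StringEncoder("user-42"),             // messages with the same key hash to the same partition
		Value: ByteEncoder([]byte(`{"clicks": 3}`)), // any Encoder works; ByteEncoder wraps a raw []byte payload
		Headers: []RecordHeader{
			{Key: []byte("trace-id"), Value: []byte("abc123")},
		},
		Metadata: "request-7", // returned as-is on the Successes/Errors channels
	}
}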
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}
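
// Illustrative sketch (not part of the upstream file): the same size guard the
// dispatcher applies before routing a message, shown as a standalone check. The
// record-batch overhead only applies for protocol version 2 (Kafka 0.11+);
// otherwise the legacy message overhead is used, exactly as byteSize does above.
func exampleWouldExceedMaxMessageBytes(conf *Config, msg *ProducerMessage) bool {
	version := 1
	if conf.Version.IsAtLeast(V0_11_0_0) {
		version = 2
	}
	return msg.byteSize(version) > conf.Producer.MaxMessageBytes
}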
func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
	m.sequenceNumber = 0
	m.producerEpoch = 0
	m.hasSequence = false
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
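
// Illustrative sketch (not part of the upstream file): Close returns a
// ProducerErrors value when deliveries failed during shutdown, so a type
// assertion recovers the individual messages (and their Metadata) for logging or
// re-queueing by the caller.
func exampleHandleCloseErrors(producer AsyncProducer) {
	if err := producer.Close(); err != nil {
		if perrs, ok := err.(ProducerErrors); ok {
			for _, perr := range perrs {
				Logger.Printf("undelivered message on %s (metadata %v): %v\n",
					perr.Msg.Topic, perr.Msg.Metadata, perr.Err)
			}
			return
		}
		Logger.Println("producer close failed:", err)
	}
}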
func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		requiresConsistency := false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}
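
// Illustrative sketch (not part of the upstream file): choosing the partitioner
// that partitionMessage consults above. With NewManualPartitioner the producer
// trusts msg.Partition as set by the caller, whereas the default hash partitioner
// derives the partition from msg.Key. Topic, broker address, and partition number
// are placeholder assumptions.
func exampleManualPartitioning() {
	cfg := NewConfig()
	cfg.Producer.Partitioner = NewManualPartitioner
	cfg.Producer.Return.Errors = false // errors are only logged in this sketch, so Errors() needs no reader

	producer, err := NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		Logger.Println("failed to start producer:", err)
		return
	}
	defer producer.AsyncClose()

	producer.Input() <- &ProducerMessage{
		Topic:     "example-topic",
		Partition: 3, // honoured because of the manual partitioner
		Value:     StringEncoder("pinned to partition 3"),
	}
}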
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) backoff(retries int) {
	var backoff time.Duration
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
		maxRetries := pp.parent.conf.Producer.Retry.Max
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
	} else {
		backoff = pp.parent.conf.Producer.Retry.Backoff
	}
	if backoff > 0 {
		time.Sleep(backoff)
	}
}
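
// Illustrative sketch (not part of the upstream file): supplying the BackoffFunc
// consulted by backoff above to get delays that grow with the retry level instead
// of the fixed Producer.Retry.Backoff. The base delay and cap are arbitrary
// assumptions.
func exampleExponentialBackoffConfig() *Config {
	cfg := NewConfig()
	cfg.Producer.Retry.Max = 5
	cfg.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		backoff := 100 * time.Millisecond << uint(retries) // doubles with each retry level
		if limit := 5 * time.Second; backoff > limit {
			backoff = limit
		}
		return backoff
	}
	return cfg
}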
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees
		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		// Now that we know we have a broker to actually try and send this message to, generate the sequence
		// number for it.
		// All messages being retried (sent or not) have already had their retry count updated
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}

		pp.brokerProducer.input <- msg
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}
	stopchan  chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, false); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
				// The epoch was reset, need to roll the buffer over
				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, true); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}
			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		case <-bp.stopchan:
			Logger.Printf(
				"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
			return
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
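
// Illustrative sketch (not part of the upstream file): the Flush settings the run
// loop above reacts to. Flush.Frequency arms the timer branch, while the message,
// byte, and MaxMessages thresholds are consulted by the buffered produceSet when
// it decides whether it is ready to flush or would overflow. The numbers chosen
// here are arbitrary assumptions, not recommendations.
func exampleFlushTuning() *Config {
	cfg := NewConfig()
	cfg.Producer.Flush.Frequency = 100 * time.Millisecond // flush at least every 100ms once something is buffered
	cfg.Producer.Flush.Messages = 1000                    // ...or once 1000 messages are waiting
	cfg.Producer.Flush.Bytes = 1 << 20                    // ...or once roughly 1 MiB has accumulated
	cfg.Producer.Flush.MaxMessages = 5000                 // hard cap on messages per request
	return cfg
}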
func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}
	close(bp.stopchan)
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}
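
// Illustrative sketch (not part of the upstream file): the same unbounded-buffer
// bridge retryHandler uses, reduced to its essentials. Reads from `in` are never
// blocked by a slow `out` because items park in an in-memory queue, which is what
// prevents the retry path from deadlocking against the dispatcher.
func exampleInfiniteBridge(in <-chan *ProducerMessage, out chan<- *ProducerMessage) {
	buf := queue.New()
	for {
		if buf.Length() == 0 {
			msg, ok := <-in
			if !ok {
				close(out)
				return
			}
			buf.Add(msg)
			continue
		}
		select {
		case msg, ok := <-in:
			if !ok {
				// drain what is buffered, then close
				for buf.Length() > 0 {
					out <- buf.Peek().(*ProducerMessage)
					buf.Remove()
				}
				close(out)
				return
			}
			buf.Add(msg)
		case out <- buf.Peek().(*ProducerMessage):
			buf.Remove()
		}
	}
}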
// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
	// will never see a message with this number, so we can never continue the sequence.
	if msg.hasSequence {
		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
		p.txnmgr.bumpEpoch()
	}
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bc, ok := p.brokers[broker]
	if ok && bc.abandoned != nil {
		close(bc.abandoned)
	}

	delete(p.brokers, broker)
}