// async_producer.go
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/eapache/go-resiliency/breaker"
  8. "github.com/eapache/queue"
  9. )
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
// transactionManager keeps the state necessary to ensure idempotent production:
// the producer ID/epoch handed out by the cluster and the next sequence number
// for each topic-partition.
type transactionManager struct {
	producerID    int64
	producerEpoch int16
	// sequenceNumbers maps a "topic-partition" key to the next sequence number
	// to assign; guarded by mutex.
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}
// Sentinel values meaning "no producer ID / epoch has been obtained from the
// cluster" (i.e. idempotent production is disabled or not yet initialised).
const (
	noProducerID    = -1
	noProducerEpoch = -1
)
  52. func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
  53. key := fmt.Sprintf("%s-%d", topic, partition)
  54. t.mutex.Lock()
  55. defer t.mutex.Unlock()
  56. sequence := t.sequenceNumbers[key]
  57. t.sequenceNumbers[key] = sequence + 1
  58. return sequence, t.producerEpoch
  59. }
  60. func (t *transactionManager) bumpEpoch() {
  61. t.mutex.Lock()
  62. defer t.mutex.Unlock()
  63. t.producerEpoch++
  64. for k := range t.sequenceNumbers {
  65. t.sequenceNumbers[k] = 0
  66. }
  67. }
  68. func (t *transactionManager) getProducerID() (int64, int16) {
  69. t.mutex.Lock()
  70. defer t.mutex.Unlock()
  71. return t.producerID, t.producerEpoch
  72. }
  73. func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
  74. txnmgr := &transactionManager{
  75. producerID: noProducerID,
  76. producerEpoch: noProducerEpoch,
  77. }
  78. if conf.Producer.Idempotent {
  79. initProducerIDResponse, err := client.InitProducerID()
  80. if err != nil {
  81. return nil, err
  82. }
  83. txnmgr.producerID = initProducerIDResponse.ProducerID
  84. txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
  85. txnmgr.sequenceNumbers = make(map[string]int32)
  86. txnmgr.mutex = sync.Mutex{}
  87. Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
  88. }
  89. return txnmgr, nil
  90. }
// asyncProducer is the concrete AsyncProducer implementation; it owns the
// channel plumbing between the user, the dispatcher, and the per-broker
// goroutines.
type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	// inFlight counts undelivered user messages plus internal syn/fin control
	// messages (see the Add/Done calls throughout this file); presumably the
	// shutdown path waits on it — shutdown() is outside this view, confirm.
	inFlight sync.WaitGroup

	// brokers and brokerRefs reference-count the shared per-broker goroutines;
	// brokerLock appears to guard them (get/unrefBrokerProducer not visible
	// here — confirm).
	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex
	txnmgr     *transactionManager
}
// NewAsyncProducer creates a new AsyncProducer using the given broker addresses
// and configuration. The returned producer owns the client it creates and will
// close it as part of its own shutdown.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}
// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For clients passed in by the caller, wrap them so that the producer's
	// shutdown cannot call Close() on them — the caller retains ownership.
	cli := &nopCloserClient{client}
	return newAsyncProducer(cli)
}
  118. func newAsyncProducer(client Client) (AsyncProducer, error) {
  119. // Check that we are not dealing with a closed Client before processing any other arguments
  120. if client.Closed() {
  121. return nil, ErrClosedClient
  122. }
  123. txnmgr, err := newTransactionManager(client.Config(), client)
  124. if err != nil {
  125. return nil, err
  126. }
  127. p := &asyncProducer{
  128. client: client,
  129. conf: client.Config(),
  130. errors: make(chan *ProducerError),
  131. input: make(chan *ProducerMessage),
  132. successes: make(chan *ProducerMessage),
  133. retries: make(chan *ProducerMessage),
  134. brokers: make(map[*Broker]*brokerProducer),
  135. brokerRefs: make(map[*brokerProducer]int),
  136. txnmgr: txnmgr,
  137. }
  138. // launch our singleton dispatchers
  139. go withRecover(p.dispatcher)
  140. go withRecover(p.retryHandler)
  141. return p, nil
  142. }
  143. type flagSet int8
  144. const (
  145. syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
  146. fin // final message from partitionProducer to brokerProducer and back
  147. shutdown // start the shutdown process
  148. )
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behaviour depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries int     // number of retry passes this message has made so far
	flags   flagSet // internal syn/fin/shutdown control flags
	// expectation is set by callers outside this file (not populated here —
	// presumably the sync-producer wrapper; confirm against sync_producer.go).
	expectation chan *ProducerError
	// Idempotent-producer sequence assigned in partitionProducer.dispatch;
	// hasSequence records whether an assignment happened.
	sequenceNumber int32
	producerEpoch  int16
	hasSequence    bool
}
  193. const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
  194. func (m *ProducerMessage) byteSize(version int) int {
  195. var size int
  196. if version >= 2 {
  197. size = maximumRecordOverhead
  198. for _, h := range m.Headers {
  199. size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
  200. }
  201. } else {
  202. size = producerMessageOverhead
  203. }
  204. if m.Key != nil {
  205. size += m.Key.Length()
  206. }
  207. if m.Value != nil {
  208. size += m.Value.Length()
  209. }
  210. return size
  211. }
  212. func (m *ProducerMessage) clear() {
  213. m.flags = 0
  214. m.retries = 0
  215. m.sequenceNumber = 0
  216. m.producerEpoch = 0
  217. m.hasSequence = false
  218. }
// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

// Error implements the error interface, naming the topic and wrapped cause.
func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}
// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

// Error implements the error interface, reporting only the failure count;
// inspect the individual elements for details.
func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
// Errors implements AsyncProducer; it exposes the delivery-failure channel.
func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

// Successes implements AsyncProducer; it exposes the delivery-success channel.
func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

// Input implements AsyncProducer; messages written here enter the dispatcher.
func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}
// Close implements AsyncProducer. It triggers the asynchronous shutdown and
// then blocks until both output channels have been closed, collecting any
// delivery failures seen while draining into a single ProducerErrors value.
func (p *asyncProducer) Close() error {
	p.AsyncClose()

	// The caller may have stopped reading successes; drain them in the
	// background so the shutdown cannot deadlock on a full channel.
	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		// Collect every outstanding error until the channel is closed.
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		// Errors are not surfaced to the user; just wait for the channel to
		// be closed, which signals that shutdown has completed.
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
// AsyncClose implements AsyncProducer. The shutdown runs in its own goroutine;
// completion is signalled to the user by the closing of the Errors and
// Successes channels.
func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}
// singleton
// dispatcher routes incoming messages to per-topic goroutines (created on
// demand) after applying interceptors and global pre-flight checks (header
// support, maximum message size, shutdown state).
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			// internal control message: from here on, new user messages are
			// rejected with ErrShuttingDown
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			// first pass for this message: start tracking it as in-flight
			p.inFlight.Add(1)
		}

		// user-supplied interceptors run before validation and routing
		for _, interceptor := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(interceptor)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	// input closed: propagate the close down the pipeline
	for _, handler := range handlers {
		close(handler)
	}
}
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	// breaker rate-limits partition-metadata lookups after repeated failures
	breaker *breaker.Breaker
	// handlers holds the input channel of each partitionProducer, keyed by
	// partition; populated lazily in dispatch
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}
  331. func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
  332. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  333. tp := &topicProducer{
  334. parent: p,
  335. topic: topic,
  336. input: input,
  337. breaker: breaker.New(3, 1, 10*time.Second),
  338. handlers: make(map[int32]chan<- *ProducerMessage),
  339. partitioner: p.conf.Producer.Partitioner(topic),
  340. }
  341. go withRecover(tp.dispatch)
  342. return input
  343. }
  344. func (tp *topicProducer) dispatch() {
  345. for msg := range tp.input {
  346. if msg.retries == 0 {
  347. if err := tp.partitionMessage(msg); err != nil {
  348. tp.parent.returnError(msg, err)
  349. continue
  350. }
  351. }
  352. handler := tp.handlers[msg.Partition]
  353. if handler == nil {
  354. handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
  355. tp.handlers[msg.Partition] = handler
  356. }
  357. handler <- msg
  358. }
  359. for _, handler := range tp.handlers {
  360. close(handler)
  361. }
  362. }
// partitionMessage chooses a partition for msg via the topic's partitioner and
// stores it in msg.Partition. The candidate partition list is fetched through
// the circuit breaker so repeated metadata failures back off.
func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		requiresConsistency := false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			// consistent partitioners must see every partition, not just the
			// currently-writable subset
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		// named return: err set by whichever client call ran above
		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	// choice indexes into the (possibly filtered) partition list
	msg.Partition = partitions[choice]

	return nil
}
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader *Broker
	// breaker rate-limits leader re-election attempts (see updateLeader)
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

// partitionRetryState is the per-retry-level buffer described above.
type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}
  417. func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
  418. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  419. pp := &partitionProducer{
  420. parent: p,
  421. topic: topic,
  422. partition: partition,
  423. input: input,
  424. breaker: breaker.New(3, 1, 10*time.Second),
  425. retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
  426. }
  427. go withRecover(pp.dispatch)
  428. return input
  429. }
  430. func (pp *partitionProducer) backoff(retries int) {
  431. var backoff time.Duration
  432. if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
  433. maxRetries := pp.parent.conf.Producer.Retry.Max
  434. backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
  435. } else {
  436. backoff = pp.parent.conf.Producer.Retry.Backoff
  437. }
  438. if backoff > 0 {
  439. time.Sleep(backoff)
  440. }
  441. }
// dispatch is the partitionProducer main loop: it forwards data messages to
// the leader's brokerProducer while running the retry-level state machine
// (highWatermark/retryState) that preserves ordering across retries.
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		// release our reference on whatever brokerProducer we ended on
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		// Now that we know we have a broker to actually try and send this message to, generate the sequence
		// number for it.
		// All messages being retried (sent or not) have already had their retry count updated
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}

		pp.brokerProducer.input <- msg
	}
}
// newHighWatermark raises the current retry level to hwm, emits a fin chaser
// for the previous level, and drops the (now stale) broker selection.
func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}
// flushRetryBuffers walks the retry levels downwards from the current
// highWatermark, re-sending each level's buffered messages, until it reaches a
// level that is still awaiting its fin chaser or level zero (normal operation).
func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				// no leader available: fail this level's buffer and skip the
				// re-send, but still fall through to the bookkeeping below
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			// this level's fin hasn't arrived yet; stop and wait for it
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}
// updateLeader refreshes metadata for the topic, re-resolves the partition
// leader, and attaches to (and syns) the corresponding brokerProducer. It runs
// under the circuit breaker so persistent failures eventually trip it open.
func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}
// one per broker; also constructs an associated flusher
// newBrokerProducer starts the run loop plus a small bridge goroutine that
// performs the blocking network Produce call, so run() can treat the network
// as an ordinary selectable channel.
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		// bridge closed by shutdown(); closing responses lets run/shutdown
		// finish draining
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		// retries disabled: the abandoned channel lets partitionProducers
		// learn that this broker should no longer be used
		bp.abandoned = make(chan struct{})
	}

	return bp
}
// brokerProducerResponse pairs a sent produceSet with the broker's reply (or
// the transport error that prevented one).
type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}
// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	// abandoned is non-nil only when retries are disabled; partitionProducers
	// receive on it to learn the broker selection is out of date
	abandoned chan struct{}
	stopchan  chan struct{}

	buffer *produceSet
	// timer/timerFired implement the Flush.Frequency-based periodic flush
	timer      <-chan time.Time
	timerFired bool

	// closing, once set, causes every subsequent message to be retried
	closing        error
	currentRetries map[string]map[int32]error
}
// run is the brokerProducer main loop. It accumulates messages into bp.buffer
// and flushes the buffer to the network bridge when it is ready (or the flush
// timer fires); flushing is gated by the classic nil-channel trick: `output`
// is bp.output only when there is something to flush, else nil (send never
// selected).
func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				// a partitionProducer attached: clear any retry state for the
				// topic-partition and acknowledge the control message
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, false); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
				// The epoch was reset, need to roll the buffer over
				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, true); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}
			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			// buffer handed to the network bridge; start a fresh one
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		case <-bp.stopchan:
			Logger.Printf(
				"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
			return
		}

		// re-evaluate whether the buffer is eligible to flush on this pass
		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
// shutdown drains the remaining buffer to the network, closes the bridge, and
// processes every outstanding response before stopping the run loop.
func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		// dying broker need not apply; flush the remaining buffered messages
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	// closing output stops the bridge goroutine, which in turn closes
	// bp.responses once the last in-flight request has completed
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}
	close(bp.stopchan)
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}
  715. func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
  716. if bp.closing != nil {
  717. return bp.closing
  718. }
  719. return bp.currentRetries[msg.Topic][msg.Partition]
  720. }
// waitForSpace blocks until the accumulation buffer can accept msg — or,
// when forceRollover is set, until the current buffer has been handed off
// entirely. It returns a non-nil error if, while waiting, the broker
// entered a state in which msg must be retried instead of buffered.
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
				return nil
			}
		case bp.output <- bp.buffer:
			// The full buffer was flushed downstream; the fresh one has room.
			bp.rollOver()
			return nil
		}
	}
}
  738. func (bp *brokerProducer) rollOver() {
  739. bp.timer = nil
  740. bp.timerFired = false
  741. bp.buffer = newProduceSet(bp.parent)
  742. }
  743. func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
  744. if response.err != nil {
  745. bp.handleError(response.set, response.err)
  746. } else {
  747. bp.handleSuccess(response.set, response.res)
  748. }
  749. if bp.buffer.empty() {
  750. bp.rollOver() // this can happen if the response invalidated our buffer
  751. }
  752. }
// handleSuccess distributes the outcome of a produce request that got a
// response back to the messages in the request set. Individual partition
// blocks within the response may still carry per-partition errors.
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			// Brokers >= 0.10 may supply a block-level timestamp; apply it
			// to every message in the partition set.
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			// block.Offset is the offset of the first message; the rest
			// follow it sequentially.
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			// The broker already persisted this batch; treat as success.
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				// Retries are disabled: give up on this broker and surface the error.
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			// Idempotent retries re-submit batches directly to the partition
			// leader (see retryBatch), so refresh metadata first.
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		// Second pass: flip the retriable partitions into the retrying state
		// and re-route their messages.
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}
  833. func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
  834. Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
  835. produceSet := newProduceSet(p)
  836. produceSet.msgs[topic] = make(map[int32]*partitionSet)
  837. produceSet.msgs[topic][partition] = pSet
  838. produceSet.bufferBytes += pSet.bufferBytes
  839. produceSet.bufferCount += len(pSet.msgs)
  840. for _, msg := range pSet.msgs {
  841. if msg.retries >= p.conf.Producer.Retry.Max {
  842. p.returnError(msg, kerr)
  843. return
  844. }
  845. msg.retries++
  846. }
  847. // it's expected that a metadata refresh has been requested prior to calling retryBatch
  848. leader, err := p.client.Leader(topic, partition)
  849. if err != nil {
  850. Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
  851. for _, msg := range pSet.msgs {
  852. p.returnError(msg, kerr)
  853. }
  854. return
  855. }
  856. bp := p.getBrokerProducer(leader)
  857. bp.output <- produceSet
  858. }
// handleError deals with a produce request that failed outright (no usable
// response). Encoding errors are terminal for just the affected messages;
// any other error is treated as a connection-level failure.
func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		// The request could never be encoded, so retrying cannot help;
		// surface the error to the owners of every message in the set.
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		// Best-effort close; the connection is already considered dead.
		_ = bp.broker.Close()
		bp.closing = err
		// Retry both the set that failed and everything still accumulated
		// locally — none of it can be delivered over this connection.
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}
// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	// buf gives p.retries unbounded capacity: a receive from p.retries never
	// has to wait for the dispatcher to be ready to accept on p.input.
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			// Nothing queued — block until a retry arrives.
			msg = <-p.retries
		} else {
			// Either accept a new retry or feed the oldest queued message
			// back into the dispatcher, whichever is ready first.
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		// A nil message means p.retries was closed (a closed channel of
		// pointers yields nil): time to exit.
		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}
  902. // utility functions
// shutdown performs an orderly stop of the whole producer: it waits for
// all in-flight messages to resolve, closes the embedded client, then
// closes every channel the producer owns.
func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	// Count the shutdown marker itself so Wait() cannot return before the
	// marker has made it through the dispatcher. NOTE(review): the matching
	// Done() for this Add happens wherever the shutdown flag is consumed —
	// confirm against the dispatcher code (outside this view).
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	// Only safe to close these after Wait() — nothing can be sending anymore.
	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}
  917. func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
  918. // We need to reset the producer ID epoch if we set a sequence number on it, because the broker
  919. // will never see a message with this number, so we can never continue the sequence.
  920. if msg.hasSequence {
  921. Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
  922. p.txnmgr.bumpEpoch()
  923. }
  924. msg.clear()
  925. pErr := &ProducerError{Msg: msg, Err: err}
  926. if p.conf.Producer.Return.Errors {
  927. p.errors <- pErr
  928. } else {
  929. Logger.Println(pErr)
  930. }
  931. p.inFlight.Done()
  932. }
  933. func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
  934. for _, msg := range batch {
  935. p.returnError(msg, err)
  936. }
  937. }
  938. func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
  939. for _, msg := range batch {
  940. if p.conf.Producer.Return.Successes {
  941. msg.clear()
  942. p.successes <- msg
  943. }
  944. p.inFlight.Done()
  945. }
  946. }
  947. func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
  948. if msg.retries >= p.conf.Producer.Retry.Max {
  949. p.returnError(msg, err)
  950. } else {
  951. msg.retries++
  952. p.retries <- msg
  953. }
  954. }
  955. func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
  956. for _, msg := range batch {
  957. p.retryMessage(msg, err)
  958. }
  959. }
  960. func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
  961. p.brokerLock.Lock()
  962. defer p.brokerLock.Unlock()
  963. bp := p.brokers[broker]
  964. if bp == nil {
  965. bp = p.newBrokerProducer(broker)
  966. p.brokers[broker] = bp
  967. p.brokerRefs[bp] = 0
  968. }
  969. p.brokerRefs[bp]++
  970. return bp
  971. }
  972. func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
  973. p.brokerLock.Lock()
  974. defer p.brokerLock.Unlock()
  975. p.brokerRefs[bp]--
  976. if p.brokerRefs[bp] == 0 {
  977. close(bp.input)
  978. delete(p.brokerRefs, bp)
  979. if p.brokers[broker] == bp {
  980. delete(p.brokers, broker)
  981. }
  982. }
  983. }
  984. func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
  985. p.brokerLock.Lock()
  986. defer p.brokerLock.Unlock()
  987. bc, ok := p.brokers[broker]
  988. if ok && bc.abandoned != nil {
  989. close(bc.abandoned)
  990. }
  991. delete(p.brokers, broker)
  992. }