async_producer.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/eapache/go-resiliency/breaker"
  8. "github.com/eapache/queue"
  9. )
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {
	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
  41. // transactionManager keeps the state necessary to ensure idempotent production
  42. type transactionManager struct {
  43. producerID int64
  44. producerEpoch int16
  45. sequenceNumbers map[string]int32
  46. mutex sync.Mutex
  47. }
  48. const (
  49. noProducerID = -1
  50. noProducerEpoch = -1
  51. )
  52. func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
  53. key := fmt.Sprintf("%s-%d", topic, partition)
  54. t.mutex.Lock()
  55. defer t.mutex.Unlock()
  56. sequence := t.sequenceNumbers[key]
  57. t.sequenceNumbers[key] = sequence + 1
  58. return sequence
  59. }
  60. func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
  61. txnmgr := &transactionManager{
  62. producerID: noProducerID,
  63. producerEpoch: noProducerEpoch,
  64. }
  65. if conf.Producer.Idempotent {
  66. initProducerIDResponse, err := client.InitProducerID()
  67. if err != nil {
  68. return nil, err
  69. }
  70. txnmgr.producerID = initProducerIDResponse.ProducerID
  71. txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
  72. txnmgr.sequenceNumbers = make(map[string]int32)
  73. txnmgr.mutex = sync.Mutex{}
  74. Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
  75. }
  76. return txnmgr, nil
  77. }
// asyncProducer is the concrete implementation of the AsyncProducer interface.
type asyncProducer struct {
	client Client   // Kafka client used for metadata and broker lookups
	conf   *Config  // taken from client.Config() at construction time

	errors                    chan *ProducerError   // delivery failures surfaced to the user
	input, successes, retries chan *ProducerMessage // user input, success results, and internal retry loop
	inFlight                  sync.WaitGroup        // counts undelivered messages plus internal syn/fin markers

	brokers    map[*Broker]*brokerProducer // one brokerProducer per live broker
	brokerRefs map[*brokerProducer]int     // reference counts used by unrefBrokerProducer
	brokerLock sync.Mutex                  // guards brokers and brokerRefs

	txnmgr *transactionManager // sequence-number state for idempotent production
}
  89. // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
  90. func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  91. client, err := NewClient(addrs, conf)
  92. if err != nil {
  93. return nil, err
  94. }
  95. return newAsyncProducer(client)
  96. }
  97. // NewAsyncProducerFromClient creates a new Producer using the given client. It is still
  98. // necessary to call Close() on the underlying client when shutting down this producer.
  99. func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
  100. // For clients passed in by the client, ensure we don't
  101. // call Close() on it.
  102. cli := &nopCloserClient{client}
  103. return newAsyncProducer(cli)
  104. }
  105. func newAsyncProducer(client Client) (AsyncProducer, error) {
  106. // Check that we are not dealing with a closed Client before processing any other arguments
  107. if client.Closed() {
  108. return nil, ErrClosedClient
  109. }
  110. txnmgr, err := newTransactionManager(client.Config(), client)
  111. if err != nil {
  112. return nil, err
  113. }
  114. p := &asyncProducer{
  115. client: client,
  116. conf: client.Config(),
  117. errors: make(chan *ProducerError),
  118. input: make(chan *ProducerMessage),
  119. successes: make(chan *ProducerMessage),
  120. retries: make(chan *ProducerMessage),
  121. brokers: make(map[*Broker]*brokerProducer),
  122. brokerRefs: make(map[*brokerProducer]int),
  123. txnmgr: txnmgr,
  124. }
  125. // launch our singleton dispatchers
  126. go withRecover(p.dispatcher)
  127. go withRecover(p.retryHandler)
  128. return p, nil
  129. }
  130. type flagSet int8
  131. const (
  132. syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
  133. fin // final message from partitionProducer to brokerProducer and back
  134. shutdown // start the shutdown process
  135. )
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behaviour depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries int     // how many times this message has been retried
	flags   flagSet // internal control flags (syn/fin/shutdown)
	// expectation is internal; NOTE(review): appears to carry a per-message
	// delivery result back to a synchronous caller — confirm against the
	// sync producer implementation.
	expectation    chan *ProducerError
	sequenceNumber int32 // idempotent-producer sequence, assigned in topicProducer.dispatch
}
  178. const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
  179. func (m *ProducerMessage) byteSize(version int) int {
  180. var size int
  181. if version >= 2 {
  182. size = maximumRecordOverhead
  183. for _, h := range m.Headers {
  184. size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
  185. }
  186. } else {
  187. size = producerMessageOverhead
  188. }
  189. if m.Key != nil {
  190. size += m.Key.Length()
  191. }
  192. if m.Value != nil {
  193. size += m.Value.Length()
  194. }
  195. return size
  196. }
  197. func (m *ProducerMessage) clear() {
  198. m.flags = 0
  199. m.retries = 0
  200. }
// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

// Error implements the error interface.
func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

// Error implements the error interface, summarizing only the count of failures.
func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
// Errors implements AsyncProducer; it exposes the delivery-failure channel.
func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

// Successes implements AsyncProducer; it exposes the delivery-success channel.
func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

// Input implements AsyncProducer; messages written here enter the dispatcher.
func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}
  226. func (p *asyncProducer) Close() error {
  227. p.AsyncClose()
  228. if p.conf.Producer.Return.Successes {
  229. go withRecover(func() {
  230. for range p.successes {
  231. }
  232. })
  233. }
  234. var errors ProducerErrors
  235. if p.conf.Producer.Return.Errors {
  236. for event := range p.errors {
  237. errors = append(errors, event)
  238. }
  239. } else {
  240. <-p.errors
  241. }
  242. if len(errors) > 0 {
  243. return errors
  244. }
  245. return nil
  246. }
// AsyncClose implements AsyncProducer: it starts the shutdown sequence in a
// background goroutine and returns immediately.
func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}
// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			// internal marker injected by shutdown(); stop accepting new messages
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			// first attempt for this message: start tracking it as in flight
			p.inFlight.Add(1)
		}

		// Pick the message-format version for size estimation; headers
		// require the v2 record format (Kafka 0.11+).
		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		// lazily create one topicProducer per topic
		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	// input closed: shut down every topicProducer
	for _, handler := range handlers {
		close(handler)
	}
}
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker                  // circuit breaker around partition-metadata lookups
	handlers    map[int32]chan<- *ProducerMessage // one partitionProducer input per partition
	partitioner Partitioner
}
  310. func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
  311. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  312. tp := &topicProducer{
  313. parent: p,
  314. topic: topic,
  315. input: input,
  316. breaker: breaker.New(3, 1, 10*time.Second),
  317. handlers: make(map[int32]chan<- *ProducerMessage),
  318. partitioner: p.conf.Producer.Partitioner(topic),
  319. }
  320. go withRecover(tp.dispatch)
  321. return input
  322. }
// dispatch assigns a partition to each first-attempt message, then routes it
// to the partitionProducer for that partition (creating one on demand).
func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}
		// All messages being retried (sent or not) have already had their retry count updated
		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	// input closed: shut down every partitionProducer for this topic
	for _, handler := range tp.handlers {
		close(handler)
	}
}
  346. func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
  347. var partitions []int32
  348. err := tp.breaker.Run(func() (err error) {
  349. requiresConsistency := false
  350. if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
  351. requiresConsistency = ep.MessageRequiresConsistency(msg)
  352. } else {
  353. requiresConsistency = tp.partitioner.RequiresConsistency()
  354. }
  355. if requiresConsistency {
  356. partitions, err = tp.parent.client.Partitions(msg.Topic)
  357. } else {
  358. partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
  359. }
  360. return
  361. })
  362. if err != nil {
  363. return err
  364. }
  365. numPartitions := int32(len(partitions))
  366. if numPartitions == 0 {
  367. return ErrLeaderNotAvailable
  368. }
  369. choice, err := tp.partitioner.Partition(msg, numPartitions)
  370. if err != nil {
  371. return err
  372. } else if choice < 0 || choice >= numPartitions {
  373. return ErrInvalidPartition
  374. }
  375. msg.Partition = partitions[choice]
  376. return nil
  377. }
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker          // current partition leader, refreshed by updateLeader
	breaker        *breaker.Breaker // circuit breaker around leader lookups
	brokerProducer *brokerProducer  // producer for the current leader; nil when unselected

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

// partitionRetryState holds the buffered messages and fin-tracking flag for
// one retry level of a partitionProducer.
type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}
  400. func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
  401. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  402. pp := &partitionProducer{
  403. parent: p,
  404. topic: topic,
  405. partition: partition,
  406. input: input,
  407. breaker: breaker.New(3, 1, 10*time.Second),
  408. retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
  409. }
  410. go withRecover(pp.dispatch)
  411. return input
  412. }
  413. func (pp *partitionProducer) backoff(retries int) {
  414. var backoff time.Duration
  415. if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
  416. maxRetries := pp.parent.conf.Producer.Retry.Max
  417. backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
  418. } else {
  419. backoff = pp.parent.conf.Producer.Retry.Backoff
  420. }
  421. if backoff > 0 {
  422. time.Sleep(backoff)
  423. }
  424. }
// dispatch is the partitionProducer main loop: it forwards messages to the
// leader's brokerProducer while preserving ordering across retry levels via
// the highWatermark/retryState machinery.
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees
		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.brokerProducer.input <- msg
	}
}
// newHighWatermark transitions the partitionProducer to a higher retry level:
// it emits a fin marker for the previous level and abandons the current
// broker selection so the next real message forces a fresh leader lookup.
func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}
// flushRetryBuffers walks the retry levels back down from the current
// highWatermark, re-sending each level's buffered messages to the (possibly
// re-selected) leader, until it reaches either level zero or a level that is
// still waiting on its fin chaser.
func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				// no leader available: fail this level's buffer, but still
				// clear it below so we keep draining the remaining levels
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			// this level's fin hasn't come back yet; stay at this level
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}
// updateLeader refreshes metadata and re-selects the partition leader through
// the circuit breaker, attaching to the leader's brokerProducer and announcing
// ourselves to it with a syn message.
func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}
// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	// with retries disabled there is no retry path, so the abandoned channel
	// is used to tell partitionProducers to drop this broker selection
	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}
// brokerProducerResponse pairs a sent produceSet with the broker's reply (or
// the transport error that prevented one).
type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage        // messages from partitionProducers
	output    chan<- *produceSet           // batches handed to the network bridge goroutine
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}  // closed to tell partitionProducers this broker is dead (retries disabled only)
	stopchan  chan struct{}  // closed to stop the run loop

	buffer     *produceSet   // batch currently being accumulated
	timer      <-chan time.Time // flush-frequency timer, nil when not armed
	timerFired bool

	closing        error                    // non-nil once the producer is being torn down
	currentRetries map[string]map[int32]error // per topic/partition: reason we're bouncing messages back for retry
}
// run is the brokerProducer main loop: it accumulates messages into buffer
// and flushes them to the network bridge when the batch is ready or the
// flush timer fires. Note the nil-channel trick: `output` is nil (so its
// send case never fires) until the buffer is ready to flush.
func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				// a partitionProducer attached to us; open its retry state
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			// arm the flush timer on the first message of a fresh batch
			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		case <-bp.stopchan:
			Logger.Printf(
				"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
			return
		}

		// enable the flush case only when there is something worth sending
		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
// shutdown drains the remaining buffer to the network bridge, closes the
// bridge, consumes any outstanding responses, and finally stops the run loop
// by closing stopchan.
func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		// handling a response can change our state, so re-check some things
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	// the bridge goroutine closes responses after output is closed
	for response := range bp.responses {
		bp.handleResponse(response)
	}
	close(bp.stopchan)
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}
  681. func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
  682. if bp.closing != nil {
  683. return bp.closing
  684. }
  685. return bp.currentRetries[msg.Topic][msg.Partition]
  686. }
// waitForSpace blocks until the pending batch can either accept msg or be
// handed off to the network bridge. It returns non-nil if, while waiting, a
// response put msg's partition into a retrying state.
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}
  705. func (bp *brokerProducer) rollOver() {
  706. bp.timer = nil
  707. bp.timerFired = false
  708. bp.buffer = newProduceSet(bp.parent)
  709. }
  710. func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
  711. if response.err != nil {
  712. bp.handleError(response.set, response.err)
  713. } else {
  714. bp.handleSuccess(response.set, response.res)
  715. }
  716. if bp.buffer.empty() {
  717. bp.rollOver() // this can happen if the response invalidated our buffer
  718. }
  719. }
// handleSuccess processes a produce response for the set of messages that
// was sent: it acks successful partitions, fails partitions with
// non-retriable errors, and — in a second pass — records retry state and
// re-queues messages for partitions that hit a retriable error.
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			// the broker replied but omitted this partition entirely
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				// the broker returned a single timestamp for the whole block;
				// stamp every message in the set with it
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			// block.Offset is the base offset; messages are sequential from it
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			// the broker already has this batch (idempotent produce), so it counts as success
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				// retries are disabled: drop this broker and surface the error
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				// remember the topic so the retry pass below can refresh metadata
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			// retryBatch re-resolves partition leaders, so refresh metadata first;
			// a failed refresh is logged but not fatal here
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		// second pass over the same request set: record per-partition retry
		// state and push the affected messages back into the retry path
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					// idempotent mode must preserve batch boundaries, so the
					// whole partitionSet is retried as a unit
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}
  800. func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
  801. Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
  802. produceSet := newProduceSet(p)
  803. produceSet.msgs[topic] = make(map[int32]*partitionSet)
  804. produceSet.msgs[topic][partition] = pSet
  805. produceSet.bufferBytes += pSet.bufferBytes
  806. produceSet.bufferCount += len(pSet.msgs)
  807. for _, msg := range pSet.msgs {
  808. if msg.retries >= p.conf.Producer.Retry.Max {
  809. p.returnError(msg, kerr)
  810. return
  811. }
  812. msg.retries++
  813. }
  814. // it's expected that a metadata refresh has been requested prior to calling retryBatch
  815. leader, err := p.client.Leader(topic, partition)
  816. if err != nil {
  817. Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
  818. for _, msg := range pSet.msgs {
  819. p.returnError(msg, kerr)
  820. }
  821. return
  822. }
  823. bp := p.getBrokerProducer(leader)
  824. bp.output <- produceSet
  825. }
  826. func (bp *brokerProducer) handleError(sent *produceSet, err error) {
  827. switch err.(type) {
  828. case PacketEncodingError:
  829. sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  830. bp.parent.returnErrors(pSet.msgs, err)
  831. })
  832. default:
  833. Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
  834. bp.parent.abandonBrokerConnection(bp.broker)
  835. _ = bp.broker.Close()
  836. bp.closing = err
  837. sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  838. bp.parent.retryMessages(pSet.msgs, err)
  839. })
  840. bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  841. bp.parent.retryMessages(pSet.msgs, err)
  842. })
  843. bp.rollOver()
  844. }
  845. }
  846. // singleton
  847. // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
  848. // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
  849. func (p *asyncProducer) retryHandler() {
  850. var msg *ProducerMessage
  851. buf := queue.New()
  852. for {
  853. if buf.Length() == 0 {
  854. msg = <-p.retries
  855. } else {
  856. select {
  857. case msg = <-p.retries:
  858. case p.input <- buf.Peek().(*ProducerMessage):
  859. buf.Remove()
  860. continue
  861. }
  862. }
  863. if msg == nil {
  864. return
  865. }
  866. buf.Add(msg)
  867. }
  868. }
  869. // utility functions
  870. func (p *asyncProducer) shutdown() {
  871. Logger.Println("Producer shutting down.")
  872. p.inFlight.Add(1)
  873. p.input <- &ProducerMessage{flags: shutdown}
  874. p.inFlight.Wait()
  875. err := p.client.Close()
  876. if err != nil {
  877. Logger.Println("producer/shutdown failed to close the embedded client:", err)
  878. }
  879. close(p.input)
  880. close(p.retries)
  881. close(p.errors)
  882. close(p.successes)
  883. }
  884. func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
  885. msg.clear()
  886. pErr := &ProducerError{Msg: msg, Err: err}
  887. if p.conf.Producer.Return.Errors {
  888. p.errors <- pErr
  889. } else {
  890. Logger.Println(pErr)
  891. }
  892. p.inFlight.Done()
  893. }
  894. func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
  895. for _, msg := range batch {
  896. p.returnError(msg, err)
  897. }
  898. }
  899. func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
  900. for _, msg := range batch {
  901. if p.conf.Producer.Return.Successes {
  902. msg.clear()
  903. p.successes <- msg
  904. }
  905. p.inFlight.Done()
  906. }
  907. }
  908. func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
  909. if msg.retries >= p.conf.Producer.Retry.Max {
  910. p.returnError(msg, err)
  911. } else {
  912. msg.retries++
  913. p.retries <- msg
  914. }
  915. }
  916. func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
  917. for _, msg := range batch {
  918. p.retryMessage(msg, err)
  919. }
  920. }
  921. func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
  922. p.brokerLock.Lock()
  923. defer p.brokerLock.Unlock()
  924. bp := p.brokers[broker]
  925. if bp == nil {
  926. bp = p.newBrokerProducer(broker)
  927. p.brokers[broker] = bp
  928. p.brokerRefs[bp] = 0
  929. }
  930. p.brokerRefs[bp]++
  931. return bp
  932. }
  933. func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
  934. p.brokerLock.Lock()
  935. defer p.brokerLock.Unlock()
  936. p.brokerRefs[bp]--
  937. if p.brokerRefs[bp] == 0 {
  938. close(bp.input)
  939. delete(p.brokerRefs, bp)
  940. if p.brokers[broker] == bp {
  941. delete(p.brokers, broker)
  942. }
  943. }
  944. }
  945. func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
  946. p.brokerLock.Lock()
  947. defer p.brokerLock.Unlock()
  948. bc, ok := p.brokers[broker]
  949. if ok && bc.abandoned != nil {
  950. close(bc.abandoned)
  951. }
  952. delete(p.brokers, broker)
  953. }