producer.go

package sarama

import (
	"fmt"
	"sync"
	"time"
)

func forceFlushThreshold() int {
	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead; we might want to calculate this more precisely
}
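
// (For scale, a sketch of the arithmetic only: if MaxRequestSize were 100 MiB,
// forceFlushThreshold would return 100*1024*1024 - 10*1024 = 104,847,360 bytes,
// i.e. the full request limit minus the 10KiB of slack reserved above. The exact
// figure depends on whatever MaxRequestSize is set to.)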

// ProducerConfig is used to pass multiple configuration options to NewProducer.
type ProducerConfig struct {
	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash).
	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
	Timeout           time.Duration          // The maximum duration the broker will wait for the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution; nanoseconds will be truncated.
	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression).
	FlushMsgCount     int                    // The number of messages needed to trigger a flush.
	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued.
	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered.
	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000).
	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
}

// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
func NewProducerConfig() *ProducerConfig {
	return &ProducerConfig{
		Partitioner:     NewHashPartitioner,
		RequiredAcks:    WaitForLocal,
		MaxMessageBytes: 1000000,
	}
}
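
// A minimal usage sketch (the values below are purely illustrative, not
// recommendations):
//
//	config := NewProducerConfig()
//	config.FlushMsgCount = 500                     // flush after 500 buffered messages...
//	config.FlushFrequency = 100 * time.Millisecond // ...or after 100ms, whichever comes first
//	config.AckSuccesses = true                     // deliveries will also be reported on Successes()
//	if err := config.Validate(); err != nil {
//		// handle the ConfigurationError
//	}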

// Validate checks a ProducerConfig instance. It will return a
// ConfigurationError if the specified value doesn't make sense.
func (config *ProducerConfig) Validate() error {
	if config.RequiredAcks < -1 {
		return ConfigurationError("Invalid RequiredAcks")
	}

	if config.Timeout < 0 {
		return ConfigurationError("Invalid Timeout")
	} else if config.Timeout%time.Millisecond != 0 {
		Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
	}

	if config.FlushMsgCount < 0 {
		return ConfigurationError("Invalid FlushMsgCount")
	}

	if config.FlushByteCount < 0 {
		return ConfigurationError("Invalid FlushByteCount")
	} else if config.FlushByteCount >= forceFlushThreshold() {
		Logger.Println("ProducerConfig.FlushByteCount too close to MaxRequestSize; it will be ignored.")
	}

	if config.FlushFrequency < 0 {
		return ConfigurationError("Invalid FlushFrequency")
	}

	if config.Partitioner == nil {
		return ConfigurationError("No partitioner set")
	}

	if config.MaxMessageBytes <= 0 {
		return ConfigurationError("Invalid MaxMessageBytes")
	} else if config.MaxMessageBytes >= forceFlushThreshold() {
		Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
	}

	return nil
}
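
// For instance, a negative byte threshold is rejected outright:
//
//	config := NewProducerConfig()
//	config.FlushByteCount = -1
//	err := config.Validate() // ConfigurationError("Invalid FlushByteCount")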

// Producer publishes Kafka messages. It routes messages to the correct broker
// for the provided topic-partition, refreshing metadata as appropriate, and
// parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying client, which
// is still necessary).
type Producer struct {
	client                    *Client
	config                    ProducerConfig
	errors                    chan *ProduceError
	input, successes, retries chan *MessageToSend
	brokers                   map[*Broker]*brokerWorker
	brokerLock                sync.Mutex
}

// NewProducer creates a new Producer using the given client.
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
	// Check that we are not dealing with a closed Client before processing
	// any other arguments
	if client.Closed() {
		return nil, ClosedClient
	}

	if config == nil {
		config = NewProducerConfig()
	}

	if err := config.Validate(); err != nil {
		return nil, err
	}

	p := &Producer{
		client:    client,
		config:    *config,
		errors:    make(chan *ProduceError),
		input:     make(chan *MessageToSend),
		successes: make(chan *MessageToSend),
		retries:   make(chan *MessageToSend),
		brokers:   make(map[*Broker]*brokerWorker),
	}

	// launch our singleton dispatchers
	go withRecover(p.topicDispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	retried  flagSet = 1 << iota // message has been retried
	chaser                       // message is last in a group that failed
	ref                          // add a reference to a singleton channel
	unref                        // remove a reference from a singleton channel
	shutdown                     // start the shutdown process
)
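
// The flags are combined and tested with ordinary bitmask operations, e.g.:
//
//	msg.flags |= retried                             // mark a message as having been retried
//	isChaser := msg.flags&chaser != 0                // last message of a failed group?
//	isControl := msg.flags&(ref|unref|shutdown) != 0 // internal control message, not user data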

// MessageToSend is the collection of elements passed to the Producer in order to send a message.
type MessageToSend struct {
	Topic      string
	Key, Value Encoder

	// these are filled in by the producer as the message is processed
	offset    int64
	partition int32
	flags     flagSet
}

// Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if
// the message was successfully delivered and RequiredAcks is not NoResponse.
func (m *MessageToSend) Offset() int64 {
	return m.offset
}

// Partition is the partition that the message was sent to. This is only guaranteed to be defined if
// the message was successfully delivered.
func (m *MessageToSend) Partition() int32 {
	return m.partition
}

func (m *MessageToSend) byteSize() int {
	size := 26 // the metadata overhead of CRC, flags, etc.
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}
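
// For example, a message with a 3-byte key and a 5-byte value has a byteSize of
// 26 + 3 + 5 = 34 bytes; a message with no key and no value still counts the 26
// bytes of per-message overhead.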

// ProduceError is the type of error generated when the producer fails to deliver a message.
// It contains the original MessageToSend as well as the actual error value.
type ProduceError struct {
	Msg *MessageToSend
	Err error
}

// ProduceErrors is a type that wraps a batch of "ProduceError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProduceErrors []*ProduceError

func (pe ProduceErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
// It is suggested that you send messages and read errors together in a single select statement.
func (p *Producer) Errors() <-chan *ProduceError {
	return p.errors
}

// Successes is the success output channel back to the user when AckSuccesses is configured.
// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
// It is suggested that you send and read messages together in a single select statement.
func (p *Producer) Successes() <-chan *MessageToSend {
	return p.successes
}

// Input is the input channel for the user to write messages to that they wish to send.
func (p *Producer) Input() chan<- *MessageToSend {
	return p.input
}
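
// The send/receive loop suggested above might look like the following sketch
// (the topic name and payload are placeholders, and `producer` is assumed to
// come from NewProducer with AckSuccesses left at its default of false):
//
//	for {
//		select {
//		case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("hello")}:
//			// message queued
//		case err := <-producer.Errors():
//			Logger.Println("failed to deliver message:", err.Err)
//		}
//	}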

// Close shuts down the producer and flushes any messages it may have buffered.
// You must call this function before a producer object passes out of scope, as
// it may otherwise leak memory. You must call this before calling Close on the
// underlying client.
func (p *Producer) Close() error {
	go withRecover(func() {
		p.input <- &MessageToSend{flags: shutdown}
	})

	if p.config.AckSuccesses {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProduceErrors
	for event := range p.errors {
		errors = append(errors, event)
	}

	close(p.successes)

	if len(errors) > 0 {
		return errors
	}
	return nil
}
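
// Because Close drains the Errors channel itself, undelivered messages come back
// as a single ProduceErrors value. A sketch of handling it:
//
//	if err := producer.Close(); err != nil {
//		for _, pe := range err.(ProduceErrors) {
//			Logger.Println("failed to deliver message:", pe.Err)
//		}
//	}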

///////////////////////////////////////////
// In normal processing, a message flows through the following functions from top to bottom,
// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
// (which sends the message to the broker). In cases where a message must be retried, it goes
// through retryHandler before being returned to the top of the flow.
///////////////////////////////////////////
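
// Concretely, the chain is:
//
//	Input() -> topicDispatcher -> partitionDispatcher -> leaderDispatcher
//	        -> messageAggregator -> flusher -> broker.Produce
//
// with retryHandler feeding failed messages from the retries channel back into
// Producer.input. topicDispatcher and retryHandler are singletons; the other
// stages are created per topic, per topic-partition, or per broker as noted below.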

// singleton
func (p *Producer) topicDispatcher() {
	handlers := make(map[string]chan *MessageToSend)

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			Logger.Println("Producer shutting down.")
			break
		}

		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
			(msg.byteSize() > p.config.MaxMessageBytes) {
			p.errors <- &ProduceError{Msg: msg, Err: MessageSizeTooLarge}
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			p.retries <- &MessageToSend{flags: ref}
			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
			topic := msg.Topic // local copy: msg is reused by the range loop and the goroutine may not run immediately
			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
			handler = newHandler
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}

	p.retries <- &MessageToSend{flags: shutdown}

	for msg := range p.input {
		p.errors <- &ProduceError{Msg: msg, Err: ShuttingDown}
	}

	close(p.errors)
}

// one per topic
func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend) {
	handlers := make(map[int32]chan *MessageToSend)
	partitioner := p.config.Partitioner()

	for msg := range input {
		if msg.flags&retried == 0 {
			err := p.assignPartition(partitioner, msg)
			if err != nil {
				p.errors <- &ProduceError{Msg: msg, Err: err}
				continue
			}
		}

		handler := handlers[msg.partition]
		if handler == nil {
			p.retries <- &MessageToSend{flags: ref}
			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
			partition := msg.partition // local copy: msg is reused by the range loop and the goroutine may not run immediately
			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
			handler = newHandler
			handlers[msg.partition] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}

	p.retries <- &MessageToSend{flags: unref}
}

// one per partition per topic
func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
	var leader *Broker
	var output chan *MessageToSend
	var backlog []*MessageToSend

	for msg := range input {
		if msg.flags&retried == 0 {
			// normal case
			if backlog != nil {
				backlog = append(backlog, msg)
				continue
			}
		} else if msg.flags&chaser == 0 {
			// retry flag set, chaser flag not set
			if backlog == nil {
				// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
				// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
				Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
				output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
				backlog = make([]*MessageToSend, 0)
				p.unrefBrokerWorker(leader)
				output = nil
			}
		} else {
			// retry *and* chaser flag set, flush the backlog and return to normal processing
			Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)

			if output == nil {
				err := p.client.RefreshTopicMetadata(topic)
				if err != nil {
					p.returnErrors(backlog, err)
					backlog = nil
					continue
				}

				leader, err = p.client.Leader(topic, partition)
				if err != nil {
					p.returnErrors(backlog, err)
					backlog = nil
					continue
				}

				output = p.getBrokerWorker(leader)
			}

			for _, msg := range backlog {
				output <- msg
			}
			Logger.Printf("producer/leader state change to [normal] on %s/%d\n", topic, partition)

			backlog = nil
			continue
		}

		if output == nil {
			var err error
			if backlog != nil {
				err = p.client.RefreshTopicMetadata(topic)
				if err != nil {
					p.errors <- &ProduceError{Msg: msg, Err: err}
					continue
				}
			}

			leader, err = p.client.Leader(topic, partition)
			if err != nil {
				p.errors <- &ProduceError{Msg: msg, Err: err}
				continue
			}

			output = p.getBrokerWorker(leader)
		}

		output <- msg
	}

	p.unrefBrokerWorker(leader)
	p.retries <- &MessageToSend{flags: unref}
}

// one per broker
func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend) {
	var ticker *time.Ticker
	var timer <-chan time.Time
	if p.config.FlushFrequency > 0 {
		ticker = time.NewTicker(p.config.FlushFrequency)
		timer = ticker.C
	}

	var buffer []*MessageToSend
	var doFlush chan []*MessageToSend // nil until a flush is wanted; a nil channel disables its case in the select below
	var bytesAccumulated int

	flusher := make(chan []*MessageToSend)
	go withRecover(func() { p.flusher(broker, flusher) })

	for {
		select {
		case msg := <-input:
			if msg == nil {
				goto shutdown
			}

			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
				(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
				Logger.Println("producer/aggregator hit maximum request size, forcing blocking flush")
				flusher <- buffer
				buffer = nil
				doFlush = nil
				bytesAccumulated = 0
			}

			buffer = append(buffer, msg)
			bytesAccumulated += msg.byteSize()

			if len(buffer) >= p.config.FlushMsgCount ||
				(p.config.FlushByteCount > 0 && bytesAccumulated >= p.config.FlushByteCount) {
				doFlush = flusher
			}
		case <-timer:
			doFlush = flusher
		case doFlush <- buffer: // only selectable once doFlush has been pointed at the flusher channel
			buffer = nil
			doFlush = nil
			bytesAccumulated = 0
		}
	}

shutdown:
	if ticker != nil {
		ticker.Stop()
	}
	if len(buffer) > 0 {
		flusher <- buffer
	}
	close(flusher)
}

// one per broker
func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
	var closing error
	currentRetries := make(map[string]map[int32]error)

	for batch := range input {
		if closing != nil {
			p.retryMessages(batch, closing)
			continue
		}

		// group messages by topic/partition
		msgSets := make(map[string]map[int32][]*MessageToSend)
		for i, msg := range batch {
			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
				if msg.flags&chaser == chaser {
					// we can start processing this topic/partition again
					Logger.Printf("producer/flusher state change to [normal] on %s/%d\n",
						msg.Topic, msg.partition)
					currentRetries[msg.Topic][msg.partition] = nil
				}
				p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
				batch[i] = nil // to prevent it being returned/retried twice
				continue
			}

			partitionSet := msgSets[msg.Topic]
			if partitionSet == nil {
				partitionSet = make(map[int32][]*MessageToSend)
				msgSets[msg.Topic] = partitionSet
			}
			partitionSet[msg.partition] = append(partitionSet[msg.partition], msg)
		}

		request := p.buildRequest(msgSets)
		if request == nil {
			continue
		}

		response, err := broker.Produce(p.client.id, request)
		switch err {
		case nil:
			break
		case EncodingError:
			p.returnErrors(batch, err)
			continue
		default:
			p.client.disconnectBroker(broker)
			Logger.Println("producer/flusher state change to [closing] because", err)
			closing = err
			p.retryMessages(batch, err)
			continue
		}

		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			if p.config.AckSuccesses {
				p.returnSuccesses(batch)
			}
			continue
		}

		// we iterate through the blocks in the request, not the response, so that we notice
		// if the response is missing a block completely
		for topic, partitionSet := range msgSets {
			for partition, msgs := range partitionSet {
				block := response.GetBlock(topic, partition)
				if block == nil {
					p.returnErrors(msgs, IncompleteResponse)
					continue
				}

				switch block.Err {
				case NoError:
					// All the messages for this topic-partition were delivered successfully!
					if p.config.AckSuccesses {
						for i := range msgs {
							msgs[i].offset = block.Offset + int64(i)
						}
						p.returnSuccesses(msgs)
					}
				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
					Logger.Printf("producer/flusher state change to [retrying] on %s/%d because %v\n",
						topic, partition, block.Err)
					if currentRetries[topic] == nil {
						currentRetries[topic] = make(map[int32]error)
					}
					currentRetries[topic][partition] = block.Err
					p.retryMessages(msgs, block.Err)
				default:
					p.returnErrors(msgs, block.Err)
				}
			}
		}
	}

	p.retries <- &MessageToSend{flags: unref}
}

// singleton
// retryHandler buffers retried messages and feeds them back into the input
// channel while continuing to accept new retries, so it never deadlocks. It also
// ref-counts the dispatcher and worker goroutines (via the ref/unref control
// messages) so that it knows when shutdown is complete and the channels can be closed.
func (p *Producer) retryHandler() {
	var buf []*MessageToSend
	var msg *MessageToSend

	refs := 0
	shuttingDown := false

	for {
		if len(buf) == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf[0]:
				buf = buf[1:]
				continue
			}
		}

		if msg.flags&ref != 0 {
			refs++
		} else if msg.flags&unref != 0 {
			refs--
			if refs == 0 && shuttingDown {
				break
			}
		} else if msg.flags&shutdown != 0 {
			shuttingDown = true
			if refs == 0 {
				break
			}
		} else {
			buf = append(buf, msg)
		}
	}

	close(p.retries)
	for i := range buf {
		p.input <- buf[i]
	}
	close(p.input)
}

///////////////////////////////////////////
///////////////////////////////////////////

// utility functions

func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend) error {
	partitions, err := p.client.Partitions(msg.Topic)
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return LeaderNotAvailable
	}

	choice := partitioner.Partition(msg.Key, numPartitions)

	if choice < 0 || choice >= numPartitions {
		return InvalidPartition
	}

	msg.partition = partitions[choice]

	return nil
}

func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: int32(p.config.Timeout / time.Millisecond)}
	empty := true

	for topic, partitionSet := range batch {
		for partition, msgSet := range partitionSet {
			setToSend := new(MessageSet)
			setSize := 0
			for _, msg := range msgSet {
				var keyBytes, valBytes []byte
				var err error

				if msg.Key != nil {
					if keyBytes, err = msg.Key.Encode(); err != nil {
						p.errors <- &ProduceError{Msg: msg, Err: err}
						continue
					}
				}
				if msg.Value != nil {
					if valBytes, err = msg.Value.Encode(); err != nil {
						p.errors <- &ProduceError{Msg: msg, Err: err}
						continue
					}
				}

				if p.config.Compression != CompressionNone && setSize+msg.byteSize() > p.config.MaxMessageBytes {
					// compression causes message-sets to be wrapped as single messages, which have tighter
					// size requirements, so we have to respect those limits
					valBytes, err := encode(setToSend)
					if err != nil {
						Logger.Println(err) // if this happens, it's basically our fault.
						panic(err)
					}
					req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
					setToSend = new(MessageSet)
					setSize = 0
				}

				setSize += msg.byteSize()
				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
				empty = false
			}

			if p.config.Compression == CompressionNone {
				req.AddSet(topic, partition, setToSend)
			} else {
				valBytes, err := encode(setToSend)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
			}
		}
	}

	if empty {
		return nil
	}
	return req
}

func (p *Producer) returnErrors(batch []*MessageToSend, err error) {
	for _, msg := range batch {
		if msg != nil {
			p.errors <- &ProduceError{Msg: msg, Err: err}
		}
	}
}

func (p *Producer) returnSuccesses(batch []*MessageToSend) {
	for _, msg := range batch {
		if msg != nil {
			p.successes <- msg
		}
	}
}

func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
	for _, msg := range batch {
		if msg == nil {
			continue
		}

		if msg.flags&retried == retried {
			p.errors <- &ProduceError{Msg: msg, Err: err}
		} else {
			msg.flags |= retried
			p.retries <- msg
		}
	}
}

type brokerWorker struct {
	input chan *MessageToSend
	refs  int
}

func (p *Producer) getBrokerWorker(broker *Broker) chan *MessageToSend {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	worker := p.brokers[broker]

	if worker == nil {
		p.retries <- &MessageToSend{flags: ref}
		worker = &brokerWorker{
			refs:  1,
			input: make(chan *MessageToSend),
		}
		p.brokers[broker] = worker
		go withRecover(func() { p.messageAggregator(broker, worker.input) })
	} else {
		worker.refs++
	}

	return worker.input
}

func (p *Producer) unrefBrokerWorker(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	worker := p.brokers[broker]

	if worker != nil {
		worker.refs--
		if worker.refs == 0 {
			close(worker.input)
			delete(p.brokers, broker)
		}
	}
}