
package sarama

import (
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
)

func forceFlushThreshold() int {
	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
}

// Producer publishes Kafka messages. It routes messages to the correct broker
// for the provided topic-partition, refreshing metadata as appropriate, and
// parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying client, which
// is still necessary).
type Producer interface {
	// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
	// buffered. The shutdown has completed when both the Errors and Successes channels
	// have been closed. When calling AsyncClose, you *must* continue to read from those
	// channels in order to drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and flushes any messages it may have buffered.
	// You must call this function before a producer object passes out of scope, as
	// it may otherwise leak memory. You must call this before calling Close on the
	// underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when AckSuccesses is configured.
	// If Return.Successes is true, you MUST read from this channel or the Producer will deadlock.
	// It is suggested that you send and read messages together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this channel
	// or the Producer will deadlock when the channel is full. Alternatively, you can set
	// Producer.Return.Errors in your config to false, which prevents errors from being returned.
	Errors() <-chan *ProducerError
}
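
// What follows is an illustrative usage sketch only (it is not part of this
// file's API): it assumes a broker at localhost:9092, a topic named "my_topic",
// and the NewConfig constructor defined elsewhere in this package. It uses the
// single select loop suggested above so that Input, Successes and Errors are
// all serviced together; once enough messages have been enqueued, the local
// input variable is left nil so the send case can no longer fire.
//
//	config := NewConfig()
//	config.Producer.Return.Successes = true
//
//	producer, err := NewProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//
//	var enqueued, outstanding int
//	for enqueued < 5 || outstanding > 0 {
//		var input chan<- *ProducerMessage
//		if enqueued < 5 {
//			input = producer.Input() // stays nil once we've enqueued enough, disabling the send case
//		}
//		select {
//		case input <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}:
//			enqueued++
//			outstanding++
//		case success := <-producer.Successes():
//			outstanding--
//			Logger.Println("delivered to partition", success.Partition())
//		case err := <-producer.Errors():
//			outstanding--
//			Logger.Println("delivery failed:", err)
//		}
//	}
//
//	if err := producer.Close(); err != nil {
//		Logger.Println("close failed:", err)
//	}
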
type producer struct {
	client Client
	conf   *Config

	ownClient bool

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage

	brokers    map[*Broker]*brokerProducer
	brokerLock sync.Mutex
}

// NewProducer creates a new Producer using the given broker addresses and configuration.
func NewProducer(addrs []string, conf *Config) (Producer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}

	p, err := NewProducerFromClient(client)
	if err != nil {
		return nil, err
	}
	p.(*producer).ownClient = true
	return p, nil
}

// NewProducerFromClient creates a new Producer using the given client.
func NewProducerFromClient(client Client) (Producer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	p := &producer{
		client:    client,
		conf:      client.Config(),
		errors:    make(chan *ProducerError),
		input:     make(chan *ProducerMessage),
		successes: make(chan *ProducerMessage),
		retries:   make(chan *ProducerMessage),
		brokers:   make(map[*Broker]*brokerProducer),
	}

	// launch our singleton dispatchers
	go withRecover(p.topicDispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}
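
// A brief sketch of sharing one Client between a producer and other consumers
// of this package (NewConfig is assumed from elsewhere in the package). Per the
// Producer documentation above, the producer must be closed before the client;
// the deferred calls below do that because defers run in LIFO order.
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close()
//
//	producer, err := NewProducerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close() // runs before client.Close()
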
type flagSet int8

const (
	chaser   flagSet = 1 << iota // message is last in a group that failed
	ref                          // add a reference to a singleton channel
	unref                        // remove a reference from a singleton channel
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic    string      // The Kafka topic for this message.
	Key      Encoder     // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Value    Encoder     // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels. Sarama completely ignores this field; it is only intended for pass-through data.

	// these are filled in by the producer as the message is processed
	offset    int64
	partition int32
	retries   int
	flags     flagSet
}
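
// For illustration, a message that carries pass-through Metadata alongside its
// key and value (StringEncoder is one of the Encoder implementations mentioned
// above; "my_topic" is a placeholder):
//
//	msg := &ProducerMessage{
//		Topic:    "my_topic",
//		Key:      StringEncoder("user-42"),
//		Value:    StringEncoder(`{"clicked":true}`),
//		Metadata: time.Now(), // handed back untouched on the Successes/Errors channels
//	}
//	producer.Input() <- msg
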
// Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if
// the message was successfully delivered and RequiredAcks is not NoResponse.
func (m *ProducerMessage) Offset() int64 {
	return m.offset
}

// Partition is the partition that the message was sent to. This is only guaranteed to be defined if
// the message was successfully delivered.
func (m *ProducerMessage) Partition() int32 {
	return m.partition
}

func (m *ProducerMessage) byteSize() int {
	size := 26 // the metadata overhead of CRC, flags, etc.
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
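
// Since Close returns a plain error, callers that want the individual failures
// can type-assert it back to ProducerErrors; a short sketch:
//
//	if err := producer.Close(); err != nil {
//		if errs, ok := err.(ProducerErrors); ok {
//			for _, e := range errs {
//				Logger.Println("failed to deliver to", e.Msg.Topic, ":", e.Err)
//			}
//		}
//	}
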
func (p *producer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *producer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *producer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *producer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for _ = range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *producer) AsyncClose() {
	go withRecover(func() {
		p.input <- &ProducerMessage{flags: shutdown}
	})
}

///////////////////////////////////////////
// In normal processing, a message flows through the following functions from top to bottom,
// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
// (which sends the message to the broker). In cases where a message must be retried, it goes
// through retryHandler before being returned to the top of the flow.
///////////////////////////////////////////
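
// Sketched end to end (multiplicities as noted in the comments on each function below):
//
//	Input()
//	  -> topicDispatcher     (singleton)
//	  -> partitionDispatcher (one per topic)
//	  -> leaderDispatcher    (one per topic/partition)
//	  -> messageAggregator   (one per broker)
//	  -> flusher             (one per broker)
//	  -> retryHandler        (singleton; failed messages only)
//	  -> back into the input channel for re-dispatch
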
// singleton
// dispatches messages by topic
func (p *producer) topicDispatcher() {
	handlers := make(map[string]chan *ProducerMessage)

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			Logger.Println("Producer shutting down.")
			break
		}

		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
			(msg.byteSize() > p.conf.Producer.MaxMessageBytes) {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			p.retries <- &ProducerMessage{flags: ref}
			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
			topic := msg.Topic // block local because go's closure semantics suck
			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
			handler = newHandler
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}

	p.retries <- &ProducerMessage{flags: shutdown}

	for msg := range p.input {
		p.returnError(msg, ErrShuttingDown)
	}

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.errors)
	close(p.successes)
}
// one per topic
// partitions messages, then dispatches them by partition
func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
	handlers := make(map[int32]chan *ProducerMessage)
	partitioner := p.conf.Producer.Partitioner()

	for msg := range input {
		if msg.retries == 0 {
			err := p.assignPartition(partitioner, msg)
			if err != nil {
				p.returnError(msg, err)
				continue
			}
		}

		handler := handlers[msg.partition]
		if handler == nil {
			p.retries <- &ProducerMessage{flags: ref}
			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
			topic := msg.Topic         // block local because go's closure semantics suck
			partition := msg.partition // block local because go's closure semantics suck
			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
			handler = newHandler
			handlers[msg.partition] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}

	p.retries <- &ProducerMessage{flags: unref}
}
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
func (p *producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
	var leader *Broker
	var output chan *ProducerMessage

	breaker := breaker.New(3, 1, 10*time.Second)
	doUpdate := func() (err error) {
		if err = p.client.RefreshMetadata(topic); err != nil {
			return err
		}

		if leader, err = p.client.Leader(topic, partition); err != nil {
			return err
		}

		output = p.getBrokerProducer(leader)
		return nil
	}

	// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
	// on the first message
	leader, _ = p.client.Leader(topic, partition)
	if leader != nil {
		output = p.getBrokerProducer(leader)
	}

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark := 0
	retryState := make([]struct {
		buf          []*ProducerMessage
		expectChaser bool
	}, p.conf.Producer.Retry.Max+1)

	for msg := range input {
		if msg.retries > highWatermark {
			// new, higher, retry level; send off a chaser so that we know when everything "in between" has made it
			// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
			highWatermark = msg.retries
			Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
			retryState[msg.retries].expectChaser = true
			output <- &ProducerMessage{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
			p.unrefBrokerProducer(leader)
			output = nil
			time.Sleep(p.conf.Producer.Retry.Backoff)
		} else if highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
				if msg.flags&chaser == chaser {
					retryState[msg.retries].expectChaser = false
				} else {
					retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&chaser == chaser {
				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				retryState[highWatermark].expectChaser = false
				Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
				for {
					highWatermark--
					Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition)

					if output == nil {
						if err := breaker.Run(doUpdate); err != nil {
							p.returnErrors(retryState[highWatermark].buf, err)
							goto flushDone
						}
						Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
					}

					for _, msg := range retryState[highWatermark].buf {
						output <- msg
					}

				flushDone:
					retryState[highWatermark].buf = nil
					if retryState[highWatermark].expectChaser {
						Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
						break
					} else {
						Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
						if highWatermark == 0 {
							break
						}
					}
				}
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees
		if output == nil {
			if err := breaker.Run(doUpdate); err != nil {
				p.returnError(msg, err)
				time.Sleep(p.conf.Producer.Retry.Backoff)
				continue
			}
			Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
		}

		output <- msg
	}

	p.unrefBrokerProducer(leader)
	p.retries <- &ProducerMessage{flags: unref}
}
// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
func (p *producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
	var ticker *time.Ticker
	var timer <-chan time.Time
	if p.conf.Producer.Flush.Frequency > 0 {
		ticker = time.NewTicker(p.conf.Producer.Flush.Frequency)
		timer = ticker.C
	}

	var buffer []*ProducerMessage
	var doFlush chan []*ProducerMessage
	var bytesAccumulated int

	flusher := make(chan []*ProducerMessage)
	go withRecover(func() { p.flusher(broker, flusher) })

	for {
		select {
		case msg := <-input:
			if msg == nil {
				goto shutdown
			}

			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
				(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
				(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
				Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
				flusher <- buffer
				buffer = nil
				doFlush = nil
				bytesAccumulated = 0
			}

			buffer = append(buffer, msg)
			bytesAccumulated += msg.byteSize()

			if len(buffer) >= p.conf.Producer.Flush.Messages ||
				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
				doFlush = flusher
			}
		case <-timer:
			doFlush = flusher
		case doFlush <- buffer:
			buffer = nil
			doFlush = nil
			bytesAccumulated = 0
		}
	}

shutdown:
	if ticker != nil {
		ticker.Stop()
	}
	if len(buffer) > 0 {
		flusher <- buffer
	}
	close(flusher)
}
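
// The doFlush dance above relies on the fact that sending on a nil channel
// blocks forever: assigning doFlush = flusher "arms" the third select case, and
// assigning nil disarms it again. A stripped-down sketch of the same pattern
// (in, sink and batchSize are hypothetical names, with ints standing in for
// messages):
//
//	var pending []int
//	var out chan []int // nil, so the send case below cannot fire yet
//	sink := make(chan []int)
//
//	for {
//		select {
//		case v := <-in:
//			pending = append(pending, v)
//			if len(pending) >= batchSize {
//				out = sink // arm the send case
//			}
//		case out <- pending: // only selectable while out is non-nil
//			pending = nil
//			out = nil // disarm until the next full batch
//		}
//	}
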
// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
	var closing error
	currentRetries := make(map[string]map[int32]error)
	Logger.Printf("producer/flusher/%d starting up\n", broker.ID())

	for batch := range input {
		if closing != nil {
			p.retryMessages(batch, closing)
			continue
		}

		// group messages by topic/partition
		msgSets := make(map[string]map[int32][]*ProducerMessage)
		for i, msg := range batch {
			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
				if msg.flags&chaser == chaser {
					// we can start processing this topic/partition again
					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
						broker.ID(), msg.Topic, msg.partition)
					currentRetries[msg.Topic][msg.partition] = nil
				}

				p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.partition])
				batch[i] = nil // to prevent it being returned/retried twice
				continue
			}

			partitionSet := msgSets[msg.Topic]
			if partitionSet == nil {
				partitionSet = make(map[int32][]*ProducerMessage)
				msgSets[msg.Topic] = partitionSet
			}

			partitionSet[msg.partition] = append(partitionSet[msg.partition], msg)
		}

		request := p.buildRequest(msgSets)
		if request == nil {
			continue
		}

		response, err := broker.Produce(request)

		switch err.(type) {
		case nil:
			break
		case PacketEncodingError:
			p.returnErrors(batch, err)
			continue
		default:
			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
			closing = err
			_ = broker.Close()
			p.retryMessages(batch, err)
			continue
		}

		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			if p.conf.Producer.Return.Successes {
				p.returnSuccesses(batch)
			}
			continue
		}

		// we iterate through the blocks in the request, not the response, so that we notice
		// if the response is missing a block completely
		for topic, partitionSet := range msgSets {
			for partition, msgs := range partitionSet {
				block := response.GetBlock(topic, partition)
				if block == nil {
					p.returnErrors(msgs, ErrIncompleteResponse)
					continue
				}

				switch block.Err {
				case ErrNoError:
					// All the messages for this topic-partition were delivered successfully!
					if p.conf.Producer.Return.Successes {
						for i := range msgs {
							msgs[i].offset = block.Offset + int64(i)
						}
						p.returnSuccesses(msgs)
					}
				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
					ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
						broker.ID(), topic, partition, block.Err)
					if currentRetries[topic] == nil {
						currentRetries[topic] = make(map[int32]error)
					}
					currentRetries[topic][partition] = block.Err
					p.retryMessages(msgs, block.Err)
				default:
					p.returnErrors(msgs, block.Err)
				}
			}
		}
	}
	Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
	p.retries <- &ProducerMessage{flags: unref}
}
// singleton
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *producer) retryHandler() {
	var buf []*ProducerMessage
	var msg *ProducerMessage

	refs := 0
	shuttingDown := false

	for {
		if len(buf) == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf[0]:
				buf = buf[1:]
				continue
			}
		}

		if msg.flags&ref != 0 {
			refs++
		} else if msg.flags&unref != 0 {
			refs--
			if refs == 0 && shuttingDown {
				break
			}
		} else if msg.flags&shutdown != 0 {
			shuttingDown = true
			if refs == 0 {
				break
			}
		} else {
			buf = append(buf, msg)
		}
	}

	close(p.retries)
	for i := range buf {
		p.input <- buf[i]
	}
	close(p.input)
}
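
// Reduced to its essentials, the bridge above looks like the sketch below
// (inbound and outbound are hypothetical names): the goroutine is always
// willing to receive, and only offers the head of its private slice when it has
// one, so writers to inbound are never blocked indefinitely even when outbound
// is slow or itself blocked. That is what breaks the potential deadlock cycle
// between the flushers and topicDispatcher.
//
//	var buf []*ProducerMessage
//	for {
//		if len(buf) == 0 {
//			buf = append(buf, <-inbound)
//			continue
//		}
//		select {
//		case msg := <-inbound:
//			buf = append(buf, msg)
//		case outbound <- buf[0]:
//			buf = buf[1:]
//		}
//	}
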
///////////////////////////////////////////
///////////////////////////////////////////

// utility functions

func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
	var partitions []int32
	var err error

	if partitioner.RequiresConsistency() {
		partitions, err = p.client.Partitions(msg.Topic)
	} else {
		partitions, err = p.client.WritablePartitions(msg.Topic)
	}

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.partition = partitions[choice]

	return nil
}
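
// For illustration, a consistent key-hashing partitioner shaped to match the
// calls assignPartition makes (Partition and RequiresConsistency); this is a
// sketch against that inferred interface, not a drop-in for the partitioners
// shipped with the package, and it assumes hash/fnv is imported:
//
//	type keyModPartitioner struct{}
//
//	func (keyModPartitioner) RequiresConsistency() bool { return true }
//
//	func (keyModPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
//		if msg.Key == nil {
//			return 0, nil
//		}
//		bytes, err := msg.Key.Encode()
//		if err != nil {
//			return -1, err
//		}
//		h := fnv.New32a()
//		h.Write(bytes)
//		return int32(h.Sum32() % uint32(numPartitions)), nil
//	}
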
func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
	empty := true

	for topic, partitionSet := range batch {
		for partition, msgSet := range partitionSet {
			setToSend := new(MessageSet)
			setSize := 0
			for _, msg := range msgSet {
				var keyBytes, valBytes []byte
				var err error
				if msg.Key != nil {
					if keyBytes, err = msg.Key.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}
				if msg.Value != nil {
					if valBytes, err = msg.Value.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}

				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
					// compression causes message-sets to be wrapped as single messages, which have tighter
					// size requirements, so we have to respect those limits
					valBytes, err := encode(setToSend)
					if err != nil {
						Logger.Println(err) // if this happens, it's basically our fault.
						panic(err)
					}
					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
					setToSend = new(MessageSet)
					setSize = 0
				}
				setSize += msg.byteSize()

				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
				empty = false
			}

			if p.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, setToSend)
			} else {
				valBytes, err := encode(setToSend)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
			}
		}
	}

	if empty {
		return nil
	}
	return req
}
func (p *producer) returnError(msg *ProducerMessage, err error) {
	msg.flags = 0
	msg.retries = 0
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
}

func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		if msg != nil {
			p.returnError(msg, err)
		}
	}
}

func (p *producer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if msg != nil {
			msg.flags = 0
			p.successes <- msg
		}
	}
}

func (p *producer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		if msg == nil {
			continue
		}
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, err)
		} else {
			msg.retries++
			p.retries <- msg
		}
	}
}
type brokerProducer struct {
	input chan *ProducerMessage
	refs  int
}

func (p *producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	producer := p.brokers[broker]

	if producer == nil {
		p.retries <- &ProducerMessage{flags: ref}
		producer = &brokerProducer{
			refs:  1,
			input: make(chan *ProducerMessage),
		}
		p.brokers[broker] = producer
		go withRecover(func() { p.messageAggregator(broker, producer.input) })
	} else {
		producer.refs++
	}

	return producer.input
}

func (p *producer) unrefBrokerProducer(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	producer := p.brokers[broker]

	if producer != nil {
		producer.refs--
		if producer.refs == 0 {
			close(producer.input)
			delete(p.brokers, broker)
		}
	}
}