producer.go

package sarama

import (
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
)

func forceFlushThreshold() int {
	return int(MaxRequestSize - (10 * 1024)) // 10KiB of safety room for misc. overhead; we might want to calculate this more precisely
}

// ProducerConfig is used to pass multiple configuration options to NewProducer.
type ProducerConfig struct {
	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash).
	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
	Timeout           time.Duration          // The maximum duration the broker will wait for the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution; nanoseconds will be truncated.
	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression).
	FlushMsgCount     int                    // The number of messages needed to trigger a flush. This is a minimum, not an upper limit (use MaxMessagesPerReq for that).
	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued. This is a minimum, not an upper limit.
	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered. This is a minimum, not an upper limit.
	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000).
	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies.
	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries. Defaults to 250ms.
}

// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
func NewProducerConfig() *ProducerConfig {
	return &ProducerConfig{
		Partitioner:     NewHashPartitioner,
		RequiredAcks:    WaitForLocal,
		MaxMessageBytes: 1000000,
		RetryBackoff:    250 * time.Millisecond,
	}
}

// Validate checks a ProducerConfig instance. It will return a
// ConfigurationError if the specified value doesn't make sense.
func (config *ProducerConfig) Validate() error {
	if config.RequiredAcks < -1 {
		return ConfigurationError("Invalid RequiredAcks")
	} else if config.RequiredAcks > 1 {
		Logger.Println("ProducerConfig.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
	}

	if config.Timeout < 0 {
		return ConfigurationError("Invalid Timeout")
	} else if config.Timeout%time.Millisecond != 0 {
		Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
	}

	if config.RequiredAcks == WaitForAll && config.Timeout == 0 {
		return ConfigurationError("If you WaitForAll you must specify a non-zero timeout to wait.")
	}

	if config.FlushMsgCount < 0 {
		return ConfigurationError("Invalid FlushMsgCount")
	}

	if config.FlushByteCount < 0 {
		return ConfigurationError("Invalid FlushByteCount")
	} else if config.FlushByteCount >= forceFlushThreshold() {
		Logger.Println("ProducerConfig.FlushByteCount too close to MaxRequestSize; it will be ignored.")
	}

	if config.FlushFrequency < 0 {
		return ConfigurationError("Invalid FlushFrequency")
	}

	if config.Partitioner == nil {
		return ConfigurationError("No partitioner set")
	}

	if config.MaxMessageBytes <= 0 {
		return ConfigurationError("Invalid MaxMessageBytes")
	} else if config.MaxMessageBytes >= forceFlushThreshold() {
		Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
	}

	if config.MaxMessagesPerReq < 0 || (config.MaxMessagesPerReq > 0 && config.MaxMessagesPerReq < config.FlushMsgCount) {
		return ConfigurationError("Invalid MaxMessagesPerReq, must be non-negative and >= FlushMsgCount if set")
	}

	if config.RetryBackoff < 0 {
		return ConfigurationError("Invalid RetryBackoff")
	}

	return nil
}

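// exampleProducerConfig is an illustrative sketch and not part of the original file:
// it shows one way to build a custom configuration on top of the defaults and run
// Validate before constructing a producer. The field values are arbitrary examples,
// not recommendations.
func exampleProducerConfig() (*ProducerConfig, error) {
	config := NewProducerConfig()
	config.RequiredAcks = WaitForAll               // wait for the full set of in-sync replicas to acknowledge
	config.Timeout = 500 * time.Millisecond        // how long the broker may wait for those acks
	config.Compression = CompressionSnappy         // compress message-sets before sending
	config.FlushMsgCount = 64                      // flush once 64 messages have accumulated...
	config.FlushFrequency = 100 * time.Millisecond // ...or after 100ms, whichever comes first
	config.AckSuccesses = true                     // deliver successes on the Successes channel

	if err := config.Validate(); err != nil {
		return nil, err
	}
	return config, nil
}
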
// Producer publishes Kafka messages. It routes messages to the correct broker
// for the provided topic-partition, refreshing metadata as appropriate, and
// parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer
// to avoid leaks: it will not be garbage-collected automatically when it passes
// out of scope (this is in addition to calling Close on the underlying client,
// which is still necessary).
type Producer struct {
	client *Client
	config ProducerConfig

	errors                    chan *ProduceError
	input, successes, retries chan *MessageToSend

	brokers    map[*Broker]*brokerWorker
	brokerLock sync.Mutex
}

// NewProducer creates a new Producer using the given client.
func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
	// Check that we are not dealing with a closed Client before processing
	// any other arguments
	if client.Closed() {
		return nil, ClosedClient
	}

	if config == nil {
		config = NewProducerConfig()
	}

	if err := config.Validate(); err != nil {
		return nil, err
	}

	p := &Producer{
		client:    client,
		config:    *config,
		errors:    make(chan *ProduceError),
		input:     make(chan *MessageToSend),
		successes: make(chan *MessageToSend),
		retries:   make(chan *MessageToSend),
		brokers:   make(map[*Broker]*brokerWorker),
	}

	// launch our singleton dispatchers
	go withRecover(p.topicDispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

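// newExampleProducer is an illustrative sketch and not part of the original file:
// it shows the intended construction order, assuming the caller has already created
// a *Client by whatever means is appropriate. The caller then owns the producer and
// must eventually call Close or AsyncClose before closing the underlying client.
func newExampleProducer(client *Client) (*Producer, error) {
	config := NewProducerConfig()
	config.AckSuccesses = true // we intend to read the Successes channel

	producer, err := NewProducer(client, config)
	if err != nil {
		return nil, err
	}
	// From here on, keep draining Errors (and Successes, since AckSuccesses is
	// set) until the producer is closed, or it will deadlock.
	return producer, nil
}
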
type flagSet int8

const (
	retried  flagSet = 1 << iota // message has been retried
	chaser                       // message is last in a group that failed
	ref                          // add a reference to a singleton channel
	unref                        // remove a reference from a singleton channel
	shutdown                     // start the shutdown process
)

// MessageToSend is the collection of elements passed to the Producer in order to send a message.
type MessageToSend struct {
	Topic    string      // The Kafka topic for this message.
	Key      Encoder     // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Value    Encoder     // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels. Sarama completely ignores this field; it is only to be used for pass-through data.

	// these are filled in by the producer as the message is processed
	offset    int64
	partition int32
	flags     flagSet
}

// Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if
// the message was successfully delivered and RequiredAcks is not NoResponse.
func (m *MessageToSend) Offset() int64 {
	return m.offset
}

// Partition is the partition that the message was sent to. This is only guaranteed to be defined if
// the message was successfully delivered.
func (m *MessageToSend) Partition() int32 {
	return m.partition
}

func (m *MessageToSend) byteSize() int {
	size := 26 // the metadata overhead of CRC, flags, etc.
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

// ProduceError is the type of error generated when the producer fails to deliver a message.
// It contains the original MessageToSend as well as the actual error value.
type ProduceError struct {
	Msg *MessageToSend
	Err error
}

// ProduceErrors is a type that wraps a batch of "ProduceError"s and implements the error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProduceErrors []*ProduceError

func (pe ProduceErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

// Errors is the error output channel back to the user. You MUST read from this channel
// or the Producer will deadlock. It is suggested that you send messages and read errors
// together in a single select statement.
func (p *Producer) Errors() <-chan *ProduceError {
	return p.errors
}

// Successes is the success output channel back to the user when AckSuccesses is configured.
// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
// It is suggested that you send and read messages together in a single select statement.
func (p *Producer) Successes() <-chan *MessageToSend {
	return p.successes
}

// Input is the input channel for the user to write messages to that they wish to send.
func (p *Producer) Input() chan<- *MessageToSend {
	return p.input
}

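// exampleSendLoop is an illustrative sketch and not part of the original file: it
// shows the recommended pattern of sending messages and reading errors in a single
// select statement, so a slow or failing broker cannot deadlock the caller. It
// assumes AckSuccesses is false (otherwise Successes must be drained as well); the
// "stop" channel is a hypothetical way for the caller to end the loop.
func exampleSendLoop(producer *Producer, stop <-chan struct{}) {
	for {
		select {
		case producer.Input() <- &MessageToSend{Topic: "example", Value: StringEncoder("hello")}:
			// message enqueued; the producer owns it from here
		case err := <-producer.Errors():
			Logger.Println("failed to deliver message:", err.Err)
		case <-stop:
			return
		}
	}
}
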
// Close shuts down the producer and flushes any messages it may have buffered.
// You must call this function before a producer object passes out of scope, as
// it may otherwise leak memory. You must call this before calling Close on the
// underlying client.
func (p *Producer) Close() error {
	p.AsyncClose()

	if p.config.AckSuccesses {
		go withRecover(func() {
			for _ = range p.successes {
			}
		})
	}

	var errors ProduceErrors
	for event := range p.errors {
		errors = append(errors, event)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

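// exampleShutdown is an illustrative sketch and not part of the original file: it
// shows how Close collects any undelivered messages into a ProduceErrors value, so
// the caller does not have to drain the Errors channel by hand.
func exampleShutdown(producer *Producer) {
	if err := producer.Close(); err != nil {
		if errs, ok := err.(ProduceErrors); ok {
			for _, pe := range errs {
				Logger.Println("failed to deliver message:", pe.Err)
			}
		}
	}
}
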
// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
// buffered. The shutdown has completed when both the Errors and Successes channels
// have been closed. When calling AsyncClose, you *must* continue to read from those
// channels in order to drain the results of any messages in flight.
func (p *Producer) AsyncClose() {
	go withRecover(func() {
		p.input <- &MessageToSend{flags: shutdown}
	})
}

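// exampleAsyncShutdown is an illustrative sketch and not part of the original file:
// after AsyncClose you must keep draining both Errors and Successes until they are
// closed, otherwise in-flight messages can block the shutdown.
func exampleAsyncShutdown(producer *Producer) {
	producer.AsyncClose()

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for err := range producer.Errors() {
			Logger.Println("failed to deliver message:", err.Err)
		}
	}()
	go func() {
		defer wg.Done()
		for _ = range producer.Successes() {
			// drain successes; only delivered when AckSuccesses is set
		}
	}()
	wg.Wait() // both channels closed: shutdown is complete
}
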
///////////////////////////////////////////
// In normal processing, a message flows through the following functions from top to bottom,
// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
// (which sends the message to the broker). In cases where a message must be retried, it goes
// through retryHandler before being returned to the top of the flow.
///////////////////////////////////////////

// singleton
// dispatches messages by topic
func (p *Producer) topicDispatcher() {
	handlers := make(map[string]chan *MessageToSend)

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			Logger.Println("Producer shutting down.")
			break
		}

		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
			(msg.byteSize() > p.config.MaxMessageBytes) {
			p.returnError(msg, MessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			p.retries <- &MessageToSend{flags: ref}
			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
			topic := msg.Topic // block local because go's closure semantics suck
			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
			handler = newHandler
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}

	p.retries <- &MessageToSend{flags: shutdown}

	for msg := range p.input {
		p.returnError(msg, ShuttingDown)
	}

	close(p.errors)
	close(p.successes)
}

// one per topic
// partitions messages, then dispatches them by partition
func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend) {
	handlers := make(map[int32]chan *MessageToSend)
	partitioner := p.config.Partitioner()

	for msg := range input {
		if msg.flags&retried == 0 {
			err := p.assignPartition(partitioner, msg)
			if err != nil {
				p.returnError(msg, err)
				continue
			}
		}

		handler := handlers[msg.partition]
		if handler == nil {
			p.retries <- &MessageToSend{flags: ref}
			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
			topic := msg.Topic         // block local because go's closure semantics suck
			partition := msg.partition // block local because go's closure semantics suck
			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
			handler = newHandler
			handlers[msg.partition] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}

	p.retries <- &MessageToSend{flags: unref}
}

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
	var leader *Broker
	var output chan *MessageToSend
	var backlog []*MessageToSend
	breaker := breaker.New(3, 1, 10*time.Second)
	doUpdate := func() (err error) {
		if err = p.client.RefreshTopicMetadata(topic); err != nil {
			return err
		}
		if leader, err = p.client.Leader(topic, partition); err != nil {
			return err
		}
		output = p.getBrokerWorker(leader)
		return nil
	}

	// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
	// on the first message
	leader, _ = p.client.Leader(topic, partition)
	if leader != nil {
		output = p.getBrokerWorker(leader)
	}

	for msg := range input {
		if msg.flags&retried == 0 {
			// normal case
			if backlog != nil {
				backlog = append(backlog, msg)
				continue
			}
		} else if msg.flags&chaser == 0 {
			// retry flag set, chaser flag not set
			if backlog == nil {
				// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
				// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
				Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
				output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
				backlog = make([]*MessageToSend, 0)
				p.unrefBrokerWorker(leader)
				output = nil
				time.Sleep(p.config.RetryBackoff)
			}
		} else {
			// retry *and* chaser flag set, flush the backlog and return to normal processing
			Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
			if output == nil {
				if err := breaker.Run(doUpdate); err != nil {
					p.returnErrors(backlog, err)
					backlog = nil
					continue
				}
			}

			for _, msg := range backlog {
				output <- msg
			}
			Logger.Printf("producer/leader state change to [normal] on %s/%d\n", topic, partition)

			backlog = nil
			continue
		}

		if output == nil {
			if err := breaker.Run(doUpdate); err != nil {
				p.returnError(msg, err)
				continue
			}
		}

		output <- msg
	}

	p.unrefBrokerWorker(leader)
	p.retries <- &MessageToSend{flags: unref}
}

// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend) {
	var ticker *time.Ticker
	var timer <-chan time.Time
	if p.config.FlushFrequency > 0 {
		ticker = time.NewTicker(p.config.FlushFrequency)
		timer = ticker.C
	}

	var buffer []*MessageToSend
	var doFlush chan []*MessageToSend
	var bytesAccumulated int

	flusher := make(chan []*MessageToSend)
	go withRecover(func() { p.flusher(broker, flusher) })

	for {
		select {
		case msg := <-input:
			if msg == nil {
				goto shutdown
			}

			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
				(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) ||
				(p.config.MaxMessagesPerReq > 0 && len(buffer) >= p.config.MaxMessagesPerReq) {
				Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
				flusher <- buffer
				buffer = nil
				doFlush = nil
				bytesAccumulated = 0
			}

			buffer = append(buffer, msg)
			bytesAccumulated += msg.byteSize()

			if len(buffer) >= p.config.FlushMsgCount ||
				(p.config.FlushByteCount > 0 && bytesAccumulated >= p.config.FlushByteCount) {
				doFlush = flusher
			}
		case <-timer:
			doFlush = flusher
		case doFlush <- buffer:
			buffer = nil
			doFlush = nil
			bytesAccumulated = 0
		}
	}

shutdown:
	if ticker != nil {
		ticker.Stop()
	}
	if len(buffer) > 0 {
		flusher <- buffer
	}
	close(flusher)
}

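// The aggregator above relies on a classic Go trick: a send on a nil channel blocks
// forever, so the "doFlush <- buffer" case is effectively disabled until doFlush is
// pointed at the real flusher channel. The standalone sketch below is illustrative
// only and not part of the original file (the names are made up); it shows the same
// gating technique on plain ints.
func exampleNilChannelGate(input <-chan int, flushThreshold int) {
	out := make(chan []int)
	go func() {
		for batch := range out {
			Logger.Println("flushed batch of", len(batch))
		}
	}()

	var buffer []int
	var doFlush chan []int // nil => the flush case below can never be selected
	for {
		select {
		case n, ok := <-input:
			if !ok {
				close(out)
				return
			}
			buffer = append(buffer, n)
			if len(buffer) >= flushThreshold {
				doFlush = out // arm the flush case
			}
		case doFlush <- buffer:
			buffer = nil
			doFlush = nil // disarm until the threshold is hit again
		}
	}
}
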
// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
	var closing error
	currentRetries := make(map[string]map[int32]error)

	for batch := range input {
		if closing != nil {
			p.retryMessages(batch, closing)
			continue
		}

		// group messages by topic/partition
		msgSets := make(map[string]map[int32][]*MessageToSend)
		for i, msg := range batch {
			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
				if msg.flags&chaser == chaser {
					// we can start processing this topic/partition again
					Logger.Printf("producer/flusher state change to [normal] on %s/%d\n",
						msg.Topic, msg.partition)
					currentRetries[msg.Topic][msg.partition] = nil
				}

				p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
				batch[i] = nil // to prevent it being returned/retried twice
				continue
			}

			partitionSet := msgSets[msg.Topic]
			if partitionSet == nil {
				partitionSet = make(map[int32][]*MessageToSend)
				msgSets[msg.Topic] = partitionSet
			}
			partitionSet[msg.partition] = append(partitionSet[msg.partition], msg)
		}

		request := p.buildRequest(msgSets)
		if request == nil {
			continue
		}

		response, err := broker.Produce(p.client.id, request)

		switch err {
		case nil:
			break
		case EncodingError:
			p.returnErrors(batch, err)
			continue
		default:
			p.client.disconnectBroker(broker)
			Logger.Println("producer/flusher state change to [closing] because", err)
			closing = err
			p.retryMessages(batch, err)
			continue
		}

		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			if p.config.AckSuccesses {
				p.returnSuccesses(batch)
			}
			continue
		}

		// we iterate through the blocks in the request, not the response, so that we notice
		// if the response is missing a block completely
		for topic, partitionSet := range msgSets {
			for partition, msgs := range partitionSet {
				block := response.GetBlock(topic, partition)
				if block == nil {
					p.returnErrors(msgs, IncompleteResponse)
					continue
				}

				switch block.Err {
				case NoError:
					// All the messages for this topic-partition were delivered successfully!
					if p.config.AckSuccesses {
						for i := range msgs {
							msgs[i].offset = block.Offset + int64(i)
						}
						p.returnSuccesses(msgs)
					}
				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
					Logger.Printf("producer/flusher state change to [retrying] on %s/%d because %v\n",
						topic, partition, block.Err)
					if currentRetries[topic] == nil {
						currentRetries[topic] = make(map[int32]error)
					}
					currentRetries[topic][partition] = block.Err
					p.retryMessages(msgs, block.Err)
				default:
					p.returnErrors(msgs, block.Err)
				}
			}
		}
	}
	p.retries <- &MessageToSend{flags: unref}
}

// singleton
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *Producer) retryHandler() {
	var buf []*MessageToSend
	var msg *MessageToSend
	refs := 0
	shuttingDown := false

	for {
		if len(buf) == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf[0]:
				buf = buf[1:]
				continue
			}
		}

		if msg.flags&ref != 0 {
			refs++
		} else if msg.flags&unref != 0 {
			refs--
			if refs == 0 && shuttingDown {
				break
			}
		} else if msg.flags&shutdown != 0 {
			shuttingDown = true
			if refs == 0 {
				break
			}
		} else {
			buf = append(buf, msg)
		}
	}

	close(p.retries)
	for i := range buf {
		p.input <- buf[i]
	}
	close(p.input)
}

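// retryHandler above is essentially an infinitely-buffered bridge: it always accepts
// from p.retries and forwards to p.input whenever that send would not block, so a
// stalled flusher can never deadlock the dispatchers. The standalone sketch below is
// illustrative only and not part of the original file (the names are made up); it
// shows the same unbounded-buffer pattern without the flag handling.
func exampleInfiniteBridge(in <-chan *MessageToSend, out chan<- *MessageToSend) {
	var buf []*MessageToSend
	for {
		if len(buf) == 0 {
			msg, ok := <-in
			if !ok {
				return
			}
			buf = append(buf, msg)
			continue
		}
		select {
		case msg, ok := <-in:
			if !ok {
				// drain what we have buffered, then stop
				for _, m := range buf {
					out <- m
				}
				return
			}
			buf = append(buf, msg)
		case out <- buf[0]:
			buf = buf[1:]
		}
	}
}
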
///////////////////////////////////////////
///////////////////////////////////////////

// utility functions

func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend) error {
	var partitions []int32
	var err error

	if partitioner.RequiresConsistency() {
		partitions, err = p.client.Partitions(msg.Topic)
	} else {
		partitions, err = p.client.WritablePartitions(msg.Topic)
	}

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))
	if numPartitions == 0 {
		return LeaderNotAvailable
	}

	choice, err := partitioner.Partition(msg.Key, numPartitions)
	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return InvalidPartition
	}

	msg.partition = partitions[choice]
	return nil
}

func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *ProduceRequest {
	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: int32(p.config.Timeout / time.Millisecond)}
	empty := true

	for topic, partitionSet := range batch {
		for partition, msgSet := range partitionSet {
			setToSend := new(MessageSet)
			setSize := 0
			for _, msg := range msgSet {
				var keyBytes, valBytes []byte
				var err error

				if msg.Key != nil {
					if keyBytes, err = msg.Key.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}
				if msg.Value != nil {
					if valBytes, err = msg.Value.Encode(); err != nil {
						p.returnError(msg, err)
						continue
					}
				}

				if p.config.Compression != CompressionNone && setSize+msg.byteSize() > p.config.MaxMessageBytes {
					// compression causes message-sets to be wrapped as single messages, which have tighter
					// size requirements, so we have to respect those limits
					valBytes, err := encode(setToSend)
					if err != nil {
						Logger.Println(err) // if this happens, it's basically our fault.
						panic(err)
					}
					req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
					setToSend = new(MessageSet)
					setSize = 0
				}
				setSize += msg.byteSize()

				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
				empty = false
			}

			if p.config.Compression == CompressionNone {
				req.AddSet(topic, partition, setToSend)
			} else {
				valBytes, err := encode(setToSend)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
			}
		}
	}

	if empty {
		return nil
	}
	return req
}

func (p *Producer) returnError(msg *MessageToSend, err error) {
	msg.flags = 0
	p.errors <- &ProduceError{Msg: msg, Err: err}
}

func (p *Producer) returnErrors(batch []*MessageToSend, err error) {
	for _, msg := range batch {
		if msg != nil {
			p.returnError(msg, err)
		}
	}
}

func (p *Producer) returnSuccesses(batch []*MessageToSend) {
	for _, msg := range batch {
		if msg != nil {
			msg.flags = 0
			p.successes <- msg
		}
	}
}

func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
	for _, msg := range batch {
		if msg == nil {
			continue
		}

		if msg.flags&retried == retried {
			p.returnError(msg, err)
		} else {
			msg.flags |= retried
			p.retries <- msg
		}
	}
}

type brokerWorker struct {
	input chan *MessageToSend
	refs  int
}

func (p *Producer) getBrokerWorker(broker *Broker) chan *MessageToSend {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	worker := p.brokers[broker]

	if worker == nil {
		p.retries <- &MessageToSend{flags: ref}
		worker = &brokerWorker{
			refs:  1,
			input: make(chan *MessageToSend),
		}
		p.brokers[broker] = worker
		go withRecover(func() { p.messageAggregator(broker, worker.input) })
	} else {
		worker.refs++
	}

	return worker.input
}

func (p *Producer) unrefBrokerWorker(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	worker := p.brokers[broker]

	if worker != nil {
		worker.refs--
		if worker.refs == 0 {
			close(worker.input)
			delete(p.brokers, broker)
		}
	}
}