multiproducer.go

package sarama

import (
	"sync"
	"time"
)
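
// MultiProducerConfig is used to pass multiple configuration options to NewMultiProducer.
// With the zero value (MaxBufferBytes and MaxBufferTime unset), delivery is synchronous:
// SendMessage blocks until the broker responds.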
type MultiProducerConfig struct {
	Partitioner        Partitioner      // Chooses the partition to send messages to; random if nil.
	RequiredAcks       RequiredAcks     // The level of acknowledgement reliability required from the broker.
	Timeout            int32            // The maximum time in ms the broker will wait for acks before responding.
	Compression        CompressionCodec // The type of compression to use on messages (defaults to no compression).
	MaxBufferBytes     uint32           // Flush a broker's buffer once it holds more than this many bytes (minimum 1).
	MaxBufferTime      uint32           // Flush a broker's buffer at least this often, in milliseconds.
	MaxDeliveryRetries uint32           // How many times to retry a failed message before dropping it.
}
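
// MultiProducer publishes Kafka messages on multiple topics and partitions. It routes
// each message to the leader broker for its partition, buffering messages per broker
// and flushing when a buffer grows past MaxBufferBytes, when MaxBufferTime elapses,
// or on shutdown.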
type MultiProducer struct {
	client          *Client
	config          MultiProducerConfig
	brokerProducers map[*Broker]*brokerProducer
	m               sync.RWMutex
	errors          chan error
	deliveryLocks   map[topicPartition]chan bool
	dm              sync.RWMutex
}
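
// brokerProducer holds the buffer of pending messages for a single broker, keyed by
// topic and partition, along with the channels that drive its flusher goroutine.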
type brokerProducer struct {
	mapM          sync.Mutex
	messages      map[string]map[int32][]*produceMessage
	bufferedBytes uint32
	flushNow      chan bool
	broker        *Broker
	stopper       chan bool
	hasMessages   chan bool
}

// produceMessage is a single buffered message together with its delivery-failure count.
type produceMessage struct {
	topic      string
	partition  int32
	key, value []byte
	failures   uint32
}

// topicPartition is the key used for per-partition delivery locks.
type topicPartition struct {
	topic     string
	partition int32
}
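
// NewMultiProducer creates a new MultiProducer using the given client and configuration.
// If config is nil, sane defaults are used: a random partitioner and a one-byte buffer,
// which means synchronous delivery. A minimal usage sketch, assuming a client has
// already been constructed (the topic, sizes, and encoder here are illustrative only):
//
//	producer, err := NewMultiProducer(client, &MultiProducerConfig{
//		RequiredAcks:   WaitForLocal,
//		MaxBufferBytes: 1024,
//		MaxBufferTime:  500, // milliseconds
//	})
//	if err != nil {
//		panic(err)
//	}
//	go func() {
//		for err := range producer.Errors() {
//			if err != nil {
//				// handle delivery error
//			}
//		}
//	}()
//	err = producer.SendMessage("my-topic", nil, StringEncoder("hello"))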
func NewMultiProducer(client *Client, config *MultiProducerConfig) (*MultiProducer, error) {
	if config == nil {
		config = new(MultiProducerConfig)
	}

	if config.RequiredAcks < -1 {
		return nil, ConfigurationError("Invalid RequiredAcks")
	}

	if config.Timeout < 0 {
		return nil, ConfigurationError("Invalid Timeout")
	}

	if config.Partitioner == nil {
		config.Partitioner = NewRandomPartitioner()
	}

	if config.MaxBufferBytes == 0 {
		config.MaxBufferBytes = 1
	}

	return &MultiProducer{
		client:          client,
		config:          *config,
		errors:          make(chan error, 16),
		deliveryLocks:   make(map[topicPartition]chan bool),
		brokerProducers: make(map[*Broker]*brokerProducer),
	}, nil
}
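
// Errors returns the channel on which delivery errors (or nil, for successful
// deliveries) are reported when running asynchronously. It panics if the producer
// is configured for synchronous delivery, where SendMessage returns the result directly.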
func (p *MultiProducer) Errors() chan error {
	if p.isSynchronous() {
		panic("use of Errors() is not permitted in synchronous mode.")
	}
	return p.errors
}
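
// Close shuts down the MultiProducer. It is currently a no-op; each brokerProducer
// is stopped individually via its own Close method.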
func (p *MultiProducer) Close() error {
	return nil
}
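
// SendMessage produces a message on the given topic with the given key and value.
// In synchronous mode it blocks until the delivery attempt completes and returns
// its result; otherwise it buffers the message and returns immediately.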
func (p *MultiProducer) SendMessage(topic string, key, value Encoder) (err error) {
	var keyBytes, valBytes []byte

	if key != nil {
		if keyBytes, err = key.Encode(); err != nil {
			return err
		}
	}
	if value != nil {
		if valBytes, err = value.Encode(); err != nil {
			return err
		}
	}

	partition, err := p.choosePartition(topic, key)
	if err != nil {
		return err
	}

	msg := &produceMessage{
		topic:     topic,
		partition: partition,
		key:       keyBytes,
		value:     valBytes,
		failures:  0,
	}

	return p.addMessage(msg, false)
}
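
// choosePartition maps the key through the configured Partitioner onto one of the
// topic's available partitions.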
func (p *MultiProducer) choosePartition(topic string, key Encoder) (int32, error) {
	partitions, err := p.client.Partitions(topic)
	if err != nil {
		return -1, err
	}

	numPartitions := int32(len(partitions))
	choice := p.config.Partitioner.Partition(key, numPartitions)
	if choice < 0 || choice >= numPartitions {
		return -1, InvalidPartition
	}

	return partitions[choice], nil
}
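
// addMessage routes the message to the brokerProducer for the partition's current
// leader. isRetry is true when the message is being re-queued after a failed delivery.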
func (p *MultiProducer) addMessage(msg *produceMessage, isRetry bool) error {
	broker, err := p.client.Leader(msg.topic, msg.partition)
	if err != nil {
		return err
	}

	bp := p.brokerProducerFor(broker)
	bp.addMessage(msg, p.config.MaxBufferBytes, isRetry)

	if p.isSynchronous() {
		return <-p.errors
	}
	return nil
}

// isSynchronous reports whether the producer delivers synchronously: a buffer of at
// most one byte and no time-based flushing.
func (p *MultiProducer) isSynchronous() bool {
	return p.config.MaxBufferBytes < 2 && p.config.MaxBufferTime == 0
}
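
// brokerProducerFor returns the brokerProducer for the given broker, creating it
// (and starting its flusher goroutine) if it doesn't exist yet.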
func (p *MultiProducer) brokerProducerFor(broker *Broker) *brokerProducer {
	p.m.RLock()
	bp, ok := p.brokerProducers[broker]
	p.m.RUnlock()
	if !ok {
		p.m.Lock()
		// Double-check under the write lock: another goroutine may have created
		// the producer between our RUnlock and Lock.
		bp, ok = p.brokerProducers[broker]
		if !ok {
			bp = p.newBrokerProducer(broker)
			p.brokerProducers[broker] = bp
		}
		p.m.Unlock()
	}
	return bp
}
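
// newBrokerProducer constructs a brokerProducer and starts its background goroutine,
// which flushes the buffer on demand, every MaxBufferTime milliseconds, and once more
// on shutdown before disconnecting the broker.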
func (p *MultiProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	bp := &brokerProducer{
		messages:    make(map[string]map[int32][]*produceMessage),
		flushNow:    make(chan bool, 1),
		broker:      broker,
		stopper:     make(chan bool),
		hasMessages: make(chan bool, 1),
	}

	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond

	// Use a WaitGroup so we don't return until the flusher goroutine is running.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		timer := time.NewTimer(maxBufferTime)
		wg.Done()
		for {
			select {
			case <-bp.flushNow:
				bp.flush(p)
			case <-timer.C:
				bp.flush(p)
			case <-bp.stopper:
				p.m.Lock()
				delete(p.brokerProducers, bp.broker)
				p.m.Unlock()
				bp.flush(p)
				p.client.disconnectBroker(bp.broker)
				close(bp.flushNow)
				close(bp.hasMessages)
				return
			}
			timer.Reset(maxBufferTime)
		}
	}()
	wg.Wait()

	return bp
}
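
// addMessage buffers a message for this broker, then triggers a flush if the
// buffered bytes now exceed maxBufferBytes. Retried messages are prepended to
// their partition's queue so they are delivered before newer ones.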
func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32, isRetry bool) {
	bp.mapM.Lock()
	forTopic, ok := bp.messages[msg.topic]
	if !ok {
		forTopic = make(map[int32][]*produceMessage)
		bp.messages[msg.topic] = forTopic
	}
	if isRetry {
		// Prepend: deliver first.
		forTopic[msg.partition] = append([]*produceMessage{msg}, forTopic[msg.partition]...)
	} else {
		// Append.
		forTopic[msg.partition] = append(forTopic[msg.partition], msg)
	}
	bp.bufferedBytes += uint32(len(msg.key) + len(msg.value))
	// TODO: decrement bufferedBytes when messages are flushed.
	shouldFlush := bp.bufferedBytes > maxBufferBytes // read while still holding the lock

	// Non-blocking send: mark that the buffer is non-empty.
	select {
	case bp.hasMessages <- true:
	default:
	}
	bp.mapM.Unlock()

	if shouldFlush {
		bp.tryFlush()
	}
}

// tryFlush requests a flush without blocking; if one is already pending, it does nothing.
func (bp *brokerProducer) tryFlush() {
	select {
	case bp.flushNow <- true:
	default:
	}
}
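
// flush drains the buffer for every topic-partition whose delivery lock can be
// acquired, and sends the drained messages in a new goroutine. Partitions that are
// already mid-delivery keep their messages buffered until a later flush.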
func (bp *brokerProducer) flush(p *MultiProducer) {
	var messagesToSend []*produceMessage

	<-bp.hasMessages // wait for a message if the brokerProducer currently has none

	// Try to acquire delivery locks for each topic-partition involved.
	bp.mapM.Lock()
	for topic, m := range bp.messages {
		for partition, messages := range m {
			if p.tryAcquireDeliveryLock(topic, partition) {
				messagesToSend = append(messagesToSend, messages...)
				m[partition] = nil
			}
		}
	}
	bp.mapM.Unlock()

	go bp.flushMessages(p, messagesToSend)
}
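
// flushMessages packs the batch into a single ProduceRequest and sends it.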
func (bp *brokerProducer) flushMessages(p *MultiProducer, messages []*produceMessage) {
	if len(messages) == 0 {
		return
	}

	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
	for _, pmsg := range messages {
		msg := &Message{Codec: p.config.Compression, Key: pmsg.key, Value: pmsg.value}
		req.AddMessage(pmsg.topic, pmsg.partition, msg)
	}

	bp.flushRequest(p, req, messages)
}

// Close stops the background flusher goroutine, which performs a final flush and
// disconnects the broker.
func (bp *brokerProducer) Close() error {
	close(bp.stopper)
	return nil
}
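
// flushRequest sends the ProduceRequest to the broker, retries any retriably-failed
// messages (up to MaxDeliveryRetries), reports results on the errors channel, and
// releases the delivery locks it holds.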
func (bp *brokerProducer) flushRequest(p *MultiProducer, request *ProduceRequest, messages []*produceMessage) {
	response, err := bp.broker.Produce(p.client.id, request)

	switch err {
	case nil:
		// Carry on to inspect the per-partition results below.
	case EncodingError:
		// No sense in retrying; it'll just fail again. But what about all the other
		// messages that weren't invalid? Really, this is a "shit's broke real good"
		// scenario, so angrily logging it and moving on is probably acceptable.
		p.errors <- err
		goto releaseAllLocks
	default:
		// TODO: Now we have to sift through the messages and determine which should be retried.
		p.client.disconnectBroker(bp.broker)
		bp.Close()

		// Iterate backwards (ie. for msg := range reverse(messages)): passing
		// isRetry=true prepends each message to its queue, so to preserve the
		// original ordering we have to re-add them starting from the last one.
		for i := len(messages) - 1; i >= 0; i-- {
			msg := messages[i]
			if msg.failures < p.config.MaxDeliveryRetries {
				msg.failures++
				p.addMessage(msg, true)
			} else {
				// log about message failing too many times?
			}
		}
		goto releaseAllLocks
	}

	// When does this ever actually happen, and why don't we explode when it does?
	// This seems bad.
	if response == nil {
		p.errors <- nil
		goto releaseAllLocks
	}

	for topic, d := range response.Blocks {
		for partition, block := range d {
			if block == nil {
				// IncompleteResponse. Here we just drop all the messages; we don't know whether
				// they were successfully sent or not. Non-ideal, but how often does it happen?
				// Log angrily. (Without this continue we'd dereference the nil block below.)
				p.releaseDeliveryLock(topic, partition)
				continue
			}
			switch block.Err {
			case NoError:
				// All the messages for this topic-partition were delivered successfully!
				// Unlock delivery for this topic-partition and discard the produceMessage objects.
				p.errors <- nil
			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
				// TODO: should we refresh metadata for this topic?
				// As above, iterate backwards so that prepending preserves ordering.
				for i := len(messages) - 1; i >= 0; i-- {
					msg := messages[i]
					if msg.topic == topic && msg.partition == partition {
						if msg.failures < p.config.MaxDeliveryRetries {
							msg.failures++
							p.addMessage(msg, true)
						} else {
							// dropping message; log angrily maybe.
						}
					}
				}
			default:
				// Non-retriable error. Drop the messages and log angrily.
			}
			p.releaseDeliveryLock(topic, partition)
		}
	}
	return

releaseAllLocks:
	// This is slow, but only happens on rare error conditions.
	tps := make(map[string]map[int32]bool)
	for _, msg := range messages {
		forTopic, ok := tps[msg.topic]
		if !ok {
			forTopic = make(map[int32]bool)
			tps[msg.topic] = forTopic
		}
		forTopic[msg.partition] = true
	}
	for topic, d := range tps {
		for partition := range d {
			p.releaseDeliveryLock(topic, partition)
		}
	}
}
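
// tryAcquireDeliveryLock is a non-blocking try-lock on the given topic-partition,
// implemented as a buffered channel of size one: a successful send acquires the lock.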
func (p *MultiProducer) tryAcquireDeliveryLock(topic string, partition int32) bool {
	tp := topicPartition{topic, partition}
	p.dm.RLock()
	ch, ok := p.deliveryLocks[tp]
	p.dm.RUnlock()
	if !ok {
		p.dm.Lock()
		// Double-check under the write lock before creating the channel.
		ch, ok = p.deliveryLocks[tp]
		if !ok {
			ch = make(chan bool, 1)
			p.deliveryLocks[tp] = ch
		}
		p.dm.Unlock()
	}

	// Non-blocking send: succeeds only if the lock is currently free.
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}
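
// releaseDeliveryLock releases the lock by draining the channel; it panics if the
// lock was not held.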
func (p *MultiProducer) releaseDeliveryLock(topic string, partition int32) {
	p.dm.RLock()
	ch := p.deliveryLocks[topicPartition{topic, partition}]
	p.dm.RUnlock()
	select {
	case <-ch:
	default:
		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
	}
}