// produce_request.go
  1. package sarama
  2. import "github.com/rcrowley/go-metrics"
// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
// it must see before responding. Any of the constants defined here are valid. On broker versions
// prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
// acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced
// by setting the `min.isr` value in the brokers configuration).
type RequiredAcks int16

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)
// ProduceRequest is the request body used to append message sets to
// topic-partitions on a broker (Kafka API key 0 — see key()).
type ProduceRequest struct {
	// RequiredAcks tells the broker how many replica acknowledgements to
	// wait for before responding; see the RequiredAcks constants.
	RequiredAcks RequiredAcks
	// Timeout bounds how long the broker may wait for the required acks.
	// NOTE(review): the Kafka protocol defines this in milliseconds — confirm
	// against callers before relying on the unit.
	Timeout int32
	Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
	// msgSets holds the payload, keyed by topic and then by partition ID.
	msgSets map[string]map[int32]*MessageSet
}
// encode writes the request to the wire: required acks, timeout, then an
// array of topics, each holding an array of (partition, length-prefixed
// message set) pairs. As a side effect of walking the payload it also records
// producer metrics (batch sizes, compression ratios, record counts) when the
// encoder carries a metrics registry.
func (r *ProduceRequest) encode(pe packetEncoder) error {
	pe.putInt16(int16(r.RequiredAcks))
	pe.putInt32(r.Timeout)
	err := pe.putArrayLength(len(r.msgSets))
	if err != nil {
		return err
	}
	// Metrics are optional: a nil registry disables every metric update below.
	metricRegistry := pe.metricRegistry()
	var batchSizeMetric metrics.Histogram
	var compressionRatioMetric metrics.Histogram
	if metricRegistry != nil {
		batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
		compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
	}
	totalRecordCount := int64(0)
	for topic, partitions := range r.msgSets {
		err = pe.putString(topic)
		if err != nil {
			return err
		}
		err = pe.putArrayLength(len(partitions))
		if err != nil {
			return err
		}
		topicRecordCount := int64(0)
		var topicCompressionRatioMetric metrics.Histogram
		if metricRegistry != nil {
			topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
		}
		for id, msgSet := range partitions {
			// Remember where this partition's data starts so the encoded
			// batch size can be measured after the pop below.
			startOffset := pe.offset()
			pe.putInt32(id)
			// The message set is preceded by its byte length, back-filled by pop().
			pe.push(&lengthField{})
			err = msgSet.encode(pe)
			if err != nil {
				return err
			}
			err = pe.pop()
			if err != nil {
				return err
			}
			if metricRegistry != nil {
				for _, messageBlock := range msgSet.Messages {
					// Is this a fake "message" wrapping real (compressed) messages?
					if messageBlock.Msg.Set != nil {
						topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
					} else {
						// A single uncompressed message
						topicRecordCount++
					}
					// Better be safe than sorry when computing the compression ratio
					if messageBlock.Msg.compressedSize != 0 {
						compressionRatio := float64(len(messageBlock.Msg.Value)) /
							float64(messageBlock.Msg.compressedSize)
						// Histograms do not support decimal values; multiply by 100 for better precision.
						intCompressionRatio := int64(100 * compressionRatio)
						compressionRatioMetric.Update(intCompressionRatio)
						topicCompressionRatioMetric.Update(intCompressionRatio)
					}
				}
				batchSize := int64(pe.offset() - startOffset)
				batchSizeMetric.Update(batchSize)
				getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
			}
		}
		// topicRecordCount is only incremented when metricRegistry != nil, so
		// the registrations below can never observe a nil registry.
		if topicRecordCount > 0 {
			getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
			getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
			totalRecordCount += topicRecordCount
		}
	}
	// Same invariant as above: totalRecordCount > 0 implies a non-nil registry.
	if totalRecordCount > 0 {
		metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
		getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
	}
	return nil
}
  102. func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
  103. requiredAcks, err := pd.getInt16()
  104. if err != nil {
  105. return err
  106. }
  107. r.RequiredAcks = RequiredAcks(requiredAcks)
  108. if r.Timeout, err = pd.getInt32(); err != nil {
  109. return err
  110. }
  111. topicCount, err := pd.getArrayLength()
  112. if err != nil {
  113. return err
  114. }
  115. if topicCount == 0 {
  116. return nil
  117. }
  118. r.msgSets = make(map[string]map[int32]*MessageSet)
  119. for i := 0; i < topicCount; i++ {
  120. topic, err := pd.getString()
  121. if err != nil {
  122. return err
  123. }
  124. partitionCount, err := pd.getArrayLength()
  125. if err != nil {
  126. return err
  127. }
  128. r.msgSets[topic] = make(map[int32]*MessageSet)
  129. for j := 0; j < partitionCount; j++ {
  130. partition, err := pd.getInt32()
  131. if err != nil {
  132. return err
  133. }
  134. messageSetSize, err := pd.getInt32()
  135. if err != nil {
  136. return err
  137. }
  138. msgSetDecoder, err := pd.getSubset(int(messageSetSize))
  139. if err != nil {
  140. return err
  141. }
  142. msgSet := &MessageSet{}
  143. err = msgSet.decode(msgSetDecoder)
  144. if err != nil {
  145. return err
  146. }
  147. r.msgSets[topic][partition] = msgSet
  148. }
  149. }
  150. return nil
  151. }
// key returns the Kafka API key identifying produce requests (0).
func (r *ProduceRequest) key() int16 {
	return 0
}
// version returns the protocol version this request will be encoded with.
func (r *ProduceRequest) version() int16 {
	return r.Version
}
  158. func (r *ProduceRequest) requiredVersion() KafkaVersion {
  159. switch r.Version {
  160. case 1:
  161. return V0_9_0_0
  162. case 2:
  163. return V0_10_0_0
  164. default:
  165. return minVersion
  166. }
  167. }
  168. func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
  169. if r.msgSets == nil {
  170. r.msgSets = make(map[string]map[int32]*MessageSet)
  171. }
  172. if r.msgSets[topic] == nil {
  173. r.msgSets[topic] = make(map[int32]*MessageSet)
  174. }
  175. set := r.msgSets[topic][partition]
  176. if set == nil {
  177. set = new(MessageSet)
  178. r.msgSets[topic][partition] = set
  179. }
  180. set.addMessage(msg)
  181. }
  182. func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
  183. if r.msgSets == nil {
  184. r.msgSets = make(map[string]map[int32]*MessageSet)
  185. }
  186. if r.msgSets[topic] == nil {
  187. r.msgSets[topic] = make(map[int32]*MessageSet)
  188. }
  189. r.msgSets[topic][partition] = set
  190. }