produce_request.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package sarama
  2. import "github.com/rcrowley/go-metrics"
  3. // RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
  4. // it must see before responding. Any of the constants defined here are valid. On broker versions
  5. // prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
  6. // acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced
  7. // by setting the `min.isr` value in the brokers configuration).
  8. type RequiredAcks int16
  9. const (
  10. // NoResponse doesn't send any response, the TCP ACK is all you get.
  11. NoResponse RequiredAcks = 0
  12. // WaitForLocal waits for only the local commit to succeed before responding.
  13. WaitForLocal RequiredAcks = 1
  14. // WaitForAll waits for all replicas to commit before responding.
  15. WaitForAll RequiredAcks = -1
  16. )
  17. type ProduceRequest struct {
  18. RequiredAcks RequiredAcks
  19. Timeout int32
  20. Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
  21. msgSets map[string]map[int32]*MessageSet
  22. }
  23. func (r *ProduceRequest) encode(pe packetEncoder) error {
  24. pe.putInt16(int16(r.RequiredAcks))
  25. pe.putInt32(r.Timeout)
  26. err := pe.putArrayLength(len(r.msgSets))
  27. if err != nil {
  28. return err
  29. }
  30. metricRegistry := pe.metricRegistry()
  31. var batchSizeMetric metrics.Histogram
  32. var compressionRatioMetric metrics.Histogram
  33. if metricRegistry != nil {
  34. batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
  35. compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
  36. }
  37. totalRecordCount := int64(0)
  38. for topic, partitions := range r.msgSets {
  39. err = pe.putString(topic)
  40. if err != nil {
  41. return err
  42. }
  43. err = pe.putArrayLength(len(partitions))
  44. if err != nil {
  45. return err
  46. }
  47. topicRecordCount := int64(0)
  48. var topicCompressionRatioMetric metrics.Histogram
  49. if metricRegistry != nil {
  50. topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
  51. }
  52. for id, msgSet := range partitions {
  53. startOffset := pe.offset()
  54. pe.putInt32(id)
  55. pe.push(&lengthField{})
  56. err = msgSet.encode(pe)
  57. if err != nil {
  58. return err
  59. }
  60. err = pe.pop()
  61. if err != nil {
  62. return err
  63. }
  64. if metricRegistry != nil {
  65. for _, messageBlock := range msgSet.Messages {
  66. // Is this a fake "message" wrapping real messages?
  67. if messageBlock.Msg.Set != nil {
  68. topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
  69. } else {
  70. // A single uncompressed message
  71. topicRecordCount++
  72. }
  73. // Better be safe than sorry when computing the compression ratio
  74. if messageBlock.Msg.compressedSize != 0 {
  75. compressionRatio := float64(len(messageBlock.Msg.Value)) /
  76. float64(messageBlock.Msg.compressedSize)
  77. // Histogram do not support decimal values, let's multiple it by 100 for better precision
  78. intCompressionRatio := int64(100 * compressionRatio)
  79. compressionRatioMetric.Update(intCompressionRatio)
  80. topicCompressionRatioMetric.Update(intCompressionRatio)
  81. }
  82. }
  83. batchSize := int64(pe.offset() - startOffset)
  84. batchSizeMetric.Update(batchSize)
  85. getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
  86. }
  87. }
  88. if topicRecordCount > 0 {
  89. getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
  90. getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
  91. totalRecordCount += topicRecordCount
  92. }
  93. }
  94. if totalRecordCount > 0 {
  95. metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
  96. getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
  97. }
  98. return nil
  99. }
  100. func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
  101. requiredAcks, err := pd.getInt16()
  102. if err != nil {
  103. return err
  104. }
  105. r.RequiredAcks = RequiredAcks(requiredAcks)
  106. if r.Timeout, err = pd.getInt32(); err != nil {
  107. return err
  108. }
  109. topicCount, err := pd.getArrayLength()
  110. if err != nil {
  111. return err
  112. }
  113. if topicCount == 0 {
  114. return nil
  115. }
  116. r.msgSets = make(map[string]map[int32]*MessageSet)
  117. for i := 0; i < topicCount; i++ {
  118. topic, err := pd.getString()
  119. if err != nil {
  120. return err
  121. }
  122. partitionCount, err := pd.getArrayLength()
  123. if err != nil {
  124. return err
  125. }
  126. r.msgSets[topic] = make(map[int32]*MessageSet)
  127. for j := 0; j < partitionCount; j++ {
  128. partition, err := pd.getInt32()
  129. if err != nil {
  130. return err
  131. }
  132. messageSetSize, err := pd.getInt32()
  133. if err != nil {
  134. return err
  135. }
  136. msgSetDecoder, err := pd.getSubset(int(messageSetSize))
  137. if err != nil {
  138. return err
  139. }
  140. msgSet := &MessageSet{}
  141. err = msgSet.decode(msgSetDecoder)
  142. if err != nil {
  143. return err
  144. }
  145. r.msgSets[topic][partition] = msgSet
  146. }
  147. }
  148. return nil
  149. }
  150. func (r *ProduceRequest) key() int16 {
  151. return 0
  152. }
  153. func (r *ProduceRequest) version() int16 {
  154. return r.Version
  155. }
  156. func (r *ProduceRequest) requiredVersion() KafkaVersion {
  157. switch r.Version {
  158. case 1:
  159. return V0_9_0_0
  160. case 2:
  161. return V0_10_0_0
  162. default:
  163. return minVersion
  164. }
  165. }
  166. func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
  167. if r.msgSets == nil {
  168. r.msgSets = make(map[string]map[int32]*MessageSet)
  169. }
  170. if r.msgSets[topic] == nil {
  171. r.msgSets[topic] = make(map[int32]*MessageSet)
  172. }
  173. set := r.msgSets[topic][partition]
  174. if set == nil {
  175. set = new(MessageSet)
  176. r.msgSets[topic][partition] = set
  177. }
  178. set.addMessage(msg)
  179. }
  180. func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
  181. if r.msgSets == nil {
  182. r.msgSets = make(map[string]map[int32]*MessageSet)
  183. }
  184. if r.msgSets[topic] == nil {
  185. r.msgSets[topic] = make(map[int32]*MessageSet)
  186. }
  187. r.msgSets[topic][partition] = set
  188. }