produce_request.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package sarama
  2. import "github.com/rcrowley/go-metrics"
  3. // RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
  4. // it must see before responding. Any of the constants defined here are valid. On broker versions
  5. // prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many
  6. // acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced
  7. // by setting the `min.isr` value in the brokers configuration).
  8. type RequiredAcks int16
  9. const (
  10. // NoResponse doesn't send any response, the TCP ACK is all you get.
  11. NoResponse RequiredAcks = 0
  12. // WaitForLocal waits for only the local commit to succeed before responding.
  13. WaitForLocal RequiredAcks = 1
  14. // WaitForAll waits for all in-sync replicas to commit before responding.
  15. // The minimum number of in-sync replicas is configured on the broker via
  16. // the `min.insync.replicas` configuration key.
  17. WaitForAll RequiredAcks = -1
  18. )
  19. type ProduceRequest struct {
  20. TransactionalID *string
  21. RequiredAcks RequiredAcks
  22. Timeout int32
  23. Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
  24. records map[string]map[int32]Records
  25. }
  26. func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram,
  27. topicCompressionRatioMetric metrics.Histogram) int64 {
  28. var topicRecordCount int64
  29. for _, messageBlock := range msgSet.Messages {
  30. // Is this a fake "message" wrapping real messages?
  31. if messageBlock.Msg.Set != nil {
  32. topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
  33. } else {
  34. // A single uncompressed message
  35. topicRecordCount++
  36. }
  37. // Better be safe than sorry when computing the compression ratio
  38. if messageBlock.Msg.compressedSize != 0 {
  39. compressionRatio := float64(len(messageBlock.Msg.Value)) /
  40. float64(messageBlock.Msg.compressedSize)
  41. // Histogram do not support decimal values, let's multiple it by 100 for better precision
  42. intCompressionRatio := int64(100 * compressionRatio)
  43. compressionRatioMetric.Update(intCompressionRatio)
  44. topicCompressionRatioMetric.Update(intCompressionRatio)
  45. }
  46. }
  47. return topicRecordCount
  48. }
  49. func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram,
  50. topicCompressionRatioMetric metrics.Histogram) int64 {
  51. if recordBatch.compressedRecords != nil {
  52. compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100)
  53. compressionRatioMetric.Update(compressionRatio)
  54. topicCompressionRatioMetric.Update(compressionRatio)
  55. }
  56. return int64(len(recordBatch.Records))
  57. }
  58. func (r *ProduceRequest) encode(pe packetEncoder) error {
  59. if r.Version >= 3 {
  60. if err := pe.putNullableString(r.TransactionalID); err != nil {
  61. return err
  62. }
  63. }
  64. pe.putInt16(int16(r.RequiredAcks))
  65. pe.putInt32(r.Timeout)
  66. metricRegistry := pe.metricRegistry()
  67. var batchSizeMetric metrics.Histogram
  68. var compressionRatioMetric metrics.Histogram
  69. if metricRegistry != nil {
  70. batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
  71. compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
  72. }
  73. totalRecordCount := int64(0)
  74. err := pe.putArrayLength(len(r.records))
  75. if err != nil {
  76. return err
  77. }
  78. for topic, partitions := range r.records {
  79. err = pe.putString(topic)
  80. if err != nil {
  81. return err
  82. }
  83. err = pe.putArrayLength(len(partitions))
  84. if err != nil {
  85. return err
  86. }
  87. topicRecordCount := int64(0)
  88. var topicCompressionRatioMetric metrics.Histogram
  89. if metricRegistry != nil {
  90. topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
  91. }
  92. for id, records := range partitions {
  93. startOffset := pe.offset()
  94. pe.putInt32(id)
  95. pe.push(&lengthField{})
  96. err = records.encode(pe)
  97. if err != nil {
  98. return err
  99. }
  100. err = pe.pop()
  101. if err != nil {
  102. return err
  103. }
  104. if metricRegistry != nil {
  105. if r.Version >= 3 {
  106. topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric)
  107. } else {
  108. topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric)
  109. }
  110. batchSize := int64(pe.offset() - startOffset)
  111. batchSizeMetric.Update(batchSize)
  112. getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize)
  113. }
  114. }
  115. if topicRecordCount > 0 {
  116. getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount)
  117. getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount)
  118. totalRecordCount += topicRecordCount
  119. }
  120. }
  121. if totalRecordCount > 0 {
  122. metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount)
  123. getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount)
  124. }
  125. return nil
  126. }
  127. func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
  128. r.Version = version
  129. if version >= 3 {
  130. id, err := pd.getNullableString()
  131. if err != nil {
  132. return err
  133. }
  134. r.TransactionalID = id
  135. }
  136. requiredAcks, err := pd.getInt16()
  137. if err != nil {
  138. return err
  139. }
  140. r.RequiredAcks = RequiredAcks(requiredAcks)
  141. if r.Timeout, err = pd.getInt32(); err != nil {
  142. return err
  143. }
  144. topicCount, err := pd.getArrayLength()
  145. if err != nil {
  146. return err
  147. }
  148. if topicCount == 0 {
  149. return nil
  150. }
  151. r.records = make(map[string]map[int32]Records)
  152. for i := 0; i < topicCount; i++ {
  153. topic, err := pd.getString()
  154. if err != nil {
  155. return err
  156. }
  157. partitionCount, err := pd.getArrayLength()
  158. if err != nil {
  159. return err
  160. }
  161. r.records[topic] = make(map[int32]Records)
  162. for j := 0; j < partitionCount; j++ {
  163. partition, err := pd.getInt32()
  164. if err != nil {
  165. return err
  166. }
  167. size, err := pd.getInt32()
  168. if err != nil {
  169. return err
  170. }
  171. recordsDecoder, err := pd.getSubset(int(size))
  172. if err != nil {
  173. return err
  174. }
  175. var records Records
  176. if err := records.decode(recordsDecoder); err != nil {
  177. return err
  178. }
  179. r.records[topic][partition] = records
  180. }
  181. }
  182. return nil
  183. }
  184. func (r *ProduceRequest) key() int16 {
  185. return 0
  186. }
  187. func (r *ProduceRequest) version() int16 {
  188. return r.Version
  189. }
  190. func (r *ProduceRequest) headerVersion() int16 {
  191. return 1
  192. }
  193. func (r *ProduceRequest) requiredVersion() KafkaVersion {
  194. switch r.Version {
  195. case 1:
  196. return V0_9_0_0
  197. case 2:
  198. return V0_10_0_0
  199. case 3:
  200. return V0_11_0_0
  201. case 7:
  202. return V2_1_0_0
  203. default:
  204. return MinVersion
  205. }
  206. }
  207. func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
  208. if r.records == nil {
  209. r.records = make(map[string]map[int32]Records)
  210. }
  211. if r.records[topic] == nil {
  212. r.records[topic] = make(map[int32]Records)
  213. }
  214. }
  215. func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
  216. r.ensureRecords(topic, partition)
  217. set := r.records[topic][partition].MsgSet
  218. if set == nil {
  219. set = new(MessageSet)
  220. r.records[topic][partition] = newLegacyRecords(set)
  221. }
  222. set.addMessage(msg)
  223. }
  224. func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
  225. r.ensureRecords(topic, partition)
  226. r.records[topic][partition] = newLegacyRecords(set)
  227. }
  228. func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
  229. r.ensureRecords(topic, partition)
  230. r.records[topic][partition] = newDefaultRecords(batch)
  231. }