prep_encoder.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "github.com/rcrowley/go-metrics"
  8. )
  9. type prepEncoder struct {
  10. stack []pushEncoder
  11. length int
  12. }
  13. // primitives
  14. func (pe *prepEncoder) putInt8(in int8) {
  15. pe.length++
  16. }
  17. func (pe *prepEncoder) putInt16(in int16) {
  18. pe.length += 2
  19. }
  20. func (pe *prepEncoder) putInt32(in int32) {
  21. pe.length += 4
  22. }
  23. func (pe *prepEncoder) putInt64(in int64) {
  24. pe.length += 8
  25. }
  26. func (pe *prepEncoder) putVarint(in int64) {
  27. var buf [binary.MaxVarintLen64]byte
  28. pe.length += binary.PutVarint(buf[:], in)
  29. }
  30. func (pe *prepEncoder) putUVarint(in uint64) {
  31. var buf [binary.MaxVarintLen64]byte
  32. pe.length += binary.PutUvarint(buf[:], in)
  33. }
  34. func (pe *prepEncoder) putArrayLength(in int) error {
  35. if in > math.MaxInt32 {
  36. return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)}
  37. }
  38. pe.length += 4
  39. return nil
  40. }
  41. func (pe *prepEncoder) putCompactArrayLength(in int) {
  42. pe.putUVarint(uint64(in + 1))
  43. }
  44. func (pe *prepEncoder) putBool(in bool) {
  45. pe.length++
  46. }
  47. // arrays
  48. func (pe *prepEncoder) putBytes(in []byte) error {
  49. pe.length += 4
  50. if in == nil {
  51. return nil
  52. }
  53. return pe.putRawBytes(in)
  54. }
  55. func (pe *prepEncoder) putVarintBytes(in []byte) error {
  56. if in == nil {
  57. pe.putVarint(-1)
  58. return nil
  59. }
  60. pe.putVarint(int64(len(in)))
  61. return pe.putRawBytes(in)
  62. }
  63. func (pe *prepEncoder) putCompactString(in string) error {
  64. pe.putCompactArrayLength(len(in))
  65. return pe.putRawBytes([]byte(in))
  66. }
  67. func (pe *prepEncoder) putNullableCompactString(in *string) error {
  68. if in == nil {
  69. pe.putUVarint(0)
  70. return nil
  71. } else {
  72. return pe.putCompactString(*in)
  73. }
  74. }
  75. func (pe *prepEncoder) putRawBytes(in []byte) error {
  76. if len(in) > math.MaxInt32 {
  77. return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
  78. }
  79. pe.length += len(in)
  80. return nil
  81. }
  82. func (pe *prepEncoder) putNullableString(in *string) error {
  83. if in == nil {
  84. pe.length += 2
  85. return nil
  86. }
  87. return pe.putString(*in)
  88. }
  89. func (pe *prepEncoder) putString(in string) error {
  90. pe.length += 2
  91. if len(in) > math.MaxInt16 {
  92. return PacketEncodingError{fmt.Sprintf("string too long (%d)", len(in))}
  93. }
  94. pe.length += len(in)
  95. return nil
  96. }
  97. func (pe *prepEncoder) putStringArray(in []string) error {
  98. err := pe.putArrayLength(len(in))
  99. if err != nil {
  100. return err
  101. }
  102. for _, str := range in {
  103. if err := pe.putString(str); err != nil {
  104. return err
  105. }
  106. }
  107. return nil
  108. }
  109. func (pe *prepEncoder) putCompactInt32Array(in []int32) error {
  110. if in == nil {
  111. return errors.New("expected int32 array to be non null")
  112. }
  113. pe.putUVarint(uint64(len(in)) + 1)
  114. pe.length += 4 * len(in)
  115. return nil
  116. }
  117. func (pe *prepEncoder) putNullableCompactInt32Array(in []int32) error {
  118. if in == nil {
  119. pe.putUVarint(0)
  120. return nil
  121. }
  122. pe.putUVarint(uint64(len(in)) + 1)
  123. pe.length += 4 * len(in)
  124. return nil
  125. }
  126. func (pe *prepEncoder) putInt32Array(in []int32) error {
  127. err := pe.putArrayLength(len(in))
  128. if err != nil {
  129. return err
  130. }
  131. pe.length += 4 * len(in)
  132. return nil
  133. }
  134. func (pe *prepEncoder) putInt64Array(in []int64) error {
  135. err := pe.putArrayLength(len(in))
  136. if err != nil {
  137. return err
  138. }
  139. pe.length += 8 * len(in)
  140. return nil
  141. }
  142. func (pe *prepEncoder) putEmptyTaggedFieldArray() {
  143. pe.putUVarint(0)
  144. }
  145. func (pe *prepEncoder) offset() int {
  146. return pe.length
  147. }
  148. // stackable
  149. func (pe *prepEncoder) push(in pushEncoder) {
  150. in.saveOffset(pe.length)
  151. pe.length += in.reserveLength()
  152. pe.stack = append(pe.stack, in)
  153. }
  154. func (pe *prepEncoder) pop() error {
  155. in := pe.stack[len(pe.stack)-1]
  156. pe.stack = pe.stack[:len(pe.stack)-1]
  157. if dpe, ok := in.(dynamicPushEncoder); ok {
  158. pe.length += dpe.adjustLength(pe.length)
  159. }
  160. return nil
  161. }
  162. // we do not record metrics during the prep encoder pass
  163. func (pe *prepEncoder) metricRegistry() metrics.Registry {
  164. return nil
  165. }