real_encoder.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. type realEncoder struct {
  8. raw []byte
  9. off int
  10. stack []pushEncoder
  11. registry metrics.Registry
  12. }
  13. // primitives
  14. func (re *realEncoder) putInt8(in int8) {
  15. re.raw[re.off] = byte(in)
  16. re.off++
  17. }
  18. func (re *realEncoder) putInt16(in int16) {
  19. binary.BigEndian.PutUint16(re.raw[re.off:], uint16(in))
  20. re.off += 2
  21. }
  22. func (re *realEncoder) putInt32(in int32) {
  23. binary.BigEndian.PutUint32(re.raw[re.off:], uint32(in))
  24. re.off += 4
  25. }
  26. func (re *realEncoder) putInt64(in int64) {
  27. binary.BigEndian.PutUint64(re.raw[re.off:], uint64(in))
  28. re.off += 8
  29. }
  30. func (re *realEncoder) putVarint(in int64) {
  31. re.off += binary.PutVarint(re.raw[re.off:], in)
  32. }
  33. func (re *realEncoder) putUVarint(in uint64) {
  34. re.off += binary.PutUvarint(re.raw[re.off:], in)
  35. }
  36. func (re *realEncoder) putArrayLength(in int) error {
  37. re.putInt32(int32(in))
  38. return nil
  39. }
  40. func (re *realEncoder) putCompactArrayLength(in int) {
  41. // 0 represents a null array, so +1 has to be added
  42. re.putUVarint(uint64(in + 1))
  43. }
  44. func (re *realEncoder) putBool(in bool) {
  45. if in {
  46. re.putInt8(1)
  47. return
  48. }
  49. re.putInt8(0)
  50. }
  51. // collection
  52. func (re *realEncoder) putRawBytes(in []byte) error {
  53. copy(re.raw[re.off:], in)
  54. re.off += len(in)
  55. return nil
  56. }
  57. func (re *realEncoder) putBytes(in []byte) error {
  58. if in == nil {
  59. re.putInt32(-1)
  60. return nil
  61. }
  62. re.putInt32(int32(len(in)))
  63. return re.putRawBytes(in)
  64. }
  65. func (re *realEncoder) putVarintBytes(in []byte) error {
  66. if in == nil {
  67. re.putVarint(-1)
  68. return nil
  69. }
  70. re.putVarint(int64(len(in)))
  71. return re.putRawBytes(in)
  72. }
  73. func (re *realEncoder) putCompactString(in string) error {
  74. re.putCompactArrayLength(len(in))
  75. return re.putRawBytes([]byte(in))
  76. }
  77. func (re *realEncoder) putNullableCompactString(in *string) error {
  78. if in == nil {
  79. re.putInt8(0)
  80. return nil
  81. }
  82. return re.putCompactString(*in)
  83. }
  84. func (re *realEncoder) putString(in string) error {
  85. re.putInt16(int16(len(in)))
  86. copy(re.raw[re.off:], in)
  87. re.off += len(in)
  88. return nil
  89. }
  90. func (re *realEncoder) putNullableString(in *string) error {
  91. if in == nil {
  92. re.putInt16(-1)
  93. return nil
  94. }
  95. return re.putString(*in)
  96. }
  97. func (re *realEncoder) putStringArray(in []string) error {
  98. err := re.putArrayLength(len(in))
  99. if err != nil {
  100. return err
  101. }
  102. for _, val := range in {
  103. if err := re.putString(val); err != nil {
  104. return err
  105. }
  106. }
  107. return nil
  108. }
  109. func (re *realEncoder) putCompactInt32Array(in []int32) error {
  110. if in == nil {
  111. return errors.New("expected int32 array to be non null")
  112. }
  113. // 0 represents a null array, so +1 has to be added
  114. re.putUVarint(uint64(len(in)) + 1)
  115. for _, val := range in {
  116. re.putInt32(val)
  117. }
  118. return nil
  119. }
  120. func (re *realEncoder) putNullableCompactInt32Array(in []int32) error {
  121. if in == nil {
  122. re.putUVarint(0)
  123. return nil
  124. }
  125. // 0 represents a null array, so +1 has to be added
  126. re.putUVarint(uint64(len(in)) + 1)
  127. for _, val := range in {
  128. re.putInt32(val)
  129. }
  130. return nil
  131. }
  132. func (re *realEncoder) putInt32Array(in []int32) error {
  133. err := re.putArrayLength(len(in))
  134. if err != nil {
  135. return err
  136. }
  137. for _, val := range in {
  138. re.putInt32(val)
  139. }
  140. return nil
  141. }
  142. func (re *realEncoder) putInt64Array(in []int64) error {
  143. err := re.putArrayLength(len(in))
  144. if err != nil {
  145. return err
  146. }
  147. for _, val := range in {
  148. re.putInt64(val)
  149. }
  150. return nil
  151. }
  152. func (re *realEncoder) putEmptyTaggedFieldArray() {
  153. re.putUVarint(0)
  154. }
  155. func (re *realEncoder) offset() int {
  156. return re.off
  157. }
  158. // stacks
  159. func (re *realEncoder) push(in pushEncoder) {
  160. in.saveOffset(re.off)
  161. re.off += in.reserveLength()
  162. re.stack = append(re.stack, in)
  163. }
  164. func (re *realEncoder) pop() error {
  165. // this is go's ugly pop pattern (the inverse of append)
  166. in := re.stack[len(re.stack)-1]
  167. re.stack = re.stack[:len(re.stack)-1]
  168. return in.run(re.off, re.raw)
  169. }
  170. // we do record metrics during the real encoder pass
  171. func (re *realEncoder) metricRegistry() metrics.Registry {
  172. return re.registry
  173. }