utils.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package sarama
  2. import (
  3. "bufio"
  4. "fmt"
  5. "net"
  6. "regexp"
  7. )
  // none is an empty struct type: its values carry no data.
  // NOTE(review): presumably used as a placeholder value (set members, signal
  // channels); confirm against the callers elsewhere in the package.
  type none struct{}
  // int32Slice gives []int32 the Len/Less/Swap method set so that slices of
  // int32 (for example partition numbers) can be sorted.
  type int32Slice []int32

  // Len reports the number of elements in the slice.
  func (s int32Slice) Len() int { return len(s) }

  // Less reports whether the element at index i is smaller than the one at j.
  func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }

  // Swap exchanges the elements at indexes i and j in place.
  func (s int32Slice) Swap(i, j int) {
  	s[j], s[i] = s[i], s[j]
  }
  // dupInt32Slice returns a copy of input that is backed by its own array, so
  // later writes to the result never show up in input (and vice versa).
  //
  // The result has the same length as input and is always non-nil, even when
  // input is nil or empty.
  func dupInt32Slice(input []int32) []int32 {
  	ret := make([]int32, len(input))
  	// The copy builtin replaces the element-by-element append loop.
  	copy(ret, input)
  	return ret
  }
  27. func withRecover(fn func()) {
  28. defer func() {
  29. handler := PanicHandler
  30. if handler != nil {
  31. if err := recover(); err != nil {
  32. handler(err)
  33. }
  34. }
  35. }()
  36. fn()
  37. }
  38. func safeAsyncClose(b *Broker) {
  39. tmp := b // local var prevents clobbering in goroutine
  40. go withRecover(func() {
  41. if connected, _ := tmp.Connected(); connected {
  42. if err := tmp.Close(); err != nil {
  43. Logger.Println("Error closing broker", tmp.ID(), ":", err)
  44. }
  45. }
  46. })
  47. }
  // Encoder is implemented by any type that can be turned into a byte slice
  // in order to be sent as the key or value of a Kafka message.
  type Encoder interface {
  	// Length returns the number of bytes Encode produces. It is provided as
  	// an optimization and must equal len() of the slice returned by Encode.
  	Length() int

  	// Encode returns the encoded bytes, or a non-nil error.
  	Encode() ([]byte, error)
  }
  // Strings and byte slices are made encodable for convenience, so that they
  // can be used directly as keys and/or values in Kafka messages.

  // StringEncoder implements the Encoder interface for Go strings so that they
  // can be used as the Key or Value in a ProducerMessage.
  type StringEncoder string

  // Encode returns the bytes of the string. The error is always nil.
  func (se StringEncoder) Encode() ([]byte, error) {
  	encoded := []byte(se)
  	return encoded, nil
  }

  // Length returns the length of the string in bytes.
  func (se StringEncoder) Length() int {
  	return len(se)
  }
  // ByteEncoder implements the Encoder interface for Go byte slices so that
  // they can be used as the Key or Value in a ProducerMessage.
  type ByteEncoder []byte

  // Encode returns the underlying byte slice itself; no copy is made. The
  // error is always nil.
  func (be ByteEncoder) Encode() ([]byte, error) {
  	return []byte(be), nil
  }

  // Length returns the number of bytes in the slice.
  func (be ByteEncoder) Length() int {
  	return len(be)
  }
  // bufConn is a net.Conn whose Read goes through a bufio.Reader, which cuts
  // down the number of reads that end up as syscalls. Every other net.Conn
  // method is promoted unchanged from the embedded connection.
  type bufConn struct {
  	// Conn is the wrapped connection.
  	net.Conn
  	// buf is the buffered reader layered over Conn.
  	buf *bufio.Reader
  }

  // newBufConn wraps conn in a bufConn whose reader uses bufio's default
  // buffer size.
  func newBufConn(conn net.Conn) *bufConn {
  	bc := &bufConn{Conn: conn}
  	bc.buf = bufio.NewReader(conn)
  	return bc
  }

  // Read fills b from the buffered reader instead of reading the connection
  // directly, shadowing the Read promoted from the embedded net.Conn.
  func (bc *bufConn) Read(b []byte) (n int, err error) {
  	n, err = bc.buf.Read(b)
  	return n, err
  }
  // KafkaVersion instances represent versions of the upstream Kafka broker.
  type KafkaVersion struct {
  	// version holds the components in the order major, minor, veryMinor,
  	// patch. The array is kept in an unexported field of a struct, instead
  	// of being the type itself, so that the type is opaque and people cannot
  	// generate their own arbitrary versions.
  	version [4]uint
  }

  // newKafkaVersion builds a KafkaVersion from its four components.
  func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
  	var kv KafkaVersion
  	kv.version = [4]uint{major, minor, veryMinor, patch}
  	return kv
  }
  101. // IsAtLeast return true if and only if the version it is called on is
  102. // greater than or equal to the version passed in:
  103. // V1.IsAtLeast(V2) // false
  104. // V2.IsAtLeast(V1) // true
  105. func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
  106. for i := range v.version {
  107. if v.version[i] > other.version[i] {
  108. return true
  109. } else if v.version[i] < other.version[i] {
  110. return false
  111. }
  112. }
  113. return true
  114. }
  115. // Effective constants defining the supported kafka versions.
  116. var (
  117. V0_8_2_0 = newKafkaVersion(0, 8, 2, 0)
  118. V0_8_2_1 = newKafkaVersion(0, 8, 2, 1)
  119. V0_8_2_2 = newKafkaVersion(0, 8, 2, 2)
  120. V0_9_0_0 = newKafkaVersion(0, 9, 0, 0)
  121. V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
  122. V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
  123. V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
  124. V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
  125. V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
  126. V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
  127. V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
  128. V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
  129. V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
  130. V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
  131. V1_0_0_0 = newKafkaVersion(1, 0, 0, 0)
  132. V1_1_0_0 = newKafkaVersion(1, 1, 0, 0)
  133. V1_1_1_0 = newKafkaVersion(1, 1, 1, 0)
  134. V2_0_0_0 = newKafkaVersion(2, 0, 0, 0)
  135. V2_0_1_0 = newKafkaVersion(2, 0, 1, 0)
  136. V2_1_0_0 = newKafkaVersion(2, 1, 0, 0)
  137. V2_2_0_0 = newKafkaVersion(2, 2, 0, 0)
  138. V2_3_0_0 = newKafkaVersion(2, 3, 0, 0)
  139. SupportedVersions = []KafkaVersion{
  140. V0_8_2_0,
  141. V0_8_2_1,
  142. V0_8_2_2,
  143. V0_9_0_0,
  144. V0_9_0_1,
  145. V0_10_0_0,
  146. V0_10_0_1,
  147. V0_10_1_0,
  148. V0_10_1_1,
  149. V0_10_2_0,
  150. V0_10_2_1,
  151. V0_11_0_0,
  152. V0_11_0_1,
  153. V0_11_0_2,
  154. V1_0_0_0,
  155. V1_1_0_0,
  156. V1_1_1_0,
  157. V2_0_0_0,
  158. V2_0_1_0,
  159. V2_1_0_0,
  160. V2_2_0_0,
  161. V2_3_0_0,
  162. }
  163. MinVersion = V0_8_2_0
  164. MaxVersion = V2_3_0_0
  165. )
  166. //ParseKafkaVersion parses and returns kafka version or error from a string
  167. func ParseKafkaVersion(s string) (KafkaVersion, error) {
  168. if len(s) < 5 {
  169. return MinVersion, fmt.Errorf("invalid version `%s`", s)
  170. }
  171. var major, minor, veryMinor, patch uint
  172. var err error
  173. if s[0] == '0' {
  174. err = scanKafkaVersion(s, `^0\.\d+\.\d+\.\d+$`, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
  175. } else {
  176. err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
  177. }
  178. if err != nil {
  179. return MinVersion, err
  180. }
  181. return newKafkaVersion(major, minor, veryMinor, patch), nil
  182. }
  // scanKafkaVersion checks s against the regular expression pattern and, when
  // it matches, scans three unsigned integers out of s using the fmt.Sscanf
  // layout format, storing them through v[0], v[1] and v[2].
  //
  // It returns an error when pattern cannot be compiled, when s does not match
  // pattern ("invalid version `<s>`"), or when fmt.Sscanf reports an error; the
  // Sscanf error is returned unchanged.
  func scanKafkaVersion(s string, pattern string, format string, v [3]*uint) error {
  	// regexp.Compile is used instead of regexp.MustCompile so that a malformed
  	// pattern is reported as an error rather than panicking inside library
  	// code.
  	// NOTE(review): pattern is still compiled on every call; if this ever
  	// becomes a hot path, precompile the patterns at package scope.
  	re, err := regexp.Compile(pattern)
  	if err != nil {
  		return fmt.Errorf("invalid version pattern %q: %v", pattern, err)
  	}
  	if !re.MatchString(s) {
  		return fmt.Errorf("invalid version `%s`", s)
  	}

  	_, err = fmt.Sscanf(s, format, v[0], v[1], v[2])
  	return err
  }
  190. func (v KafkaVersion) String() string {
  191. if v.version[0] == 0 {
  192. return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3])
  193. }
  194. return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
  195. }