utils.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package sarama
  2. import (
  3. "bufio"
  4. "net"
  5. "sort"
  6. )
  // none is an empty struct type; its values carry no data.
  // NOTE(review): presumably used where only presence matters (set-like map
  // values, signalling) -- confirm against the callers elsewhere in the package.
  type none struct{}
  // int32Slice gives []int32 the Len/Less/Swap method set expected by the sort
  // package, so that partition numbers can be sorted.
  type int32Slice []int32

  // Len reports the number of elements in the slice.
  func (s int32Slice) Len() int { return len(s) }

  // Less reports whether the element at index i is smaller than the one at index j.
  func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }

  // Swap exchanges the elements at indexes i and j in place.
  func (s int32Slice) Swap(i, j int) {
  	s[i], s[j] = s[j], s[i]
  }
  19. func dupeAndSort(input []int32) []int32 {
  20. ret := make([]int32, 0, len(input))
  21. for _, val := range input {
  22. ret = append(ret, val)
  23. }
  24. sort.Sort(int32Slice(ret))
  25. return ret
  26. }
  27. func withRecover(fn func()) {
  28. defer func() {
  29. handler := PanicHandler
  30. if handler != nil {
  31. if err := recover(); err != nil {
  32. handler(err)
  33. }
  34. }
  35. }()
  36. fn()
  37. }
  38. func safeAsyncClose(b *Broker) {
  39. tmp := b // local var prevents clobbering in goroutine
  40. go withRecover(func() {
  41. if connected, _ := tmp.Connected(); connected {
  42. if err := tmp.Close(); err != nil {
  43. Logger.Println("Error closing broker", tmp.ID(), ":", err)
  44. }
  45. }
  46. })
  47. }
  // Encoder is implemented by any type that can be turned into an array of
  // bytes in order to be sent as the key or value of a Kafka message.
  type Encoder interface {
  	// Encode returns the encoded bytes, or an error if they cannot be produced.
  	Encode() ([]byte, error)

  	// Length is provided as an optimization; it must return the same value
  	// as len() applied to the result of Encode.
  	Length() int
  }
  // Strings and byte slices are made encodable for convenience, so that they
  // can be used as keys and/or values in Kafka messages.

  // StringEncoder implements the Encoder interface for Go strings so that they
  // can be used as the Key or Value in a ProducerMessage.
  type StringEncoder string

  // Encode returns the string converted to a byte slice; the error is always nil.
  func (se StringEncoder) Encode() ([]byte, error) {
  	encoded := []byte(se)
  	return encoded, nil
  }

  // Length returns the length of the string in bytes.
  func (se StringEncoder) Length() int { return len(se) }
  // ByteEncoder implements the Encoder interface for Go byte slices so that
  // they can be used as the Key or Value in a ProducerMessage.
  type ByteEncoder []byte

  // Encode returns the receiver itself as a []byte, without copying; the error
  // is always nil.
  func (be ByteEncoder) Encode() ([]byte, error) {
  	return []byte(be), nil
  }

  // Length returns the number of bytes in the slice.
  func (be ByteEncoder) Length() int { return len(be) }
  // bufConn is a net.Conn whose reads pass through a bufio.Reader, reducing
  // the number of reads that trigger syscalls.
  type bufConn struct {
  	// Conn is the wrapped connection; its methods are promoted unless
  	// bufConn defines its own.
  	net.Conn
  	// buf is the buffered reader layered on top of Conn.
  	buf *bufio.Reader
  }

  // newBufConn wraps conn in a bufConn whose reads go through a bufio.Reader
  // created with bufio.NewReader.
  func newBufConn(conn net.Conn) *bufConn {
  	wrapped := new(bufConn)
  	wrapped.Conn = conn
  	wrapped.buf = bufio.NewReader(conn)
  	return wrapped
  }

  // Read reads into b from the buffered reader rather than from the embedded
  // connection directly, and returns the reader's byte count and error.
  func (bc *bufConn) Read(b []byte) (n int, err error) {
  	n, err = bc.buf.Read(b)
  	return n, err
  }