utils.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package sarama
  2. import "sort"
  3. type none struct{}
  4. // make []int32 sortable so we can sort partition numbers
  5. type int32Slice []int32
  6. func (slice int32Slice) Len() int {
  7. return len(slice)
  8. }
  9. func (slice int32Slice) Less(i, j int) bool {
  10. return slice[i] < slice[j]
  11. }
  12. func (slice int32Slice) Swap(i, j int) {
  13. slice[i], slice[j] = slice[j], slice[i]
  14. }
  15. func dupeAndSort(input []int32) []int32 {
  16. ret := make([]int32, 0, len(input))
  17. for _, val := range input {
  18. ret = append(ret, val)
  19. }
  20. sort.Sort(int32Slice(ret))
  21. return ret
  22. }
  23. func withRecover(fn func()) {
  24. defer func() {
  25. handler := PanicHandler
  26. if handler != nil {
  27. if err := recover(); err != nil {
  28. handler(err)
  29. }
  30. }
  31. }()
  32. fn()
  33. }
  34. func safeAsyncClose(b *Broker) {
  35. tmp := b // local var prevents clobbering in goroutine
  36. go withRecover(func() {
  37. if err := tmp.Close(); err != nil {
  38. Logger.Println("Error closing broker", tmp.ID(), ":", err)
  39. }
  40. })
  41. }
  42. // Encoder is a simple interface for any type that can be encoded as an array of bytes
  43. // in order to be sent as the key or value of a Kafka message. Length() is provided as an
  44. // optimization, and must return the same as len() on the result of Encode().
  45. type Encoder interface {
  46. Encode() ([]byte, error)
  47. Length() int
  48. }
  49. // make strings and byte slices encodable for convenience so they can be used as keys
  50. // and/or values in kafka messages
  51. // StringEncoder implements the Encoder interface for Go strings so that they can be used
  52. // as the Key or Value in a ProducerMessage.
  53. type StringEncoder string
  54. func (s StringEncoder) Encode() ([]byte, error) {
  55. return []byte(s), nil
  56. }
  57. func (s StringEncoder) Length() int {
  58. return len(s)
  59. }
  60. // ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
  61. // as the Key or Value in a ProducerMessage.
  62. type ByteEncoder []byte
  63. func (b ByteEncoder) Encode() ([]byte, error) {
  64. return b, nil
  65. }
  66. func (b ByteEncoder) Length() int {
  67. return len(b)
  68. }