utils.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package sarama
  2. import "sort"
  3. type none struct{}
  4. // make []int32 sortable so we can sort partition numbers
  5. type int32Slice []int32
  6. func (slice int32Slice) Len() int {
  7. return len(slice)
  8. }
  9. func (slice int32Slice) Less(i, j int) bool {
  10. return slice[i] < slice[j]
  11. }
  12. func (slice int32Slice) Swap(i, j int) {
  13. slice[i], slice[j] = slice[j], slice[i]
  14. }
  15. func dupeAndSort(input []int32) []int32 {
  16. ret := make([]int32, 0, len(input))
  17. for _, val := range input {
  18. ret = append(ret, val)
  19. }
  20. sort.Sort(int32Slice(ret))
  21. return ret
  22. }
  23. func withRecover(fn func()) {
  24. defer func() {
  25. handler := PanicHandler
  26. if handler != nil {
  27. if err := recover(); err != nil {
  28. handler(err)
  29. }
  30. }
  31. }()
  32. fn()
  33. }
  34. func safeAsyncClose(b *Broker) {
  35. tmp := b // local var prevents clobbering in goroutine
  36. go withRecover(func() {
  37. if connected, _ := tmp.Connected(); connected {
  38. if err := tmp.Close(); err != nil {
  39. Logger.Println("Error closing broker", tmp.ID(), ":", err)
  40. }
  41. }
  42. })
  43. }
  44. // Encoder is a simple interface for any type that can be encoded as an array of bytes
  45. // in order to be sent as the key or value of a Kafka message. Length() is provided as an
  46. // optimization, and must return the same as len() on the result of Encode().
  47. type Encoder interface {
  48. Encode() ([]byte, error)
  49. Length() int
  50. }
  51. // make strings and byte slices encodable for convenience so they can be used as keys
  52. // and/or values in kafka messages
  53. // StringEncoder implements the Encoder interface for Go strings so that they can be used
  54. // as the Key or Value in a ProducerMessage.
  55. type StringEncoder string
  56. func (s StringEncoder) Encode() ([]byte, error) {
  57. return []byte(s), nil
  58. }
  59. func (s StringEncoder) Length() int {
  60. return len(s)
  61. }
  62. // ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
  63. // as the Key or Value in a ProducerMessage.
  64. type ByteEncoder []byte
  65. func (b ByteEncoder) Encode() ([]byte, error) {
  66. return b, nil
  67. }
  68. func (b ByteEncoder) Length() int {
  69. return len(b)
  70. }