utils.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package sarama
  2. import (
  3. "io"
  4. "sort"
  5. )
// none is an empty struct type; a value of this type carries no data.
// NOTE(review): presumably used as a zero-size placeholder value; its
// users are not visible in this part of the file, so confirm before relying on that.
type none struct{}
  7. // make []int32 sortable so we can sort partition numbers
  8. type int32Slice []int32
  9. func (slice int32Slice) Len() int {
  10. return len(slice)
  11. }
  12. func (slice int32Slice) Less(i, j int) bool {
  13. return slice[i] < slice[j]
  14. }
  15. func (slice int32Slice) Swap(i, j int) {
  16. slice[i], slice[j] = slice[j], slice[i]
  17. }
  18. func dupeAndSort(input []int32) []int32 {
  19. ret := make([]int32, 0, len(input))
  20. for _, val := range input {
  21. ret = append(ret, val)
  22. }
  23. sort.Sort(int32Slice(ret))
  24. return ret
  25. }
  26. func withRecover(fn func()) {
  27. defer func() {
  28. handler := PanicHandler
  29. if handler != nil {
  30. if err := recover(); err != nil {
  31. handler(err)
  32. }
  33. }
  34. }()
  35. fn()
  36. }
  37. func safeAsyncClose(c io.Closer) {
  38. tmp := c // local var prevents clobbering in goroutine
  39. go withRecover(func() {
  40. if err := tmp.Close(); err != nil {
  41. Logger.Println("Error closing", tmp, ":", err)
  42. }
  43. })
  44. }
// Encoder is a simple interface for any type that can be encoded as an array of bytes
// in order to be sent as the key or value of a Kafka message. Length() is provided as an
// optimization, and must return the same as len() on the result of Encode().
type Encoder interface {
	// Encode returns the value as a byte slice, or a non-nil error.
	Encode() ([]byte, error)
	// Length returns the number of bytes in the result of Encode; it must
	// equal len() of that result.
	Length() int
}
  52. // make strings and byte slices encodable for convenience so they can be used as keys
  53. // and/or values in kafka messages
  54. // StringEncoder implements the Encoder interface for Go strings so that you can do things like
  55. // producer.SendMessage(nil, sarama.StringEncoder("hello world"))
  56. type StringEncoder string
  57. func (s StringEncoder) Encode() ([]byte, error) {
  58. return []byte(s), nil
  59. }
  60. func (s StringEncoder) Length() int {
  61. return len(s)
  62. }
  63. // ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
  64. // producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))
  65. type ByteEncoder []byte
  66. func (b ByteEncoder) Encode() ([]byte, error) {
  67. return b, nil
  68. }
  69. func (b ByteEncoder) Length() int {
  70. return len(b)
  71. }