utils.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package sarama
  2. import (
  3. "io"
  4. "sort"
  5. )
  6. // make []int32 sortable so we can sort partition numbers
  7. type int32Slice []int32
  8. func (slice int32Slice) Len() int {
  9. return len(slice)
  10. }
  11. func (slice int32Slice) Less(i, j int) bool {
  12. return slice[i] < slice[j]
  13. }
  14. func (slice int32Slice) Swap(i, j int) {
  15. slice[i], slice[j] = slice[j], slice[i]
  16. }
  17. func dupeAndSort(input []int32) []int32 {
  18. ret := make([]int32, 0, len(input))
  19. for _, val := range input {
  20. ret = append(ret, val)
  21. }
  22. sort.Sort(int32Slice(ret))
  23. return ret
  24. }
  25. func withRecover(fn func()) {
  26. defer func() {
  27. handler := PanicHandler
  28. if handler != nil {
  29. if err := recover(); err != nil {
  30. handler(err)
  31. }
  32. }
  33. }()
  34. fn()
  35. }
  36. func safeAsyncClose(c io.Closer) {
  37. tmp := c // local var prevents clobbering in goroutine
  38. go withRecover(func() {
  39. if err := tmp.Close(); err != nil {
  40. Logger.Println("Error closing", tmp, ":", err)
  41. }
  42. })
  43. }
  44. // Encoder is a simple interface for any type that can be encoded as an array of bytes
  45. // in order to be sent as the key or value of a Kafka message. Length() is provided as an
  46. // optimization, and must return the same as len() on the result of Encode().
  47. type Encoder interface {
  48. Encode() ([]byte, error)
  49. Length() int
  50. }
// Make strings and byte slices encodable for convenience, so that they can be
// used directly as keys and/or values in Kafka messages.
  53. // StringEncoder implements the Encoder interface for Go strings so that you can do things like
  54. // producer.SendMessage(nil, sarama.StringEncoder("hello world"))
  55. type StringEncoder string
  56. func (s StringEncoder) Encode() ([]byte, error) {
  57. return []byte(s), nil
  58. }
  59. func (s StringEncoder) Length() int {
  60. return len(s)
  61. }
  62. // ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
  63. // producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))
  64. type ByteEncoder []byte
  65. func (b ByteEncoder) Encode() ([]byte, error) {
  66. return b, nil
  67. }
  68. func (b ByteEncoder) Length() int {
  69. return len(b)
  70. }