utils.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package sarama
  2. import "io"
  3. // make []int32 sortable so we can sort partition numbers
  4. type int32Slice []int32
  5. func (slice int32Slice) Len() int {
  6. return len(slice)
  7. }
  8. func (slice int32Slice) Less(i, j int) bool {
  9. return slice[i] < slice[j]
  10. }
  11. func (slice int32Slice) Swap(i, j int) {
  12. slice[i], slice[j] = slice[j], slice[i]
  13. }
  14. func withRecover(fn func()) {
  15. defer func() {
  16. handler := PanicHandler
  17. if handler != nil {
  18. if err := recover(); err != nil {
  19. handler(err)
  20. }
  21. }
  22. }()
  23. fn()
  24. }
  25. func safeAsyncClose(c io.Closer) {
  26. tmp := c // local var prevents clobbering in goroutine
  27. go withRecover(func() {
  28. if err := tmp.Close(); err != nil {
  29. Logger.Println("Error closing", tmp, ":", err)
  30. }
  31. })
  32. }
  33. // Encoder is a simple interface for any type that can be encoded as an array of bytes
  34. // in order to be sent as the key or value of a Kafka message. Length() is provided as an
  35. // optimization, and must return the same as len() on the result of Encode().
  36. type Encoder interface {
  37. Encode() ([]byte, error)
  38. Length() int
  39. }
  40. // make strings and byte slices encodable for convenience so they can be used as keys
  41. // and/or values in kafka messages
  42. // StringEncoder implements the Encoder interface for Go strings so that you can do things like
  43. // producer.SendMessage(nil, sarama.StringEncoder("hello world"))
  44. type StringEncoder string
  45. func (s StringEncoder) Encode() ([]byte, error) {
  46. return []byte(s), nil
  47. }
  48. func (s StringEncoder) Length() int {
  49. return len(s)
  50. }
  51. // ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
  52. // producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))
  53. type ByteEncoder []byte
  54. func (b ByteEncoder) Encode() ([]byte, error) {
  55. return b, nil
  56. }
  57. func (b ByteEncoder) Length() int {
  58. return len(b)
  59. }