utils.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package sarama
  2. import "io"
  3. // make []int32 sortable so we can sort partition numbers
  4. type int32Slice []int32
  5. func (slice int32Slice) Len() int {
  6. return len(slice)
  7. }
  8. func (slice int32Slice) Less(i, j int) bool {
  9. return slice[i] < slice[j]
  10. }
  11. func (slice int32Slice) Swap(i, j int) {
  12. slice[i], slice[j] = slice[j], slice[i]
  13. }
  14. func withRecover(fn func()) {
  15. defer func() {
  16. if PanicHandler != nil {
  17. if err := recover(); err != nil {
  18. PanicHandler(err)
  19. }
  20. }
  21. }()
  22. fn()
  23. }
  24. func safeAsyncClose(c io.Closer) {
  25. tmp := c // local var prevents clobbering in goroutine
  26. go withRecover(func() {
  27. if err := tmp.Close(); err != nil {
  28. Logger.Println("Error closing", tmp, ":", err)
  29. }
  30. })
  31. }
  32. // Encoder is a simple interface for any type that can be encoded as an array of bytes
  33. // in order to be sent as the key or value of a Kafka message. Length() is provided as an
  34. // optimization, and must return the same as len() on the result of Encode().
  35. type Encoder interface {
  36. Encode() ([]byte, error)
  37. Length() int
  38. }
  39. // make strings and byte slices encodable for convenience so they can be used as keys
  40. // and/or values in kafka messages
  41. // StringEncoder implements the Encoder interface for Go strings so that you can do things like
  42. // producer.SendMessage(nil, sarama.StringEncoder("hello world"))
  43. type StringEncoder string
  44. func (s StringEncoder) Encode() ([]byte, error) {
  45. return []byte(s), nil
  46. }
  47. func (s StringEncoder) Length() int {
  48. return len(s)
  49. }
  50. // ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
  51. // producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))
  52. type ByteEncoder []byte
  53. func (b ByteEncoder) Encode() ([]byte, error) {
  54. return b, nil
  55. }
  56. func (b ByteEncoder) Length() int {
  57. return len(b)
  58. }