utils.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. package kafka
  // int32Slice attaches the Len, Less and Swap methods to []int32 so that
  // partition numbers can be sorted in increasing order. These three methods
  // have the signatures required by the standard library's sort.Interface.
  type int32Slice []int32

  // Len reports the number of elements in the slice.
  func (s int32Slice) Len() int {
  	return len(s)
  }

  // Less reports whether the element at index i is smaller than the element
  // at index j.
  func (s int32Slice) Less(i, j int) bool {
  	return s[i] < s[j]
  }

  // Swap exchanges the elements at indexes i and j in place.
  func (s int32Slice) Swap(i, j int) {
  	s[i], s[j] = s[j], s[i]
  }
  // Encoder is a simple interface for any type that can be encoded as an array
  // of bytes in order to be sent as the key or value of a Kafka message.
  type Encoder interface {
  	// Encode returns the encoded bytes, or a non-nil error if the value
  	// could not be encoded.
  	Encode() ([]byte, error)
  }
  // Strings and byte slices are made encodable for convenience so that they can
  // be used as keys and/or values in Kafka messages.

  // StringEncoder implements the Encoder interface for Go strings so that you can do things like
  //	producer.SendMessage(nil, kafka.StringEncoder("hello world"))
  type StringEncoder string

  // Encode returns the bytes of the string. The returned error is always nil.
  func (s StringEncoder) Encode() ([]byte, error) {
  	return []byte(s), nil
  }
  // ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like
  //	producer.SendMessage(nil, kafka.ByteEncoder([]byte{0x00}))
  type ByteEncoder []byte

  // Encode returns the underlying byte slice as-is. No copy is made, so the
  // result shares its backing array with b. The returned error is always nil.
  func (b ByteEncoder) Encode() ([]byte, error) {
  	return b, nil
  }
  // Message is what is returned from fetch requests.
  //
  // It is a dedicated struct for the results of high-level fetch requests. In
  // theory sarama/protocol/message.go could be used instead, but that has to
  // match the wire protocol, which doesn't quite line up with what actually
  // needs to be returned.
  type Message struct {
  	Offset int64  // offset of the message (presumably within its partition; confirm against the fetch code)
  	Key    []byte // key of the message, as raw bytes
  	Value  []byte // value of the message, as raw bytes
  }