partitioner.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package sarama
  2. import (
  3. "hash"
  4. "hash/fnv"
  5. "math/rand"
  6. "time"
  7. )
  8. // Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
  9. // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
  10. // as simple default implementations.
  11. type Partitioner interface {
  12. Partition(key Encoder, numPartitions int32) int32
  13. }
  14. // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
  15. type RandomPartitioner struct {
  16. generator *rand.Rand
  17. }
  18. func NewRandomPartitioner() *RandomPartitioner {
  19. p := new(RandomPartitioner)
  20. p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
  21. return p
  22. }
  23. func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  24. return int32(p.generator.Intn(int(numPartitions)))
  25. }
  26. // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
  27. type RoundRobinPartitioner struct {
  28. partition int32
  29. }
  30. func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  31. if p.partition >= numPartitions {
  32. p.partition = 0
  33. }
  34. ret := p.partition
  35. p.partition++
  36. return ret
  37. }
  38. // HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
  39. // is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
  40. // with the same key always end up on the same partition.
  41. type HashPartitioner struct {
  42. random *RandomPartitioner
  43. hasher hash.Hash32
  44. }
  45. func NewHashPartitioner() *HashPartitioner {
  46. p := new(HashPartitioner)
  47. p.random = NewRandomPartitioner()
  48. p.hasher = fnv.New32a()
  49. return p
  50. }
  51. func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  52. if key == nil {
  53. return p.random.Partition(key, numPartitions)
  54. }
  55. bytes, err := key.Encode()
  56. if err != nil {
  57. return p.random.Partition(key, numPartitions)
  58. }
  59. p.hasher.Reset()
  60. p.hasher.Write(bytes)
  61. hash := int32(p.hasher.Sum32())
  62. if hash < 0 {
  63. hash = -hash
  64. }
  65. return hash % numPartitions
  66. }