partitioner.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package sarama
  2. import (
  3. "hash"
  4. "hash/fnv"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
// Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
// decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
// as simple default implementations.
type Partitioner interface {
	// Partition receives the message key (which implementations may ignore) and the number of
	// available partitions, and returns the index of the partition the message should be sent to.
	Partition(key Encoder, numPartitions int32) int32
}
  15. // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
  16. type RandomPartitioner struct {
  17. generator *rand.Rand
  18. m sync.Mutex
  19. }
  20. func NewRandomPartitioner() *RandomPartitioner {
  21. p := new(RandomPartitioner)
  22. p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
  23. return p
  24. }
  25. func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  26. p.m.Lock()
  27. defer p.m.Unlock()
  28. return int32(p.generator.Intn(int(numPartitions)))
  29. }
  30. // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
  31. type RoundRobinPartitioner struct {
  32. partition int32
  33. }
  34. func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  35. if p.partition >= numPartitions {
  36. p.partition = 0
  37. }
  38. ret := p.partition
  39. p.partition++
  40. return ret
  41. }
  42. // HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
  43. // is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
  44. // with the same key always end up on the same partition.
  45. type HashPartitioner struct {
  46. random *RandomPartitioner
  47. hasher hash.Hash32
  48. }
  49. func NewHashPartitioner() *HashPartitioner {
  50. p := new(HashPartitioner)
  51. p.random = NewRandomPartitioner()
  52. p.hasher = fnv.New32a()
  53. return p
  54. }
  55. func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  56. if key == nil {
  57. return p.random.Partition(key, numPartitions)
  58. }
  59. bytes, err := key.Encode()
  60. if err != nil {
  61. return p.random.Partition(key, numPartitions)
  62. }
  63. p.hasher.Reset()
  64. p.hasher.Write(bytes)
  65. hash := int32(p.hasher.Sum32())
  66. if hash < 0 {
  67. hash = -hash
  68. }
  69. return hash % numPartitions
  70. }