partitioner.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package sarama
  2. import (
  3. "hash"
  4. "hash/fnv"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. // Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1],
  10. // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
  11. // as simple default implementations.
  12. type Partitioner interface {
  13. Partition(key Encoder, numPartitions int32) int32
  14. }
  15. // RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
  16. type RandomPartitioner struct {
  17. generator *rand.Rand
  18. m sync.Mutex
  19. }
  20. func NewRandomPartitioner() *RandomPartitioner {
  21. p := new(RandomPartitioner)
  22. p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
  23. return p
  24. }
  25. func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  26. p.m.Lock()
  27. defer p.m.Unlock()
  28. return int32(p.generator.Intn(int(numPartitions)))
  29. }
  30. // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
  31. type RoundRobinPartitioner struct {
  32. partition int32
  33. m sync.Mutex
  34. }
  35. func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  36. p.m.Lock()
  37. defer p.m.Unlock()
  38. if p.partition >= numPartitions {
  39. p.partition = 0
  40. }
  41. ret := p.partition
  42. p.partition++
  43. return ret
  44. }
  45. // HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
  46. // is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages
  47. // with the same key always end up on the same partition.
  48. type HashPartitioner struct {
  49. random *RandomPartitioner
  50. hasher hash.Hash32
  51. m sync.Mutex
  52. }
  53. func NewHashPartitioner() *HashPartitioner {
  54. p := new(HashPartitioner)
  55. p.random = NewRandomPartitioner()
  56. p.hasher = fnv.New32a()
  57. return p
  58. }
  59. func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
  60. p.m.Lock()
  61. defer p.m.Unlock()
  62. if key == nil {
  63. return p.random.Partition(key, numPartitions)
  64. }
  65. bytes, err := key.Encode()
  66. if err != nil {
  67. return p.random.Partition(key, numPartitions)
  68. }
  69. p.hasher.Reset()
  70. p.hasher.Write(bytes)
  71. hash := int32(p.hasher.Sum32())
  72. if hash < 0 {
  73. hash = -hash
  74. }
  75. return hash % numPartitions
  76. }