partitioner_test.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package sarama
  2. import (
  3. "crypto/rand"
  4. "log"
  5. "testing"
  6. )
  7. func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message *ProducerMessage, numPartitions int32) {
  8. choice, err := partitioner.Partition(message, numPartitions)
  9. if err != nil {
  10. t.Error(partitioner, err)
  11. }
  12. if choice < 0 || choice >= numPartitions {
  13. t.Error(partitioner, "returned partition", choice, "outside of range for", message)
  14. }
  15. for i := 1; i < 50; i++ {
  16. newChoice, err := partitioner.Partition(message, numPartitions)
  17. if err != nil {
  18. t.Error(partitioner, err)
  19. }
  20. if newChoice != choice {
  21. t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".")
  22. }
  23. }
  24. }
  25. func TestRandomPartitioner(t *testing.T) {
  26. partitioner := NewRandomPartitioner("mytopic")
  27. choice, err := partitioner.Partition(nil, 1)
  28. if err != nil {
  29. t.Error(partitioner, err)
  30. }
  31. if choice != 0 {
  32. t.Error("Returned non-zero partition when only one available.")
  33. }
  34. for i := 1; i < 50; i++ {
  35. choice, err := partitioner.Partition(nil, 50)
  36. if err != nil {
  37. t.Error(partitioner, err)
  38. }
  39. if choice < 0 || choice >= 50 {
  40. t.Error("Returned partition", choice, "outside of range.")
  41. }
  42. }
  43. }
  44. func TestRoundRobinPartitioner(t *testing.T) {
  45. partitioner := NewRoundRobinPartitioner("mytopic")
  46. choice, err := partitioner.Partition(nil, 1)
  47. if err != nil {
  48. t.Error(partitioner, err)
  49. }
  50. if choice != 0 {
  51. t.Error("Returned non-zero partition when only one available.")
  52. }
  53. var i int32
  54. for i = 1; i < 50; i++ {
  55. choice, err := partitioner.Partition(nil, 7)
  56. if err != nil {
  57. t.Error(partitioner, err)
  58. }
  59. if choice != i%7 {
  60. t.Error("Returned partition", choice, "expecting", i%7)
  61. }
  62. }
  63. }
  64. func TestHashPartitioner(t *testing.T) {
  65. partitioner := NewHashPartitioner("mytopic")
  66. choice, err := partitioner.Partition(&ProducerMessage{}, 1)
  67. if err != nil {
  68. t.Error(partitioner, err)
  69. }
  70. if choice != 0 {
  71. t.Error("Returned non-zero partition when only one available.")
  72. }
  73. for i := 1; i < 50; i++ {
  74. choice, err := partitioner.Partition(&ProducerMessage{}, 50)
  75. if err != nil {
  76. t.Error(partitioner, err)
  77. }
  78. if choice < 0 || choice >= 50 {
  79. t.Error("Returned partition", choice, "outside of range for nil key.")
  80. }
  81. }
  82. buf := make([]byte, 256)
  83. for i := 1; i < 50; i++ {
  84. if _, err := rand.Read(buf); err != nil {
  85. t.Error(err)
  86. }
  87. assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
  88. }
  89. }
  90. func TestManualPartitioner(t *testing.T) {
  91. partitioner := NewManualPartitioner("mytopic")
  92. choice, err := partitioner.Partition(&ProducerMessage{}, 1)
  93. if err != nil {
  94. t.Error(partitioner, err)
  95. }
  96. if choice != 0 {
  97. t.Error("Returned non-zero partition when only one available.")
  98. }
  99. for i := int32(1); i < 50; i++ {
  100. choice, err := partitioner.Partition(&ProducerMessage{Partition: i}, 50)
  101. if err != nil {
  102. t.Error(partitioner, err)
  103. }
  104. if choice != i {
  105. t.Error("Returned partition not the same as the input partition")
  106. }
  107. }
  108. }
  109. // By default, Sarama uses the message's key to consistently assign a partition to
  110. // a message using hashing. If no key is set, a random partition will be chosen.
  111. // This example shows how you can partition messages randomly, even when a key is set,
  112. // by overriding Config.Producer.Partitioner.
  113. func ExamplePartitioner_random() {
  114. config := NewConfig()
  115. config.Producer.Partitioner = NewRandomPartitioner
  116. producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
  117. if err != nil {
  118. log.Fatal(err)
  119. }
  120. defer func() {
  121. if err := producer.Close(); err != nil {
  122. log.Println("Failed to close producer:", err)
  123. }
  124. }()
  125. msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
  126. partition, offset, err := producer.SendMessage(msg)
  127. if err != nil {
  128. log.Fatalln("Failed to produce message to kafka cluster.")
  129. }
  130. log.Printf("Produced message to partition %d with offset %d", partition, offset)
  131. }
  132. // This example shows how to assign partitions to your messages manually.
  133. func ExamplePartitioner_manual() {
  134. config := NewConfig()
  135. // First, we tell the producer that we are going to partition ourselves.
  136. config.Producer.Partitioner = NewManualPartitioner
  137. producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
  138. if err != nil {
  139. log.Fatal(err)
  140. }
  141. defer func() {
  142. if err := producer.Close(); err != nil {
  143. log.Println("Failed to close producer:", err)
  144. }
  145. }()
  146. // Now, we set the Partition field of the ProducerMessage struct.
  147. msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}
  148. partition, offset, err := producer.SendMessage(msg)
  149. if err != nil {
  150. log.Fatalln("Failed to produce message to kafka cluster.")
  151. }
  152. if partition != 6 {
  153. log.Fatal("Message should have been produced to partition 6!")
  154. }
  155. log.Printf("Produced message to partition %d with offset %d", partition, offset)
  156. }
  157. // This example shows how to set a different partitioner depending on the topic.
  158. func ExamplePartitioner_per_topic() {
  159. config := NewConfig()
  160. config.Producer.Partitioner = func(topic string) Partitioner {
  161. switch topic {
  162. case "access_log", "error_log":
  163. return NewRandomPartitioner(topic)
  164. default:
  165. return NewHashPartitioner(topic)
  166. }
  167. }
  168. // ...
  169. }