// Source file: partitioner_test.go (5.6 KB)

  package sarama

  import (
  	"crypto/rand"
  	"log"
  	"testing"
  )

  7. func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message *ProducerMessage, numPartitions int32) {
  8. choice, err := partitioner.Partition(message, numPartitions)
  9. if err != nil {
  10. t.Error(partitioner, err)
  11. }
  12. if choice < 0 || choice >= numPartitions {
  13. t.Error(partitioner, "returned partition", choice, "outside of range for", message)
  14. }
  15. for i := 1; i < 50; i++ {
  16. newChoice, err := partitioner.Partition(message, numPartitions)
  17. if err != nil {
  18. t.Error(partitioner, err)
  19. }
  20. if newChoice != choice {
  21. t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".")
  22. }
  23. }
  24. }
  25. func TestRandomPartitioner(t *testing.T) {
  26. partitioner := NewRandomPartitioner("mytopic")
  27. choice, err := partitioner.Partition(nil, 1)
  28. if err != nil {
  29. t.Error(partitioner, err)
  30. }
  31. if choice != 0 {
  32. t.Error("Returned non-zero partition when only one available.")
  33. }
  34. for i := 1; i < 50; i++ {
  35. choice, err := partitioner.Partition(nil, 50)
  36. if err != nil {
  37. t.Error(partitioner, err)
  38. }
  39. if choice < 0 || choice >= 50 {
  40. t.Error("Returned partition", choice, "outside of range.")
  41. }
  42. }
  43. }
  44. func TestRoundRobinPartitioner(t *testing.T) {
  45. partitioner := NewRoundRobinPartitioner("mytopic")
  46. choice, err := partitioner.Partition(nil, 1)
  47. if err != nil {
  48. t.Error(partitioner, err)
  49. }
  50. if choice != 0 {
  51. t.Error("Returned non-zero partition when only one available.")
  52. }
  53. var i int32
  54. for i = 1; i < 50; i++ {
  55. choice, err := partitioner.Partition(nil, 7)
  56. if err != nil {
  57. t.Error(partitioner, err)
  58. }
  59. if choice != i%7 {
  60. t.Error("Returned partition", choice, "expecting", i%7)
  61. }
  62. }
  63. }
  64. func TestHashPartitioner(t *testing.T) {
  65. partitioner := NewHashPartitioner("mytopic")
  66. choice, err := partitioner.Partition(&ProducerMessage{}, 1)
  67. if err != nil {
  68. t.Error(partitioner, err)
  69. }
  70. if choice != 0 {
  71. t.Error("Returned non-zero partition when only one available.")
  72. }
  73. for i := 1; i < 50; i++ {
  74. choice, err := partitioner.Partition(&ProducerMessage{}, 50)
  75. if err != nil {
  76. t.Error(partitioner, err)
  77. }
  78. if choice < 0 || choice >= 50 {
  79. t.Error("Returned partition", choice, "outside of range for nil key.")
  80. }
  81. }
  82. buf := make([]byte, 256)
  83. for i := 1; i < 50; i++ {
  84. if _, err := rand.Read(buf); err != nil {
  85. t.Error(err)
  86. }
  87. assertPartitioningConsistent(t, partitioner, &ProducerMessage{Key: ByteEncoder(buf)}, 50)
  88. }
  89. }
  90. func TestHashPartitionerMinInt32(t *testing.T) {
  91. partitioner := NewHashPartitioner("mytopic")
  92. msg := ProducerMessage{}
  93. // "1468509572224" generates 2147483648 (uint32) result from Sum32 function
  94. // which is -2147483648 or int32's min value
  95. msg.Key = StringEncoder("1468509572224")
  96. choice, err := partitioner.Partition(&msg, 50)
  97. if err != nil {
  98. t.Error(partitioner, err)
  99. }
  100. if choice < 0 || choice >= 50 {
  101. t.Error("Returned partition", choice, "outside of range for nil key.")
  102. }
  103. }
  104. func TestManualPartitioner(t *testing.T) {
  105. partitioner := NewManualPartitioner("mytopic")
  106. choice, err := partitioner.Partition(&ProducerMessage{}, 1)
  107. if err != nil {
  108. t.Error(partitioner, err)
  109. }
  110. if choice != 0 {
  111. t.Error("Returned non-zero partition when only one available.")
  112. }
  113. for i := int32(1); i < 50; i++ {
  114. choice, err := partitioner.Partition(&ProducerMessage{Partition: i}, 50)
  115. if err != nil {
  116. t.Error(partitioner, err)
  117. }
  118. if choice != i {
  119. t.Error("Returned partition not the same as the input partition")
  120. }
  121. }
  122. }
  123. // By default, Sarama uses the message's key to consistently assign a partition to
  124. // a message using hashing. If no key is set, a random partition will be chosen.
  125. // This example shows how you can partition messages randomly, even when a key is set,
  126. // by overriding Config.Producer.Partitioner.
  127. func ExamplePartitioner_random() {
  128. config := NewConfig()
  129. config.Producer.Partitioner = NewRandomPartitioner
  130. producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
  131. if err != nil {
  132. log.Fatal(err)
  133. }
  134. defer func() {
  135. if err := producer.Close(); err != nil {
  136. log.Println("Failed to close producer:", err)
  137. }
  138. }()
  139. msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
  140. partition, offset, err := producer.SendMessage(msg)
  141. if err != nil {
  142. log.Fatalln("Failed to produce message to kafka cluster.")
  143. }
  144. log.Printf("Produced message to partition %d with offset %d", partition, offset)
  145. }
  146. // This example shows how to assign partitions to your messages manually.
  147. func ExamplePartitioner_manual() {
  148. config := NewConfig()
  149. // First, we tell the producer that we are going to partition ourselves.
  150. config.Producer.Partitioner = NewManualPartitioner
  151. producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
  152. if err != nil {
  153. log.Fatal(err)
  154. }
  155. defer func() {
  156. if err := producer.Close(); err != nil {
  157. log.Println("Failed to close producer:", err)
  158. }
  159. }()
  160. // Now, we set the Partition field of the ProducerMessage struct.
  161. msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}
  162. partition, offset, err := producer.SendMessage(msg)
  163. if err != nil {
  164. log.Fatalln("Failed to produce message to kafka cluster.")
  165. }
  166. if partition != 6 {
  167. log.Fatal("Message should have been produced to partition 6!")
  168. }
  169. log.Printf("Produced message to partition %d with offset %d", partition, offset)
  170. }
  171. // This example shows how to set a different partitioner depending on the topic.
  172. func ExamplePartitioner_per_topic() {
  173. config := NewConfig()
  174. config.Producer.Partitioner = func(topic string) Partitioner {
  175. switch topic {
  176. case "access_log", "error_log":
  177. return NewRandomPartitioner(topic)
  178. default:
  179. return NewHashPartitioner(topic)
  180. }
  181. }
  182. // ...
  183. }