produce_set_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func makeProduceSet() (*asyncProducer, *produceSet) {
  7. parent := &asyncProducer{
  8. conf: NewConfig(),
  9. }
  10. return parent, newProduceSet(parent)
  11. }
  12. func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
  13. if err := ps.add(msg); err != nil {
  14. t.Error(err)
  15. }
  16. }
  17. func TestProduceSetInitial(t *testing.T) {
  18. _, ps := makeProduceSet()
  19. if !ps.empty() {
  20. t.Error("New produceSet should be empty")
  21. }
  22. if ps.readyToFlush() {
  23. t.Error("Empty produceSet must never be ready to flush")
  24. }
  25. }
  26. func TestProduceSetAddingMessages(t *testing.T) {
  27. parent, ps := makeProduceSet()
  28. parent.conf.Producer.Flush.MaxMessages = 1000
  29. msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
  30. safeAddMessage(t, ps, msg)
  31. if ps.empty() {
  32. t.Error("set shouldn't be empty when a message is added")
  33. }
  34. if !ps.readyToFlush() {
  35. t.Error("by default set should be ready to flush when any message is in place")
  36. }
  37. for i := 0; i < 999; i++ {
  38. if ps.wouldOverflow(msg) {
  39. t.Error("set shouldn't fill up after only", i+1, "messages")
  40. }
  41. safeAddMessage(t, ps, msg)
  42. }
  43. if !ps.wouldOverflow(msg) {
  44. t.Error("set should be full after 1000 messages")
  45. }
  46. }
  47. func TestProduceSetPartitionTracking(t *testing.T) {
  48. _, ps := makeProduceSet()
  49. m1 := &ProducerMessage{Topic: "t1", Partition: 0}
  50. m2 := &ProducerMessage{Topic: "t1", Partition: 1}
  51. m3 := &ProducerMessage{Topic: "t2", Partition: 0}
  52. safeAddMessage(t, ps, m1)
  53. safeAddMessage(t, ps, m2)
  54. safeAddMessage(t, ps, m3)
  55. seenT1P0 := false
  56. seenT1P1 := false
  57. seenT2P0 := false
  58. ps.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
  59. if len(msgs) != 1 {
  60. t.Error("Wrong message count")
  61. }
  62. if topic == "t1" && partition == 0 {
  63. seenT1P0 = true
  64. } else if topic == "t1" && partition == 1 {
  65. seenT1P1 = true
  66. } else if topic == "t2" && partition == 0 {
  67. seenT2P0 = true
  68. }
  69. })
  70. if !seenT1P0 {
  71. t.Error("Didn't see t1p0")
  72. }
  73. if !seenT1P1 {
  74. t.Error("Didn't see t1p1")
  75. }
  76. if !seenT2P0 {
  77. t.Error("Didn't see t2p0")
  78. }
  79. if len(ps.dropPartition("t1", 1)) != 1 {
  80. t.Error("Got wrong messages back from dropping partition")
  81. }
  82. if ps.bufferCount != 2 {
  83. t.Error("Incorrect buffer count after dropping partition")
  84. }
  85. }
  86. func TestProduceSetRequestBuilding(t *testing.T) {
  87. parent, ps := makeProduceSet()
  88. parent.conf.Producer.RequiredAcks = WaitForAll
  89. parent.conf.Producer.Timeout = 10 * time.Second
  90. msg := &ProducerMessage{
  91. Topic: "t1",
  92. Partition: 0,
  93. Key: StringEncoder(TestMessage),
  94. Value: StringEncoder(TestMessage),
  95. }
  96. for i := 0; i < 10; i++ {
  97. safeAddMessage(t, ps, msg)
  98. }
  99. msg.Partition = 1
  100. for i := 0; i < 10; i++ {
  101. safeAddMessage(t, ps, msg)
  102. }
  103. msg.Topic = "t2"
  104. for i := 0; i < 10; i++ {
  105. safeAddMessage(t, ps, msg)
  106. }
  107. req := ps.buildRequest()
  108. if req.RequiredAcks != WaitForAll {
  109. t.Error("RequiredAcks not set properly")
  110. }
  111. if req.Timeout != 10000 {
  112. t.Error("Timeout not set properly")
  113. }
  114. if len(req.msgSets) != 2 {
  115. t.Error("Wrong number of topics in request")
  116. }
  117. }