produce_set_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func makeProduceSet() (*asyncProducer, *produceSet) {
  8. parent := &asyncProducer{
  9. conf: NewConfig(),
  10. }
  11. return parent, newProduceSet(parent)
  12. }
  13. func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
  14. if err := ps.add(msg); err != nil {
  15. t.Error(err)
  16. }
  17. }
  18. func TestProduceSetInitial(t *testing.T) {
  19. _, ps := makeProduceSet()
  20. if !ps.empty() {
  21. t.Error("New produceSet should be empty")
  22. }
  23. if ps.readyToFlush() {
  24. t.Error("Empty produceSet must never be ready to flush")
  25. }
  26. }
  27. func TestProduceSetAddingMessages(t *testing.T) {
  28. parent, ps := makeProduceSet()
  29. parent.conf.Producer.Flush.MaxMessages = 1000
  30. msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
  31. safeAddMessage(t, ps, msg)
  32. if ps.empty() {
  33. t.Error("set shouldn't be empty when a message is added")
  34. }
  35. if !ps.readyToFlush() {
  36. t.Error("by default set should be ready to flush when any message is in place")
  37. }
  38. for i := 0; i < 999; i++ {
  39. if ps.wouldOverflow(msg) {
  40. t.Error("set shouldn't fill up after only", i+1, "messages")
  41. }
  42. safeAddMessage(t, ps, msg)
  43. }
  44. if !ps.wouldOverflow(msg) {
  45. t.Error("set should be full after 1000 messages")
  46. }
  47. }
  48. func TestProduceSetPartitionTracking(t *testing.T) {
  49. _, ps := makeProduceSet()
  50. m1 := &ProducerMessage{Topic: "t1", Partition: 0}
  51. m2 := &ProducerMessage{Topic: "t1", Partition: 1}
  52. m3 := &ProducerMessage{Topic: "t2", Partition: 0}
  53. safeAddMessage(t, ps, m1)
  54. safeAddMessage(t, ps, m2)
  55. safeAddMessage(t, ps, m3)
  56. seenT1P0 := false
  57. seenT1P1 := false
  58. seenT2P0 := false
  59. ps.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
  60. if len(msgs) != 1 {
  61. t.Error("Wrong message count")
  62. }
  63. if topic == "t1" && partition == 0 {
  64. seenT1P0 = true
  65. } else if topic == "t1" && partition == 1 {
  66. seenT1P1 = true
  67. } else if topic == "t2" && partition == 0 {
  68. seenT2P0 = true
  69. }
  70. })
  71. if !seenT1P0 {
  72. t.Error("Didn't see t1p0")
  73. }
  74. if !seenT1P1 {
  75. t.Error("Didn't see t1p1")
  76. }
  77. if !seenT2P0 {
  78. t.Error("Didn't see t2p0")
  79. }
  80. if len(ps.dropPartition("t1", 1)) != 1 {
  81. t.Error("Got wrong messages back from dropping partition")
  82. }
  83. if ps.bufferCount != 2 {
  84. t.Error("Incorrect buffer count after dropping partition")
  85. }
  86. }
  87. func TestProduceSetRequestBuilding(t *testing.T) {
  88. parent, ps := makeProduceSet()
  89. parent.conf.Producer.RequiredAcks = WaitForAll
  90. parent.conf.Producer.Timeout = 10 * time.Second
  91. msg := &ProducerMessage{
  92. Topic: "t1",
  93. Partition: 0,
  94. Key: StringEncoder(TestMessage),
  95. Value: StringEncoder(TestMessage),
  96. }
  97. for i := 0; i < 10; i++ {
  98. safeAddMessage(t, ps, msg)
  99. }
  100. msg.Partition = 1
  101. for i := 0; i < 10; i++ {
  102. safeAddMessage(t, ps, msg)
  103. }
  104. msg.Topic = "t2"
  105. for i := 0; i < 10; i++ {
  106. safeAddMessage(t, ps, msg)
  107. }
  108. req := ps.buildRequest()
  109. if req.RequiredAcks != WaitForAll {
  110. t.Error("RequiredAcks not set properly")
  111. }
  112. if req.Timeout != 10000 {
  113. t.Error("Timeout not set properly")
  114. }
  115. if len(req.records) != 2 {
  116. t.Error("Wrong number of topics in request")
  117. }
  118. }
  119. func TestProduceSetCompressedRequestBuilding(t *testing.T) {
  120. parent, ps := makeProduceSet()
  121. parent.conf.Producer.RequiredAcks = WaitForAll
  122. parent.conf.Producer.Timeout = 10 * time.Second
  123. parent.conf.Producer.Compression = CompressionGZIP
  124. parent.conf.Version = V0_10_0_0
  125. msg := &ProducerMessage{
  126. Topic: "t1",
  127. Partition: 0,
  128. Key: StringEncoder(TestMessage),
  129. Value: StringEncoder(TestMessage),
  130. Timestamp: time.Now(),
  131. }
  132. for i := 0; i < 10; i++ {
  133. safeAddMessage(t, ps, msg)
  134. }
  135. req := ps.buildRequest()
  136. if req.Version != 2 {
  137. t.Error("Wrong request version")
  138. }
  139. for _, msgBlock := range req.records["t1"][0].MsgSet.Messages {
  140. msg := msgBlock.Msg
  141. err := msg.decodeSet()
  142. if err != nil {
  143. t.Error("Failed to decode set from payload")
  144. }
  145. for i, compMsgBlock := range msg.Set.Messages {
  146. compMsg := compMsgBlock.Msg
  147. if compMsg.Version != 1 {
  148. t.Error("Wrong compressed message version")
  149. }
  150. if compMsgBlock.Offset != int64(i) {
  151. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset)
  152. }
  153. }
  154. if msg.Version != 1 {
  155. t.Error("Wrong compressed parent message version")
  156. }
  157. }
  158. }
  159. func TestProduceSetV3RequestBuilding(t *testing.T) {
  160. parent, ps := makeProduceSet()
  161. parent.conf.Producer.RequiredAcks = WaitForAll
  162. parent.conf.Producer.Timeout = 10 * time.Second
  163. parent.conf.Version = V0_11_0_0
  164. now := time.Now()
  165. msg := &ProducerMessage{
  166. Topic: "t1",
  167. Partition: 0,
  168. Key: StringEncoder(TestMessage),
  169. Value: StringEncoder(TestMessage),
  170. Headers: []RecordHeader{
  171. RecordHeader{
  172. Key: []byte("header-1"),
  173. Value: []byte("value-1"),
  174. },
  175. RecordHeader{
  176. Key: []byte("header-2"),
  177. Value: []byte("value-2"),
  178. },
  179. RecordHeader{
  180. Key: []byte("header-3"),
  181. Value: []byte("value-3"),
  182. },
  183. },
  184. Timestamp: now,
  185. }
  186. for i := 0; i < 10; i++ {
  187. safeAddMessage(t, ps, msg)
  188. msg.Timestamp = msg.Timestamp.Add(time.Second)
  189. }
  190. req := ps.buildRequest()
  191. if req.Version != 3 {
  192. t.Error("Wrong request version")
  193. }
  194. batch := req.records["t1"][0].RecordBatch
  195. if batch.FirstTimestamp != now {
  196. t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
  197. }
  198. for i := 0; i < 10; i++ {
  199. rec := batch.Records[i]
  200. if rec.TimestampDelta != time.Duration(i)*time.Second {
  201. t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
  202. }
  203. if rec.OffsetDelta != int64(i) {
  204. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
  205. }
  206. for j, h := range batch.Records[i].Headers {
  207. exp := fmt.Sprintf("header-%d", j+1)
  208. if string(h.Key) != exp {
  209. t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
  210. }
  211. exp = fmt.Sprintf("value-%d", j+1)
  212. if string(h.Value) != exp {
  213. t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
  214. }
  215. }
  216. }
  217. }