produce_set_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func makeProduceSet() (*asyncProducer, *produceSet) {
  8. conf := NewConfig()
  9. txnmgr, _ := newTransactionManager(conf, nil)
  10. parent := &asyncProducer{
  11. conf: conf,
  12. txnmgr: txnmgr,
  13. }
  14. return parent, newProduceSet(parent)
  15. }
  16. func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
  17. if err := ps.add(msg); err != nil {
  18. t.Error(err)
  19. }
  20. }
  21. func TestProduceSetInitial(t *testing.T) {
  22. _, ps := makeProduceSet()
  23. if !ps.empty() {
  24. t.Error("New produceSet should be empty")
  25. }
  26. if ps.readyToFlush() {
  27. t.Error("Empty produceSet must never be ready to flush")
  28. }
  29. }
  30. func TestProduceSetAddingMessages(t *testing.T) {
  31. parent, ps := makeProduceSet()
  32. parent.conf.Producer.Flush.MaxMessages = 1000
  33. msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
  34. safeAddMessage(t, ps, msg)
  35. if ps.empty() {
  36. t.Error("set shouldn't be empty when a message is added")
  37. }
  38. if !ps.readyToFlush() {
  39. t.Error("by default set should be ready to flush when any message is in place")
  40. }
  41. for i := 0; i < 999; i++ {
  42. if ps.wouldOverflow(msg) {
  43. t.Error("set shouldn't fill up after only", i+1, "messages")
  44. }
  45. safeAddMessage(t, ps, msg)
  46. }
  47. if !ps.wouldOverflow(msg) {
  48. t.Error("set should be full after 1000 messages")
  49. }
  50. }
  51. func TestProduceSetPartitionTracking(t *testing.T) {
  52. _, ps := makeProduceSet()
  53. m1 := &ProducerMessage{Topic: "t1", Partition: 0}
  54. m2 := &ProducerMessage{Topic: "t1", Partition: 1}
  55. m3 := &ProducerMessage{Topic: "t2", Partition: 0}
  56. safeAddMessage(t, ps, m1)
  57. safeAddMessage(t, ps, m2)
  58. safeAddMessage(t, ps, m3)
  59. seenT1P0 := false
  60. seenT1P1 := false
  61. seenT2P0 := false
  62. ps.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  63. if len(pSet.msgs) != 1 {
  64. t.Error("Wrong message count")
  65. }
  66. if topic == "t1" && partition == 0 {
  67. seenT1P0 = true
  68. } else if topic == "t1" && partition == 1 {
  69. seenT1P1 = true
  70. } else if topic == "t2" && partition == 0 {
  71. seenT2P0 = true
  72. }
  73. })
  74. if !seenT1P0 {
  75. t.Error("Didn't see t1p0")
  76. }
  77. if !seenT1P1 {
  78. t.Error("Didn't see t1p1")
  79. }
  80. if !seenT2P0 {
  81. t.Error("Didn't see t2p0")
  82. }
  83. if len(ps.dropPartition("t1", 1)) != 1 {
  84. t.Error("Got wrong messages back from dropping partition")
  85. }
  86. if ps.bufferCount != 2 {
  87. t.Error("Incorrect buffer count after dropping partition")
  88. }
  89. }
  90. func TestProduceSetRequestBuilding(t *testing.T) {
  91. parent, ps := makeProduceSet()
  92. parent.conf.Producer.RequiredAcks = WaitForAll
  93. parent.conf.Producer.Timeout = 10 * time.Second
  94. msg := &ProducerMessage{
  95. Topic: "t1",
  96. Partition: 0,
  97. Key: StringEncoder(TestMessage),
  98. Value: StringEncoder(TestMessage),
  99. }
  100. for i := 0; i < 10; i++ {
  101. safeAddMessage(t, ps, msg)
  102. }
  103. msg.Partition = 1
  104. for i := 0; i < 10; i++ {
  105. safeAddMessage(t, ps, msg)
  106. }
  107. msg.Topic = "t2"
  108. for i := 0; i < 10; i++ {
  109. safeAddMessage(t, ps, msg)
  110. }
  111. req := ps.buildRequest()
  112. if req.RequiredAcks != WaitForAll {
  113. t.Error("RequiredAcks not set properly")
  114. }
  115. if req.Timeout != 10000 {
  116. t.Error("Timeout not set properly")
  117. }
  118. if len(req.records) != 2 {
  119. t.Error("Wrong number of topics in request")
  120. }
  121. }
  122. func TestProduceSetCompressedRequestBuilding(t *testing.T) {
  123. parent, ps := makeProduceSet()
  124. parent.conf.Producer.RequiredAcks = WaitForAll
  125. parent.conf.Producer.Timeout = 10 * time.Second
  126. parent.conf.Producer.Compression = CompressionGZIP
  127. parent.conf.Version = V0_10_0_0
  128. msg := &ProducerMessage{
  129. Topic: "t1",
  130. Partition: 0,
  131. Key: StringEncoder(TestMessage),
  132. Value: StringEncoder(TestMessage),
  133. Timestamp: time.Now(),
  134. }
  135. for i := 0; i < 10; i++ {
  136. safeAddMessage(t, ps, msg)
  137. }
  138. req := ps.buildRequest()
  139. if req.Version != 2 {
  140. t.Error("Wrong request version")
  141. }
  142. for _, msgBlock := range req.records["t1"][0].MsgSet.Messages {
  143. msg := msgBlock.Msg
  144. err := msg.decodeSet()
  145. if err != nil {
  146. t.Error("Failed to decode set from payload")
  147. }
  148. for i, compMsgBlock := range msg.Set.Messages {
  149. compMsg := compMsgBlock.Msg
  150. if compMsg.Version != 1 {
  151. t.Error("Wrong compressed message version")
  152. }
  153. if compMsgBlock.Offset != int64(i) {
  154. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset)
  155. }
  156. }
  157. if msg.Version != 1 {
  158. t.Error("Wrong compressed parent message version")
  159. }
  160. }
  161. }
  162. func TestProduceSetV3RequestBuilding(t *testing.T) {
  163. parent, ps := makeProduceSet()
  164. parent.conf.Producer.RequiredAcks = WaitForAll
  165. parent.conf.Producer.Timeout = 10 * time.Second
  166. parent.conf.Version = V0_11_0_0
  167. now := time.Now()
  168. msg := &ProducerMessage{
  169. Topic: "t1",
  170. Partition: 0,
  171. Key: StringEncoder(TestMessage),
  172. Value: StringEncoder(TestMessage),
  173. Headers: []RecordHeader{
  174. RecordHeader{
  175. Key: []byte("header-1"),
  176. Value: []byte("value-1"),
  177. },
  178. RecordHeader{
  179. Key: []byte("header-2"),
  180. Value: []byte("value-2"),
  181. },
  182. RecordHeader{
  183. Key: []byte("header-3"),
  184. Value: []byte("value-3"),
  185. },
  186. },
  187. Timestamp: now,
  188. }
  189. for i := 0; i < 10; i++ {
  190. safeAddMessage(t, ps, msg)
  191. msg.Timestamp = msg.Timestamp.Add(time.Second)
  192. }
  193. req := ps.buildRequest()
  194. if req.Version != 3 {
  195. t.Error("Wrong request version")
  196. }
  197. batch := req.records["t1"][0].RecordBatch
  198. if batch.FirstTimestamp != now {
  199. t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
  200. }
  201. for i := 0; i < 10; i++ {
  202. rec := batch.Records[i]
  203. if rec.TimestampDelta != time.Duration(i)*time.Second {
  204. t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
  205. }
  206. if rec.OffsetDelta != int64(i) {
  207. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
  208. }
  209. for j, h := range batch.Records[i].Headers {
  210. exp := fmt.Sprintf("header-%d", j+1)
  211. if string(h.Key) != exp {
  212. t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
  213. }
  214. exp = fmt.Sprintf("value-%d", j+1)
  215. if string(h.Value) != exp {
  216. t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
  217. }
  218. }
  219. }
  220. }
  221. func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
  222. const pID = 1000
  223. const pEpoch = 1234
  224. config := NewConfig()
  225. config.Producer.RequiredAcks = WaitForAll
  226. config.Producer.Idempotent = true
  227. config.Version = V0_11_0_0
  228. parent := &asyncProducer{
  229. conf: config,
  230. txnmgr: &transactionManager{
  231. producerID: pID,
  232. producerEpoch: pEpoch,
  233. },
  234. }
  235. ps := newProduceSet(parent)
  236. now := time.Now()
  237. msg := &ProducerMessage{
  238. Topic: "t1",
  239. Partition: 0,
  240. Key: StringEncoder(TestMessage),
  241. Value: StringEncoder(TestMessage),
  242. Headers: []RecordHeader{
  243. RecordHeader{
  244. Key: []byte("header-1"),
  245. Value: []byte("value-1"),
  246. },
  247. RecordHeader{
  248. Key: []byte("header-2"),
  249. Value: []byte("value-2"),
  250. },
  251. RecordHeader{
  252. Key: []byte("header-3"),
  253. Value: []byte("value-3"),
  254. },
  255. },
  256. Timestamp: now,
  257. sequenceNumber: 123,
  258. }
  259. for i := 0; i < 10; i++ {
  260. safeAddMessage(t, ps, msg)
  261. msg.Timestamp = msg.Timestamp.Add(time.Second)
  262. }
  263. req := ps.buildRequest()
  264. if req.Version != 3 {
  265. t.Error("Wrong request version")
  266. }
  267. batch := req.records["t1"][0].RecordBatch
  268. if batch.FirstTimestamp != now {
  269. t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
  270. }
  271. if batch.ProducerID != pID {
  272. t.Errorf("Wrong producerID: %v", batch.ProducerID)
  273. }
  274. if batch.ProducerEpoch != pEpoch {
  275. t.Errorf("Wrong producerEpoch: %v", batch.ProducerEpoch)
  276. }
  277. if batch.FirstSequence != 123 {
  278. t.Errorf("Wrong first sequence: %v", batch.FirstSequence)
  279. }
  280. for i := 0; i < 10; i++ {
  281. rec := batch.Records[i]
  282. if rec.TimestampDelta != time.Duration(i)*time.Second {
  283. t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
  284. }
  285. if rec.OffsetDelta != int64(i) {
  286. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
  287. }
  288. for j, h := range batch.Records[i].Headers {
  289. exp := fmt.Sprintf("header-%d", j+1)
  290. if string(h.Key) != exp {
  291. t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
  292. }
  293. exp = fmt.Sprintf("value-%d", j+1)
  294. if string(h.Value) != exp {
  295. t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
  296. }
  297. }
  298. }
  299. }