produce_set_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. func makeProduceSet() (*asyncProducer, *produceSet) {
  8. conf := NewConfig()
  9. txnmgr, _ := newTransactionManager(conf, nil)
  10. parent := &asyncProducer{
  11. conf: conf,
  12. txnmgr: txnmgr,
  13. }
  14. return parent, newProduceSet(parent)
  15. }
  16. func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) {
  17. if err := ps.add(msg); err != nil {
  18. t.Error(err)
  19. }
  20. }
  21. func TestProduceSetInitial(t *testing.T) {
  22. _, ps := makeProduceSet()
  23. if !ps.empty() {
  24. t.Error("New produceSet should be empty")
  25. }
  26. if ps.readyToFlush() {
  27. t.Error("Empty produceSet must never be ready to flush")
  28. }
  29. }
  30. func TestProduceSetAddingMessages(t *testing.T) {
  31. _, ps := makeProduceSet()
  32. msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
  33. safeAddMessage(t, ps, msg)
  34. if ps.empty() {
  35. t.Error("set shouldn't be empty when a message is added")
  36. }
  37. if !ps.readyToFlush() {
  38. t.Error("by default set should be ready to flush when any message is in place")
  39. }
  40. }
  41. func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) {
  42. parent, ps := makeProduceSet()
  43. parent.conf.Producer.Flush.MaxMessages = 1000
  44. msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
  45. for i := 0; i < 1000; i++ {
  46. if ps.wouldOverflow(msg) {
  47. t.Error("set shouldn't fill up after only", i+1, "messages")
  48. }
  49. safeAddMessage(t, ps, msg)
  50. }
  51. if !ps.wouldOverflow(msg) {
  52. t.Error("set should be full after 1000 messages")
  53. }
  54. }
  55. func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) {
  56. parent, ps := makeProduceSet()
  57. parent.conf.Producer.MaxMessageBytes = 1000
  58. msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}
  59. for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes {
  60. if ps.wouldOverflow(msg) {
  61. t.Error("set shouldn't fill up before 1000 bytes")
  62. }
  63. safeAddMessage(t, ps, msg)
  64. }
  65. if !ps.wouldOverflow(msg) {
  66. t.Error("set should be full after 1000 bytes")
  67. }
  68. }
  69. func TestProduceSetPartitionTracking(t *testing.T) {
  70. _, ps := makeProduceSet()
  71. m1 := &ProducerMessage{Topic: "t1", Partition: 0}
  72. m2 := &ProducerMessage{Topic: "t1", Partition: 1}
  73. m3 := &ProducerMessage{Topic: "t2", Partition: 0}
  74. safeAddMessage(t, ps, m1)
  75. safeAddMessage(t, ps, m2)
  76. safeAddMessage(t, ps, m3)
  77. seenT1P0 := false
  78. seenT1P1 := false
  79. seenT2P0 := false
  80. ps.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  81. if len(pSet.msgs) != 1 {
  82. t.Error("Wrong message count")
  83. }
  84. if topic == "t1" && partition == 0 {
  85. seenT1P0 = true
  86. } else if topic == "t1" && partition == 1 {
  87. seenT1P1 = true
  88. } else if topic == "t2" && partition == 0 {
  89. seenT2P0 = true
  90. }
  91. })
  92. if !seenT1P0 {
  93. t.Error("Didn't see t1p0")
  94. }
  95. if !seenT1P1 {
  96. t.Error("Didn't see t1p1")
  97. }
  98. if !seenT2P0 {
  99. t.Error("Didn't see t2p0")
  100. }
  101. if len(ps.dropPartition("t1", 1)) != 1 {
  102. t.Error("Got wrong messages back from dropping partition")
  103. }
  104. if ps.bufferCount != 2 {
  105. t.Error("Incorrect buffer count after dropping partition")
  106. }
  107. }
  108. func TestProduceSetRequestBuilding(t *testing.T) {
  109. parent, ps := makeProduceSet()
  110. parent.conf.Producer.RequiredAcks = WaitForAll
  111. parent.conf.Producer.Timeout = 10 * time.Second
  112. msg := &ProducerMessage{
  113. Topic: "t1",
  114. Partition: 0,
  115. Key: StringEncoder(TestMessage),
  116. Value: StringEncoder(TestMessage),
  117. }
  118. for i := 0; i < 10; i++ {
  119. safeAddMessage(t, ps, msg)
  120. }
  121. msg.Partition = 1
  122. for i := 0; i < 10; i++ {
  123. safeAddMessage(t, ps, msg)
  124. }
  125. msg.Topic = "t2"
  126. for i := 0; i < 10; i++ {
  127. safeAddMessage(t, ps, msg)
  128. }
  129. req := ps.buildRequest()
  130. if req.RequiredAcks != WaitForAll {
  131. t.Error("RequiredAcks not set properly")
  132. }
  133. if req.Timeout != 10000 {
  134. t.Error("Timeout not set properly")
  135. }
  136. if len(req.records) != 2 {
  137. t.Error("Wrong number of topics in request")
  138. }
  139. }
  140. func TestProduceSetCompressedRequestBuilding(t *testing.T) {
  141. parent, ps := makeProduceSet()
  142. parent.conf.Producer.RequiredAcks = WaitForAll
  143. parent.conf.Producer.Timeout = 10 * time.Second
  144. parent.conf.Producer.Compression = CompressionGZIP
  145. parent.conf.Version = V0_10_0_0
  146. msg := &ProducerMessage{
  147. Topic: "t1",
  148. Partition: 0,
  149. Key: StringEncoder(TestMessage),
  150. Value: StringEncoder(TestMessage),
  151. Timestamp: time.Now(),
  152. }
  153. for i := 0; i < 10; i++ {
  154. safeAddMessage(t, ps, msg)
  155. }
  156. req := ps.buildRequest()
  157. if req.Version != 2 {
  158. t.Error("Wrong request version")
  159. }
  160. for _, msgBlock := range req.records["t1"][0].MsgSet.Messages {
  161. msg := msgBlock.Msg
  162. err := msg.decodeSet()
  163. if err != nil {
  164. t.Error("Failed to decode set from payload")
  165. }
  166. for i, compMsgBlock := range msg.Set.Messages {
  167. compMsg := compMsgBlock.Msg
  168. if compMsg.Version != 1 {
  169. t.Error("Wrong compressed message version")
  170. }
  171. if compMsgBlock.Offset != int64(i) {
  172. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset)
  173. }
  174. }
  175. if msg.Version != 1 {
  176. t.Error("Wrong compressed parent message version")
  177. }
  178. }
  179. }
  180. func TestProduceSetV3RequestBuilding(t *testing.T) {
  181. parent, ps := makeProduceSet()
  182. parent.conf.Producer.RequiredAcks = WaitForAll
  183. parent.conf.Producer.Timeout = 10 * time.Second
  184. parent.conf.Version = V0_11_0_0
  185. now := time.Now()
  186. msg := &ProducerMessage{
  187. Topic: "t1",
  188. Partition: 0,
  189. Key: StringEncoder(TestMessage),
  190. Value: StringEncoder(TestMessage),
  191. Headers: []RecordHeader{
  192. {
  193. Key: []byte("header-1"),
  194. Value: []byte("value-1"),
  195. },
  196. {
  197. Key: []byte("header-2"),
  198. Value: []byte("value-2"),
  199. },
  200. {
  201. Key: []byte("header-3"),
  202. Value: []byte("value-3"),
  203. },
  204. },
  205. Timestamp: now,
  206. }
  207. for i := 0; i < 10; i++ {
  208. safeAddMessage(t, ps, msg)
  209. msg.Timestamp = msg.Timestamp.Add(time.Second)
  210. }
  211. req := ps.buildRequest()
  212. if req.Version != 3 {
  213. t.Error("Wrong request version")
  214. }
  215. batch := req.records["t1"][0].RecordBatch
  216. if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
  217. t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
  218. }
  219. for i := 0; i < 10; i++ {
  220. rec := batch.Records[i]
  221. if rec.TimestampDelta != time.Duration(i)*time.Second {
  222. t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
  223. }
  224. if rec.OffsetDelta != int64(i) {
  225. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
  226. }
  227. for j, h := range batch.Records[i].Headers {
  228. exp := fmt.Sprintf("header-%d", j+1)
  229. if string(h.Key) != exp {
  230. t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
  231. }
  232. exp = fmt.Sprintf("value-%d", j+1)
  233. if string(h.Value) != exp {
  234. t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
  235. }
  236. }
  237. }
  238. }
  239. func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
  240. const pID = 1000
  241. const pEpoch = 1234
  242. config := NewConfig()
  243. config.Producer.RequiredAcks = WaitForAll
  244. config.Producer.Idempotent = true
  245. config.Version = V0_11_0_0
  246. parent := &asyncProducer{
  247. conf: config,
  248. txnmgr: &transactionManager{
  249. producerID: pID,
  250. producerEpoch: pEpoch,
  251. },
  252. }
  253. ps := newProduceSet(parent)
  254. now := time.Now()
  255. msg := &ProducerMessage{
  256. Topic: "t1",
  257. Partition: 0,
  258. Key: StringEncoder(TestMessage),
  259. Value: StringEncoder(TestMessage),
  260. Headers: []RecordHeader{
  261. {
  262. Key: []byte("header-1"),
  263. Value: []byte("value-1"),
  264. },
  265. {
  266. Key: []byte("header-2"),
  267. Value: []byte("value-2"),
  268. },
  269. {
  270. Key: []byte("header-3"),
  271. Value: []byte("value-3"),
  272. },
  273. },
  274. Timestamp: now,
  275. sequenceNumber: 123,
  276. }
  277. for i := 0; i < 10; i++ {
  278. safeAddMessage(t, ps, msg)
  279. msg.Timestamp = msg.Timestamp.Add(time.Second)
  280. }
  281. req := ps.buildRequest()
  282. if req.Version != 3 {
  283. t.Error("Wrong request version")
  284. }
  285. batch := req.records["t1"][0].RecordBatch
  286. if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
  287. t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
  288. }
  289. if batch.ProducerID != pID {
  290. t.Errorf("Wrong producerID: %v", batch.ProducerID)
  291. }
  292. if batch.ProducerEpoch != pEpoch {
  293. t.Errorf("Wrong producerEpoch: %v", batch.ProducerEpoch)
  294. }
  295. if batch.FirstSequence != 123 {
  296. t.Errorf("Wrong first sequence: %v", batch.FirstSequence)
  297. }
  298. for i := 0; i < 10; i++ {
  299. rec := batch.Records[i]
  300. if rec.TimestampDelta != time.Duration(i)*time.Second {
  301. t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
  302. }
  303. if rec.OffsetDelta != int64(i) {
  304. t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
  305. }
  306. for j, h := range batch.Records[i].Headers {
  307. exp := fmt.Sprintf("header-%d", j+1)
  308. if string(h.Key) != exp {
  309. t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
  310. }
  311. exp = fmt.Sprintf("value-%d", j+1)
  312. if string(h.Value) != exp {
  313. t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
  314. }
  315. }
  316. }
  317. }
  318. func TestProduceSetConsistentTimestamps(t *testing.T) {
  319. parent, ps1 := makeProduceSet()
  320. ps2 := newProduceSet(parent)
  321. parent.conf.Producer.RequiredAcks = WaitForAll
  322. parent.conf.Producer.Timeout = 10 * time.Second
  323. parent.conf.Version = V0_11_0_0
  324. msg1 := &ProducerMessage{
  325. Topic: "t1",
  326. Partition: 0,
  327. Key: StringEncoder(TestMessage),
  328. Value: StringEncoder(TestMessage),
  329. Timestamp: time.Unix(1555718400, 500000000),
  330. sequenceNumber: 123,
  331. }
  332. msg2 := &ProducerMessage{
  333. Topic: "t1",
  334. Partition: 0,
  335. Key: StringEncoder(TestMessage),
  336. Value: StringEncoder(TestMessage),
  337. Timestamp: time.Unix(1555718400, 500900000),
  338. sequenceNumber: 123,
  339. }
  340. msg3 := &ProducerMessage{
  341. Topic: "t1",
  342. Partition: 0,
  343. Key: StringEncoder(TestMessage),
  344. Value: StringEncoder(TestMessage),
  345. Timestamp: time.Unix(1555718400, 600000000),
  346. sequenceNumber: 123,
  347. }
  348. safeAddMessage(t, ps1, msg1)
  349. safeAddMessage(t, ps1, msg3)
  350. req1 := ps1.buildRequest()
  351. if req1.Version != 3 {
  352. t.Error("Wrong request version")
  353. }
  354. batch1 := req1.records["t1"][0].RecordBatch
  355. ft1 := batch1.FirstTimestamp.Unix()*1000 + int64(batch1.FirstTimestamp.Nanosecond()/1000000)
  356. time1 := ft1 + int64(batch1.Records[1].TimestampDelta/time.Millisecond)
  357. safeAddMessage(t, ps2, msg2)
  358. safeAddMessage(t, ps2, msg3)
  359. req2 := ps2.buildRequest()
  360. if req2.Version != 3 {
  361. t.Error("Wrong request version")
  362. }
  363. batch2 := req2.records["t1"][0].RecordBatch
  364. ft2 := batch2.FirstTimestamp.Unix()*1000 + int64(batch2.FirstTimestamp.Nanosecond()/1000000)
  365. time2 := ft2 + int64(batch2.Records[1].TimestampDelta/time.Millisecond)
  366. if time1 != time2 {
  367. t.Errorf("Message timestamps do not match: %v, %v", time1, time2)
  368. }
  369. }