produce_request_test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. var (
  7. produceRequestEmpty = []byte{
  8. 0x00, 0x00,
  9. 0x00, 0x00, 0x00, 0x00,
  10. 0x00, 0x00, 0x00, 0x00}
  11. produceRequestHeader = []byte{
  12. 0x01, 0x23,
  13. 0x00, 0x00, 0x04, 0x44,
  14. 0x00, 0x00, 0x00, 0x00}
  15. produceRequestOneMessage = []byte{
  16. 0x01, 0x23,
  17. 0x00, 0x00, 0x04, 0x44,
  18. 0x00, 0x00, 0x00, 0x01,
  19. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  20. 0x00, 0x00, 0x00, 0x01,
  21. 0x00, 0x00, 0x00, 0xAD,
  22. 0x00, 0x00, 0x00, 0x1C,
  23. // messageSet
  24. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  25. 0x00, 0x00, 0x00, 0x10,
  26. // message
  27. 0x23, 0x96, 0x4a, 0xf7, // CRC
  28. 0x00,
  29. 0x00,
  30. 0xFF, 0xFF, 0xFF, 0xFF,
  31. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  32. produceRequestOneRecord = []byte{
  33. 0xFF, 0xFF, // Transaction ID
  34. 0x01, 0x23, // Required Acks
  35. 0x00, 0x00, 0x04, 0x44, // Timeout
  36. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  37. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  38. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  39. 0x00, 0x00, 0x00, 0xAD, // Partition
  40. 0x00, 0x00, 0x00, 0x52, // Records length
  41. // recordBatch
  42. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  43. 0x00, 0x00, 0x00, 0x46,
  44. 0x00, 0x00, 0x00, 0x00,
  45. 0x02,
  46. 0xCA, 0x33, 0xBC, 0x05,
  47. 0x00, 0x00,
  48. 0x00, 0x00, 0x00, 0x01,
  49. 0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
  50. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  51. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  52. 0x00, 0x00,
  53. 0x00, 0x00, 0x00, 0x00,
  54. 0x00, 0x00, 0x00, 0x01,
  55. // record
  56. 0x28,
  57. 0x00,
  58. 0x0A,
  59. 0x00,
  60. 0x08, 0x01, 0x02, 0x03, 0x04,
  61. 0x06, 0x05, 0x06, 0x07,
  62. 0x02,
  63. 0x06, 0x08, 0x09, 0x0A,
  64. 0x04, 0x0B, 0x0C,
  65. }
  66. )
  67. func TestProduceRequest(t *testing.T) {
  68. request := new(ProduceRequest)
  69. testRequest(t, "empty", request, produceRequestEmpty)
  70. request.RequiredAcks = 0x123
  71. request.Timeout = 0x444
  72. testRequest(t, "header", request, produceRequestHeader)
  73. request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
  74. testRequest(t, "one message", request, produceRequestOneMessage)
  75. request.Version = 3
  76. batch := &RecordBatch{
  77. LastOffsetDelta: 1,
  78. Version: 2,
  79. FirstTimestamp: time.Unix(1479847795, 0),
  80. MaxTimestamp: time.Unix(0, 0),
  81. Records: []*Record{{
  82. TimestampDelta: 5 * time.Millisecond,
  83. Key: []byte{0x01, 0x02, 0x03, 0x04},
  84. Value: []byte{0x05, 0x06, 0x07},
  85. Headers: []*RecordHeader{{
  86. Key: []byte{0x08, 0x09, 0x0A},
  87. Value: []byte{0x0B, 0x0C},
  88. }},
  89. }},
  90. }
  91. request.AddBatch("topic", 0xAD, batch)
  92. packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
  93. // compressRecords field is not populated on decoding because consumers
  94. // are only interested in decoded records.
  95. batch.compressedRecords = nil
  96. testRequestDecode(t, "one record", request, packet)
  97. }