produce_request_test.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. var (
  7. produceRequestEmpty = []byte{
  8. 0x00, 0x00,
  9. 0x00, 0x00, 0x00, 0x00,
  10. 0x00, 0x00, 0x00, 0x00}
  11. produceRequestHeader = []byte{
  12. 0x01, 0x23,
  13. 0x00, 0x00, 0x04, 0x44,
  14. 0x00, 0x00, 0x00, 0x00}
  15. produceRequestOneMessage = []byte{
  16. 0x01, 0x23,
  17. 0x00, 0x00, 0x04, 0x44,
  18. 0x00, 0x00, 0x00, 0x01,
  19. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  20. 0x00, 0x00, 0x00, 0x01,
  21. 0x00, 0x00, 0x00, 0xAD,
  22. 0x00, 0x00, 0x00, 0x1C,
  23. // messageSet
  24. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  25. 0x00, 0x00, 0x00, 0x10,
  26. // message
  27. 0x23, 0x96, 0x4a, 0xf7, // CRC
  28. 0x00,
  29. 0x00,
  30. 0xFF, 0xFF, 0xFF, 0xFF,
  31. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  32. produceRequestOneRecord = []byte{
  33. 0xFF, 0xFF, // Transaction ID
  34. 0x01, 0x23, // Required Acks
  35. 0x00, 0x00, 0x04, 0x44, // Timeout
  36. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  37. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  38. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  39. 0x00, 0x00, 0x00, 0xAD, // Partition
  40. 0x00, 0x00, 0x00, 0x52, // Records length
  41. // recordBatch
  42. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  43. 0x00, 0x00, 0x00, 0x46,
  44. 0x00, 0x00, 0x00, 0x00,
  45. 0x02,
  46. 0x54, 0x79, 0x61, 0xFD,
  47. 0x00, 0x00,
  48. 0x00, 0x00, 0x00, 0x00,
  49. 0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
  50. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  51. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  52. 0x00, 0x00,
  53. 0x00, 0x00, 0x00, 0x00,
  54. 0x00, 0x00, 0x00, 0x01,
  55. // record
  56. 0x28,
  57. 0x00,
  58. 0x0A,
  59. 0x00,
  60. 0x08, 0x01, 0x02, 0x03, 0x04,
  61. 0x06, 0x05, 0x06, 0x07,
  62. 0x02,
  63. 0x06, 0x08, 0x09, 0x0A,
  64. 0x04, 0x0B, 0x0C,
  65. }
  66. )
  67. func TestProduceRequest(t *testing.T) {
  68. request := new(ProduceRequest)
  69. testRequest(t, "empty", request, produceRequestEmpty)
  70. request.RequiredAcks = 0x123
  71. request.Timeout = 0x444
  72. testRequest(t, "header", request, produceRequestHeader)
  73. request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
  74. testRequest(t, "one message", request, produceRequestOneMessage)
  75. request.Version = 3
  76. batch := &RecordBatch{
  77. Version: 2,
  78. FirstTimestamp: time.Unix(1479847795, 0),
  79. MaxTimestamp: time.Unix(0, 0),
  80. Records: []*Record{{
  81. TimestampDelta: 5 * time.Millisecond,
  82. Key: []byte{0x01, 0x02, 0x03, 0x04},
  83. Value: []byte{0x05, 0x06, 0x07},
  84. Headers: []*RecordHeader{{
  85. Key: []byte{0x08, 0x09, 0x0A},
  86. Value: []byte{0x0B, 0x0C},
  87. }},
  88. }},
  89. }
  90. request.AddBatch("topic", 0xAD, batch)
  91. packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
  92. batch.Records[0].length.startOffset = 0
  93. testRequestDecode(t, "one record", request, packet)
  94. }