produce_response_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. var (
  8. produceResponseNoBlocksV0 = []byte{
  9. 0x00, 0x00, 0x00, 0x00}
  10. produceResponseManyBlocksVersions = [][]byte{
  11. {
  12. 0x00, 0x00, 0x00, 0x01,
  13. 0x00, 0x03, 'f', 'o', 'o',
  14. 0x00, 0x00, 0x00, 0x01,
  15. 0x00, 0x00, 0x00, 0x01, // Partition 1
  16. 0x00, 0x02, // ErrInvalidMessage
  17. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
  18. }, {
  19. 0x00, 0x00, 0x00, 0x01,
  20. 0x00, 0x03, 'f', 'o', 'o',
  21. 0x00, 0x00, 0x00, 0x01,
  22. 0x00, 0x00, 0x00, 0x01, // Partition 1
  23. 0x00, 0x02, // ErrInvalidMessage
  24. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
  25. 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
  26. }, {
  27. 0x00, 0x00, 0x00, 0x01,
  28. 0x00, 0x03, 'f', 'o', 'o',
  29. 0x00, 0x00, 0x00, 0x01,
  30. 0x00, 0x00, 0x00, 0x01, // Partition 1
  31. 0x00, 0x02, // ErrInvalidMessage
  32. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
  33. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
  34. 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
  35. },
  36. }
  37. )
  38. func TestProduceResponseDecode(t *testing.T) {
  39. response := ProduceResponse{}
  40. testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0)
  41. if len(response.Blocks) != 0 {
  42. t.Error("Decoding produced", len(response.Blocks), "topics where there were none")
  43. }
  44. for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
  45. t.Logf("Decoding produceResponseManyBlocks version %d", v)
  46. testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v))
  47. if len(response.Blocks) != 1 {
  48. t.Error("Decoding produced", len(response.Blocks), "topics where there was 1")
  49. }
  50. if len(response.Blocks["foo"]) != 1 {
  51. t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one")
  52. }
  53. block := response.GetBlock("foo", 1)
  54. if block == nil {
  55. t.Error("Decoding did not produce a block for foo/1")
  56. } else {
  57. if block.Err != ErrInvalidMessage {
  58. t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
  59. }
  60. if block.Offset != 255 {
  61. t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
  62. }
  63. if v >= 2 {
  64. if block.Timestamp != time.Unix(1, 0) {
  65. t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp)
  66. }
  67. }
  68. }
  69. if v >= 1 {
  70. if expected := 100 * time.Millisecond; response.ThrottleTime != expected {
  71. t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime)
  72. }
  73. }
  74. }
  75. }
  76. func TestProduceResponseEncode(t *testing.T) {
  77. response := ProduceResponse{}
  78. response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
  79. testEncodable(t, "empty", &response, produceResponseNoBlocksV0)
  80. response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
  81. response.Blocks["foo"][1] = &ProduceResponseBlock{
  82. Err: ErrInvalidMessage,
  83. Offset: 255,
  84. Timestamp: time.Unix(1, 0),
  85. }
  86. response.ThrottleTime = 100 * time.Millisecond
  87. for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
  88. response.Version = int16(v)
  89. testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks)
  90. }
  91. }
  92. func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) {
  93. response := ProduceResponse{}
  94. response.Version = 2
  95. response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
  96. response.Blocks["t"] = make(map[int32]*ProduceResponseBlock)
  97. response.Blocks["t"][0] = &ProduceResponseBlock{
  98. Err: ErrNoError,
  99. Offset: 0,
  100. // Use a timestamp before Unix time
  101. Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond),
  102. }
  103. response.ThrottleTime = 100 * time.Millisecond
  104. _, err := encode(&response, nil)
  105. if err == nil {
  106. t.Error("Expecting error, got nil")
  107. }
  108. if _, ok := err.(PacketEncodingError); !ok {
  109. t.Error("Expecting PacketEncodingError, got:", err)
  110. }
  111. }