produce_response_test.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. var (
  8. produceResponseNoBlocksV0 = []byte{
  9. 0x00, 0x00, 0x00, 0x00}
  10. produceResponseManyBlocksVersions = map[int][]byte{
  11. 0: {
  12. 0x00, 0x00, 0x00, 0x01,
  13. 0x00, 0x03, 'f', 'o', 'o',
  14. 0x00, 0x00, 0x00, 0x01,
  15. 0x00, 0x00, 0x00, 0x01, // Partition 1
  16. 0x00, 0x02, // ErrInvalidMessage
  17. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
  18. },
  19. 1: {
  20. 0x00, 0x00, 0x00, 0x01,
  21. 0x00, 0x03, 'f', 'o', 'o',
  22. 0x00, 0x00, 0x00, 0x01,
  23. 0x00, 0x00, 0x00, 0x01, // Partition 1
  24. 0x00, 0x02, // ErrInvalidMessage
  25. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
  26. 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
  27. },
  28. 2: {
  29. 0x00, 0x00, 0x00, 0x01,
  30. 0x00, 0x03, 'f', 'o', 'o',
  31. 0x00, 0x00, 0x00, 0x01,
  32. 0x00, 0x00, 0x00, 0x01, // Partition 1
  33. 0x00, 0x02, // ErrInvalidMessage
  34. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
  35. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
  36. 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
  37. },
  38. 7: { // version 7 adds StartOffset
  39. 0x00, 0x00, 0x00, 0x01,
  40. 0x00, 0x03, 'f', 'o', 'o',
  41. 0x00, 0x00, 0x00, 0x01,
  42. 0x00, 0x00, 0x00, 0x01, // Partition 1
  43. 0x00, 0x02, // ErrInvalidMessage
  44. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
  45. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
  46. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x32, // StartOffset 50
  47. 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
  48. },
  49. }
  50. )
  51. func TestProduceResponseDecode(t *testing.T) {
  52. response := ProduceResponse{}
  53. testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0)
  54. if len(response.Blocks) != 0 {
  55. t.Error("Decoding produced", len(response.Blocks), "topics where there were none")
  56. }
  57. for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
  58. t.Logf("Decoding produceResponseManyBlocks version %d", v)
  59. testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v))
  60. if len(response.Blocks) != 1 {
  61. t.Error("Decoding produced", len(response.Blocks), "topics where there was 1")
  62. }
  63. if len(response.Blocks["foo"]) != 1 {
  64. t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one")
  65. }
  66. block := response.GetBlock("foo", 1)
  67. if block == nil {
  68. t.Error("Decoding did not produce a block for foo/1")
  69. } else {
  70. if block.Err != ErrInvalidMessage {
  71. t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err))
  72. }
  73. if block.Offset != 255 {
  74. t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
  75. }
  76. if v >= 2 {
  77. if block.Timestamp != time.Unix(1, 0) {
  78. t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp)
  79. }
  80. }
  81. if v >= 7 {
  82. if block.StartOffset != 50 {
  83. t.Error("Decoding failed for foo/1/StartOffset, got:", block.StartOffset)
  84. }
  85. }
  86. }
  87. if v >= 1 {
  88. if expected := 100 * time.Millisecond; response.ThrottleTime != expected {
  89. t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime)
  90. }
  91. }
  92. }
  93. }
  94. func TestProduceResponseEncode(t *testing.T) {
  95. response := ProduceResponse{}
  96. response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
  97. testEncodable(t, "empty", &response, produceResponseNoBlocksV0)
  98. response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
  99. response.Blocks["foo"][1] = &ProduceResponseBlock{
  100. Err: ErrInvalidMessage,
  101. Offset: 255,
  102. Timestamp: time.Unix(1, 0),
  103. StartOffset: 50,
  104. }
  105. response.ThrottleTime = 100 * time.Millisecond
  106. for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
  107. response.Version = int16(v)
  108. testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks)
  109. }
  110. }
  111. func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) {
  112. response := ProduceResponse{}
  113. response.Version = 2
  114. response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
  115. response.Blocks["t"] = make(map[int32]*ProduceResponseBlock)
  116. response.Blocks["t"][0] = &ProduceResponseBlock{
  117. Err: ErrNoError,
  118. Offset: 0,
  119. // Use a timestamp before Unix time
  120. Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond),
  121. }
  122. response.ThrottleTime = 100 * time.Millisecond
  123. _, err := encode(&response, nil)
  124. if err == nil {
  125. t.Error("Expecting error, got nil")
  126. }
  127. if _, ok := err.(PacketEncodingError); !ok {
  128. t.Error("Expecting PacketEncodingError, got:", err)
  129. }
  130. }