offset_response_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package sarama
  2. import "testing"
  3. var (
  4. emptyOffsetResponse = []byte{
  5. 0x00, 0x00, 0x00, 0x00}
  6. normalOffsetResponse = []byte{
  7. 0x00, 0x00, 0x00, 0x02,
  8. 0x00, 0x01, 'a',
  9. 0x00, 0x00, 0x00, 0x00,
  10. 0x00, 0x01, 'z',
  11. 0x00, 0x00, 0x00, 0x01,
  12. 0x00, 0x00, 0x00, 0x02,
  13. 0x00, 0x00,
  14. 0x00, 0x00, 0x00, 0x02,
  15. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
  16. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}
  17. normalOffsetResponseV1 = []byte{
  18. 0x00, 0x00, 0x00, 0x02,
  19. 0x00, 0x01, 'a',
  20. 0x00, 0x00, 0x00, 0x00,
  21. 0x00, 0x01, 'z',
  22. 0x00, 0x00, 0x00, 0x01,
  23. 0x00, 0x00, 0x00, 0x02,
  24. 0x00, 0x00,
  25. 0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86,
  26. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}
  27. )
  28. func TestEmptyOffsetResponse(t *testing.T) {
  29. response := OffsetResponse{}
  30. testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 0)
  31. if len(response.Blocks) != 0 {
  32. t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
  33. }
  34. response = OffsetResponse{}
  35. testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 1)
  36. if len(response.Blocks) != 0 {
  37. t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
  38. }
  39. }
  40. func TestNormalOffsetResponse(t *testing.T) {
  41. response := OffsetResponse{}
  42. testVersionDecodable(t, "normal", &response, normalOffsetResponse, 0)
  43. if len(response.Blocks) != 2 {
  44. t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.")
  45. }
  46. if len(response.Blocks["a"]) != 0 {
  47. t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.")
  48. }
  49. if len(response.Blocks["z"]) != 1 {
  50. t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
  51. }
  52. if response.Blocks["z"][2].Err != ErrNoError {
  53. t.Fatal("Decoding produced invalid error for topic z partition 2.")
  54. }
  55. if len(response.Blocks["z"][2].Offsets) != 2 {
  56. t.Fatal("Decoding produced invalid number of offsets for topic z partition 2.")
  57. }
  58. if response.Blocks["z"][2].Offsets[0] != 5 || response.Blocks["z"][2].Offsets[1] != 6 {
  59. t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
  60. }
  61. }
  62. func TestNormalOffsetResponseV1(t *testing.T) {
  63. response := OffsetResponse{}
  64. testVersionDecodable(t, "normal", &response, normalOffsetResponseV1, 1)
  65. if len(response.Blocks) != 2 {
  66. t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.")
  67. }
  68. if len(response.Blocks["a"]) != 0 {
  69. t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.")
  70. }
  71. if len(response.Blocks["z"]) != 1 {
  72. t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
  73. }
  74. if response.Blocks["z"][2].Err != ErrNoError {
  75. t.Fatal("Decoding produced invalid error for topic z partition 2.")
  76. }
  77. if response.Blocks["z"][2].Timestamp != 1477920049286 {
  78. t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp)
  79. }
  80. if response.Blocks["z"][2].Offset != 6 {
  81. t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
  82. }
  83. }