offset_fetch_response_test.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. )
  6. var (
  7. emptyOffsetFetchResponse = []byte{
  8. 0x00, 0x00, 0x00, 0x00}
  9. emptyOffsetFetchResponseV2 = []byte{
  10. 0x00, 0x00, 0x00, 0x00,
  11. 0x00, 0x2A}
  12. emptyOffsetFetchResponseV3 = []byte{
  13. 0x00, 0x00, 0x00, 0x09,
  14. 0x00, 0x00, 0x00, 0x00,
  15. 0x00, 0x2A}
  16. )
  17. func TestEmptyOffsetFetchResponse(t *testing.T) {
  18. for version := 0; version <= 1; version++ {
  19. response := OffsetFetchResponse{Version: int16(version)}
  20. testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse)
  21. }
  22. responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
  23. testResponse(t, "empty V2", &responseV2, emptyOffsetFetchResponseV2)
  24. for version := 3; version <= 5; version++ {
  25. responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
  26. testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3)
  27. }
  28. }
  29. func TestNormalOffsetFetchResponse(t *testing.T) {
  30. // The response encoded form cannot be checked for it varies due to
  31. // unpredictable map traversal order.
  32. // Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls
  33. for version := 0; version <= 1; version++ {
  34. response := OffsetFetchResponse{Version: int16(version)}
  35. response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
  36. response.Blocks["m"] = nil
  37. testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil)
  38. }
  39. responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
  40. responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
  41. responseV2.Blocks["m"] = nil
  42. testResponse(t, "normal V2", &responseV2, nil)
  43. for version := 3; version <= 4; version++ {
  44. responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
  45. responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
  46. responseV3.Blocks["m"] = nil
  47. testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil)
  48. }
  49. responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9}
  50. responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut})
  51. responseV5.Blocks["m"] = nil
  52. testResponse(t, "normal V5", &responseV5, nil)
  53. }