offset_manager_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package sarama
  2. import (
  3. "log"
  4. "testing"
  5. "time"
  6. )
  7. var (
  8. broker, coordinator *mockBroker
  9. testClient Client
  10. config *Config
  11. )
  12. func initOM(t *testing.T) OffsetManager {
  13. config = NewConfig()
  14. config.Metadata.Retry.Max = 1
  15. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  16. broker = newMockBroker(t, 1)
  17. coordinator = newMockBroker(t, 2)
  18. seedMeta := new(MetadataResponse)
  19. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  20. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  21. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  22. broker.Returns(seedMeta)
  23. var err error
  24. testClient, err = NewClient([]string{broker.Addr()}, config)
  25. if err != nil {
  26. t.Fatal(err)
  27. }
  28. broker.Returns(&ConsumerMetadataResponse{
  29. CoordinatorID: coordinator.BrokerID(),
  30. CoordinatorHost: "127.0.0.1",
  31. CoordinatorPort: coordinator.Port(),
  32. })
  33. om, err := NewOffsetManagerFromClient("group", testClient)
  34. if err != nil {
  35. t.Fatal(err)
  36. }
  37. return om
  38. }
  39. func initPOM(t *testing.T) PartitionOffsetManager {
  40. om := initOM(t)
  41. fetchResponse := new(OffsetFetchResponse)
  42. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  43. Err: ErrNoError,
  44. Offset: 5,
  45. Metadata: "test_meta",
  46. })
  47. coordinator.Returns(fetchResponse)
  48. pom, err := om.ManagePartition("my_topic", 0)
  49. if err != nil {
  50. t.Fatal(err)
  51. }
  52. return pom
  53. }
  54. func TestNewOffsetManager(t *testing.T) {
  55. seedBroker := newMockBroker(t, 1)
  56. seedBroker.Returns(new(MetadataResponse))
  57. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. _, err = NewOffsetManagerFromClient("grouop", testClient)
  62. if err != nil {
  63. t.Error(err)
  64. }
  65. testClient.Close()
  66. _, err = NewOffsetManagerFromClient("group", testClient)
  67. if err != ErrClosedClient {
  68. t.Errorf("Error expected for closed client; actual value: %v", err)
  69. }
  70. seedBroker.Close()
  71. }
  72. // Test recovery from ErrNotCoordinatorForConsumer
  73. // on first fetchInitialOffset call
  74. func TestFetchInitialFail(t *testing.T) {
  75. om := initOM(t)
  76. // Error on first fetchInitialOffset call
  77. responseBlock := OffsetFetchResponseBlock{
  78. Err: ErrNotCoordinatorForConsumer,
  79. Offset: 5,
  80. Metadata: "test_meta",
  81. }
  82. fetchResponse := new(OffsetFetchResponse)
  83. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  84. coordinator.Returns(fetchResponse)
  85. // Refresh coordinator
  86. newCoordinator := newMockBroker(t, 3)
  87. refreshResponse := new(ConsumerMetadataResponse)
  88. refreshResponse.CoordinatorID = newCoordinator.BrokerID()
  89. refreshResponse.CoordinatorHost = "127.0.0.1"
  90. refreshResponse.CoordinatorPort = newCoordinator.Port()
  91. broker.Returns(refreshResponse)
  92. // Second fetchInitialOffset call is fine
  93. fetchResponse2 := new(OffsetFetchResponse)
  94. responseBlock2 := responseBlock
  95. responseBlock2.Err = ErrNoError
  96. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  97. newCoordinator.Returns(fetchResponse2)
  98. if _, err := om.ManagePartition("my_topic", 0); err != nil {
  99. t.Error(err)
  100. }
  101. }
  102. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  103. func TestFetchInitialInProgress(t *testing.T) {
  104. om := initOM(t)
  105. // Remove once fix is merged in
  106. config.Consumer.Offsets.CommitInterval = 10 * time.Second
  107. // Error on first fetchInitialOffset call
  108. responseBlock := OffsetFetchResponseBlock{
  109. Err: ErrOffsetsLoadInProgress,
  110. Offset: 5,
  111. Metadata: "test_meta",
  112. }
  113. fetchResponse := new(OffsetFetchResponse)
  114. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  115. coordinator.Returns(fetchResponse)
  116. // Second fetchInitialOffset call is fine
  117. fetchResponse2 := new(OffsetFetchResponse)
  118. responseBlock2 := responseBlock
  119. responseBlock2.Err = ErrNoError
  120. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  121. coordinator.Returns(fetchResponse2)
  122. if _, err := om.ManagePartition("my_topic", 0); err != nil {
  123. t.Error(err)
  124. }
  125. }
  126. func TestOffset(t *testing.T) {
  127. pom := initPOM(t)
  128. offset, meta := pom.Offset()
  129. if offset != 5 {
  130. t.Errorf("Expected offset 5. Actual: %v", offset)
  131. }
  132. if meta != "test_meta" {
  133. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  134. }
  135. pom.Close()
  136. testClient.Close()
  137. broker.Close()
  138. coordinator.Close()
  139. }
  140. func TestSetOffset(t *testing.T) {
  141. pom := initPOM(t)
  142. ocResponse := new(OffsetCommitResponse)
  143. ocResponse.AddError("my_topic", 0, ErrNoError)
  144. coordinator.Returns(ocResponse)
  145. pom.SetOffset(100, "modified_meta")
  146. offset, meta := pom.Offset()
  147. if offset != 100 {
  148. t.Errorf("Expected offset 100. Actual: %v", offset)
  149. }
  150. if meta != "modified_meta" {
  151. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  152. }
  153. pom.Close()
  154. testClient.Close()
  155. broker.Close()
  156. coordinator.Close()
  157. }
  158. func TestCommitErr(t *testing.T) {
  159. log.Println("TestCommitErr")
  160. log.Println("=============")
  161. pom := initPOM(t)
  162. ocResponse := new(OffsetCommitResponse)
  163. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  164. ocResponse.AddError("my_topic", 1, ErrNoError)
  165. coordinator.Returns(ocResponse)
  166. // ocResponse2 := new(OffsetCommitResponse)
  167. // // ocResponse.Errors
  168. // ocResponse2.AddError("my_topic", 1, ErrNoError)
  169. // coordinator.Returns(ocResponse2)
  170. freshCoordinator := newMockBroker(t, 3)
  171. // For RefreshCoordinator()
  172. coordinatorResponse3 := new(ConsumerMetadataResponse)
  173. coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
  174. coordinatorResponse3.CoordinatorHost = "127.0.0.1"
  175. coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
  176. broker.Returns(coordinatorResponse3)
  177. ocResponse2 := new(OffsetCommitResponse)
  178. // ocResponse.Errors
  179. // ocResponse2.AddError("my_topic", 0, ErrOffsetOutOfRange)
  180. ocResponse2.AddError("my_topic", 0, ErrNoError)
  181. freshCoordinator.Returns(ocResponse2)
  182. pom.SetOffset(100, "modified_meta")
  183. err := pom.Close()
  184. if err != nil {
  185. t.Error(err)
  186. }
  187. }