offset_manager_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func initOffsetManager(t *testing.T) (om OffsetManager,
  7. testClient Client, broker, coordinator *MockBroker) {
  8. config := NewConfig()
  9. config.Metadata.Retry.Max = 1
  10. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  11. config.Version = V0_9_0_0
  12. broker = NewMockBroker(t, 1)
  13. coordinator = NewMockBroker(t, 2)
  14. seedMeta := new(MetadataResponse)
  15. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  16. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  17. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  18. broker.Returns(seedMeta)
  19. var err error
  20. testClient, err = NewClient([]string{broker.Addr()}, config)
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. broker.Returns(&ConsumerMetadataResponse{
  25. CoordinatorID: coordinator.BrokerID(),
  26. CoordinatorHost: "127.0.0.1",
  27. CoordinatorPort: coordinator.Port(),
  28. })
  29. om, err = NewOffsetManagerFromClient("group", testClient)
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. return om, testClient, broker, coordinator
  34. }
  35. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  36. coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  37. fetchResponse := new(OffsetFetchResponse)
  38. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  39. Err: ErrNoError,
  40. Offset: initialOffset,
  41. Metadata: metadata,
  42. })
  43. coordinator.Returns(fetchResponse)
  44. pom, err := om.ManagePartition("my_topic", 0)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. return pom
  49. }
  50. func TestNewOffsetManager(t *testing.T) {
  51. seedBroker := NewMockBroker(t, 1)
  52. seedBroker.Returns(new(MetadataResponse))
  53. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. _, err = NewOffsetManagerFromClient("group", testClient)
  58. if err != nil {
  59. t.Error(err)
  60. }
  61. safeClose(t, testClient)
  62. _, err = NewOffsetManagerFromClient("group", testClient)
  63. if err != ErrClosedClient {
  64. t.Errorf("Error expected for closed client; actual value: %v", err)
  65. }
  66. seedBroker.Close()
  67. }
  68. // Test recovery from ErrNotCoordinatorForConsumer
  69. // on first fetchInitialOffset call
  70. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  71. om, testClient, broker, coordinator := initOffsetManager(t)
  72. // Error on first fetchInitialOffset call
  73. responseBlock := OffsetFetchResponseBlock{
  74. Err: ErrNotCoordinatorForConsumer,
  75. Offset: 5,
  76. Metadata: "test_meta",
  77. }
  78. fetchResponse := new(OffsetFetchResponse)
  79. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  80. coordinator.Returns(fetchResponse)
  81. // Refresh coordinator
  82. newCoordinator := NewMockBroker(t, 3)
  83. broker.Returns(&ConsumerMetadataResponse{
  84. CoordinatorID: newCoordinator.BrokerID(),
  85. CoordinatorHost: "127.0.0.1",
  86. CoordinatorPort: newCoordinator.Port(),
  87. })
  88. // Second fetchInitialOffset call is fine
  89. fetchResponse2 := new(OffsetFetchResponse)
  90. responseBlock2 := responseBlock
  91. responseBlock2.Err = ErrNoError
  92. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  93. newCoordinator.Returns(fetchResponse2)
  94. pom, err := om.ManagePartition("my_topic", 0)
  95. if err != nil {
  96. t.Error(err)
  97. }
  98. broker.Close()
  99. coordinator.Close()
  100. newCoordinator.Close()
  101. safeClose(t, pom)
  102. safeClose(t, om)
  103. safeClose(t, testClient)
  104. }
  105. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  106. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  107. om, testClient, broker, coordinator := initOffsetManager(t)
  108. // Error on first fetchInitialOffset call
  109. responseBlock := OffsetFetchResponseBlock{
  110. Err: ErrOffsetsLoadInProgress,
  111. Offset: 5,
  112. Metadata: "test_meta",
  113. }
  114. fetchResponse := new(OffsetFetchResponse)
  115. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  116. coordinator.Returns(fetchResponse)
  117. // Second fetchInitialOffset call is fine
  118. fetchResponse2 := new(OffsetFetchResponse)
  119. responseBlock2 := responseBlock
  120. responseBlock2.Err = ErrNoError
  121. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  122. coordinator.Returns(fetchResponse2)
  123. pom, err := om.ManagePartition("my_topic", 0)
  124. if err != nil {
  125. t.Error(err)
  126. }
  127. broker.Close()
  128. coordinator.Close()
  129. safeClose(t, pom)
  130. safeClose(t, om)
  131. safeClose(t, testClient)
  132. }
  133. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  134. om, testClient, broker, coordinator := initOffsetManager(t)
  135. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  136. // Kafka returns -1 if no offset has been stored for this partition yet.
  137. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  138. offset, meta := pom.NextOffset()
  139. if offset != OffsetOldest {
  140. t.Errorf("Expected offset 5. Actual: %v", offset)
  141. }
  142. if meta != "" {
  143. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  144. }
  145. safeClose(t, pom)
  146. safeClose(t, om)
  147. broker.Close()
  148. coordinator.Close()
  149. safeClose(t, testClient)
  150. }
  151. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  152. om, testClient, broker, coordinator := initOffsetManager(t)
  153. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  154. offset, meta := pom.NextOffset()
  155. if offset != 5 {
  156. t.Errorf("Expected offset 5. Actual: %v", offset)
  157. }
  158. if meta != "test_meta" {
  159. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  160. }
  161. safeClose(t, pom)
  162. safeClose(t, om)
  163. broker.Close()
  164. coordinator.Close()
  165. safeClose(t, testClient)
  166. }
  167. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  168. om, testClient, broker, coordinator := initOffsetManager(t)
  169. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  170. ocResponse := new(OffsetCommitResponse)
  171. ocResponse.AddError("my_topic", 0, ErrNoError)
  172. coordinator.Returns(ocResponse)
  173. pom.MarkOffset(100, "modified_meta")
  174. offset, meta := pom.NextOffset()
  175. if offset != 100 {
  176. t.Errorf("Expected offset 100. Actual: %v", offset)
  177. }
  178. if meta != "modified_meta" {
  179. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  180. }
  181. safeClose(t, pom)
  182. safeClose(t, om)
  183. safeClose(t, testClient)
  184. broker.Close()
  185. coordinator.Close()
  186. }
  187. func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
  188. om, testClient, broker, coordinator := initOffsetManager(t)
  189. testClient.Config().Consumer.Offsets.Retention = time.Hour
  190. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  191. ocResponse := new(OffsetCommitResponse)
  192. ocResponse.AddError("my_topic", 0, ErrNoError)
  193. handler := func(req *request) (res encoder) {
  194. if req.body.version() != 2 {
  195. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  196. }
  197. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  198. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  199. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  200. }
  201. return ocResponse
  202. }
  203. coordinator.setHandler(handler)
  204. pom.MarkOffset(100, "modified_meta")
  205. offset, meta := pom.NextOffset()
  206. if offset != 100 {
  207. t.Errorf("Expected offset 100. Actual: %v", offset)
  208. }
  209. if meta != "modified_meta" {
  210. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  211. }
  212. safeClose(t, pom)
  213. safeClose(t, om)
  214. safeClose(t, testClient)
  215. broker.Close()
  216. coordinator.Close()
  217. }
  218. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  219. om, testClient, broker, coordinator := initOffsetManager(t)
  220. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  221. // Error on one partition
  222. ocResponse := new(OffsetCommitResponse)
  223. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  224. ocResponse.AddError("my_topic", 1, ErrNoError)
  225. coordinator.Returns(ocResponse)
  226. newCoordinator := NewMockBroker(t, 3)
  227. // For RefreshCoordinator()
  228. broker.Returns(&ConsumerMetadataResponse{
  229. CoordinatorID: newCoordinator.BrokerID(),
  230. CoordinatorHost: "127.0.0.1",
  231. CoordinatorPort: newCoordinator.Port(),
  232. })
  233. // Nothing in response.Errors at all
  234. ocResponse2 := new(OffsetCommitResponse)
  235. newCoordinator.Returns(ocResponse2)
  236. // For RefreshCoordinator()
  237. broker.Returns(&ConsumerMetadataResponse{
  238. CoordinatorID: newCoordinator.BrokerID(),
  239. CoordinatorHost: "127.0.0.1",
  240. CoordinatorPort: newCoordinator.Port(),
  241. })
  242. // Error on the wrong partition for this pom
  243. ocResponse3 := new(OffsetCommitResponse)
  244. ocResponse3.AddError("my_topic", 1, ErrNoError)
  245. newCoordinator.Returns(ocResponse3)
  246. // For RefreshCoordinator()
  247. broker.Returns(&ConsumerMetadataResponse{
  248. CoordinatorID: newCoordinator.BrokerID(),
  249. CoordinatorHost: "127.0.0.1",
  250. CoordinatorPort: newCoordinator.Port(),
  251. })
  252. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  253. ocResponse4 := new(OffsetCommitResponse)
  254. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  255. newCoordinator.Returns(ocResponse4)
  256. // For RefreshCoordinator()
  257. broker.Returns(&ConsumerMetadataResponse{
  258. CoordinatorID: newCoordinator.BrokerID(),
  259. CoordinatorHost: "127.0.0.1",
  260. CoordinatorPort: newCoordinator.Port(),
  261. })
  262. // Normal error response
  263. ocResponse5 := new(OffsetCommitResponse)
  264. ocResponse5.AddError("my_topic", 0, ErrNoError)
  265. newCoordinator.Returns(ocResponse5)
  266. pom.MarkOffset(100, "modified_meta")
  267. err := pom.Close()
  268. if err != nil {
  269. t.Error(err)
  270. }
  271. broker.Close()
  272. coordinator.Close()
  273. newCoordinator.Close()
  274. safeClose(t, om)
  275. safeClose(t, testClient)
  276. }
  277. // Test of recovery from abort
  278. func TestAbortPartitionOffsetManager(t *testing.T) {
  279. om, testClient, broker, coordinator := initOffsetManager(t)
  280. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  281. // this triggers an error in the CommitOffset request,
  282. // which leads to the abort call
  283. coordinator.Close()
  284. // Response to refresh coordinator request
  285. newCoordinator := NewMockBroker(t, 3)
  286. broker.Returns(&ConsumerMetadataResponse{
  287. CoordinatorID: newCoordinator.BrokerID(),
  288. CoordinatorHost: "127.0.0.1",
  289. CoordinatorPort: newCoordinator.Port(),
  290. })
  291. ocResponse := new(OffsetCommitResponse)
  292. ocResponse.AddError("my_topic", 0, ErrNoError)
  293. newCoordinator.Returns(ocResponse)
  294. pom.MarkOffset(100, "modified_meta")
  295. safeClose(t, pom)
  296. safeClose(t, om)
  297. broker.Close()
  298. safeClose(t, testClient)
  299. }