offset_manager_test.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package sarama
  2. import (
  3. "log"
  4. "testing"
  5. "time"
  6. )
  7. var (
  8. broker, coordinator *mockBroker
  9. testClient Client
  10. )
  11. func TestNewOffsetManager(t *testing.T) {
  12. seedBroker := newMockBroker(t, 1)
  13. seedBroker.Returns(new(MetadataResponse))
  14. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  15. if err != nil {
  16. t.Fatal(err)
  17. }
  18. _, err = NewOffsetManagerFromClient("grouop", testClient)
  19. if err != nil {
  20. t.Error(err)
  21. }
  22. testClient.Close()
  23. _, err = NewOffsetManagerFromClient("group", testClient)
  24. if err != ErrClosedClient {
  25. t.Errorf("Error expected for closed client; actual value: %v", err)
  26. }
  27. seedBroker.Close()
  28. }
  29. func TestFetchInitialFail(t *testing.T) {
  30. // Mostly copy-paste from initPOM
  31. // TODO: eliminate this repetition
  32. config := NewConfig()
  33. config.Metadata.Retry.Max = 0
  34. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  35. broker = newMockBroker(t, 1)
  36. coordinator = newMockBroker(t, 2)
  37. seedMeta := new(MetadataResponse)
  38. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  39. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  40. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  41. broker.Returns(seedMeta)
  42. var err error
  43. testClient, err = NewClient([]string{broker.Addr()}, config)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. broker.Returns(&ConsumerMetadataResponse{
  48. CoordinatorID: coordinator.BrokerID(),
  49. CoordinatorHost: "127.0.0.1",
  50. CoordinatorPort: coordinator.Port(),
  51. })
  52. om, err := NewOffsetManagerFromClient("group", testClient)
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. fetchResponse := new(OffsetFetchResponse)
  57. // Only Err below modified
  58. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  59. Err: ErrNotCoordinatorForConsumer,
  60. Offset: 5,
  61. Metadata: "test_meta",
  62. })
  63. coordinator.Returns(fetchResponse)
  64. _, err = om.ManagePartition("my_topic", 0)
  65. if err != ErrNotCoordinatorForConsumer {
  66. t.Error(err)
  67. }
  68. }
  69. func initPOM(t *testing.T) PartitionOffsetManager {
  70. config := NewConfig()
  71. config.Metadata.Retry.Max = 0
  72. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  73. broker = newMockBroker(t, 1)
  74. coordinator = newMockBroker(t, 2)
  75. seedMeta := new(MetadataResponse)
  76. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  77. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  78. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  79. broker.Returns(seedMeta)
  80. var err error
  81. testClient, err = NewClient([]string{broker.Addr()}, config)
  82. if err != nil {
  83. t.Fatal(err)
  84. }
  85. broker.Returns(&ConsumerMetadataResponse{
  86. CoordinatorID: coordinator.BrokerID(),
  87. CoordinatorHost: "127.0.0.1",
  88. CoordinatorPort: coordinator.Port(),
  89. })
  90. om, err := NewOffsetManagerFromClient("group", testClient)
  91. if err != nil {
  92. t.Fatal(err)
  93. }
  94. fetchResponse := new(OffsetFetchResponse)
  95. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  96. Err: ErrNoError,
  97. Offset: 5,
  98. Metadata: "test_meta",
  99. })
  100. coordinator.Returns(fetchResponse)
  101. pom, err := om.ManagePartition("my_topic", 0)
  102. if err != nil {
  103. t.Fatal(err)
  104. }
  105. return pom
  106. }
  107. func TestOffset(t *testing.T) {
  108. pom := initPOM(t)
  109. offset, meta := pom.Offset()
  110. if offset != 5 {
  111. t.Errorf("Expected offset 5. Actual: %v", offset)
  112. }
  113. if meta != "test_meta" {
  114. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  115. }
  116. pom.Close()
  117. testClient.Close()
  118. broker.Close()
  119. coordinator.Close()
  120. }
  121. func TestSetOffset(t *testing.T) {
  122. pom := initPOM(t)
  123. ocResponse := new(OffsetCommitResponse)
  124. ocResponse.AddError("my_topic", 0, ErrNoError)
  125. coordinator.Returns(ocResponse)
  126. pom.SetOffset(100, "modified_meta")
  127. offset, meta := pom.Offset()
  128. if offset != 100 {
  129. t.Errorf("Expected offset 100. Actual: %v", offset)
  130. }
  131. if meta != "modified_meta" {
  132. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  133. }
  134. pom.Close()
  135. testClient.Close()
  136. broker.Close()
  137. coordinator.Close()
  138. }
  139. // This test is not passing
  140. func TestCommitErr(t *testing.T) {
  141. log.Println("TestCommitErr")
  142. log.Println("=============")
  143. pom := initPOM(t)
  144. ocResponse := new(OffsetCommitResponse)
  145. // ocResponse.Errors
  146. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  147. ocResponse.AddError("my_topic", 1, ErrNoError)
  148. coordinator.Returns(ocResponse)
  149. // ocResponse2 := new(OffsetCommitResponse)
  150. // // ocResponse.Errors
  151. // ocResponse2.AddError("my_topic", 1, ErrNoError)
  152. // coordinator.Returns(ocResponse2)
  153. freshCoordinator := newMockBroker(t, 3)
  154. // For RefreshCoordinator()
  155. coordinatorResponse3 := new(ConsumerMetadataResponse)
  156. coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
  157. coordinatorResponse3.CoordinatorHost = "127.0.0.1"
  158. coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
  159. broker.Returns(coordinatorResponse3)
  160. ocResponse2 := new(OffsetCommitResponse)
  161. // ocResponse.Errors
  162. // ocResponse2.AddError("my_topic", 0, ErrOffsetOutOfRange)
  163. ocResponse2.AddError("my_topic", 0, ErrNoError)
  164. freshCoordinator.Returns(ocResponse2)
  165. pom.SetOffset(100, "modified_meta")
  166. err := pom.Close()
  167. if err != nil {
  168. t.Error(err)
  169. }
  170. }