functional_consumer_group_test.go 10 KB


  1. // +build go1.9
  2. package sarama
  3. import (
  4. "context"
  5. "fmt"
  6. "log"
  7. "reflect"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. )
  13. func TestFuncConsumerGroupPartitioning(t *testing.T) {
  14. checkKafkaVersion(t, "0.10.2")
  15. setupFunctionalTest(t)
  16. defer teardownFunctionalTest(t)
  17. groupID := testFuncConsumerGroupID(t)
  18. // start M1
  19. m1 := runTestFuncConsumerGroupMember(t, groupID, "M1", 0, nil)
  20. defer m1.Stop()
  21. m1.WaitForState(2)
  22. m1.WaitForClaims(map[string]int{"test.4": 4})
  23. m1.WaitForHandlers(4)
  24. // start M2
  25. m2 := runTestFuncConsumerGroupMember(t, groupID, "M2", 0, nil, "test.1", "test.4")
  26. defer m2.Stop()
  27. m2.WaitForState(2)
  28. // assert that claims are shared among both members
  29. m1.WaitForClaims(map[string]int{"test.4": 2})
  30. m1.WaitForHandlers(2)
  31. m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 2})
  32. m2.WaitForHandlers(3)
  33. // shutdown M1, wait for M2 to take over
  34. m1.AssertCleanShutdown()
  35. m2.WaitForClaims(map[string]int{"test.1": 1, "test.4": 4})
  36. m2.WaitForHandlers(5)
  37. // shutdown M2
  38. m2.AssertCleanShutdown()
  39. }
  40. func TestFuncConsumerGroupExcessConsumers(t *testing.T) {
  41. checkKafkaVersion(t, "0.10.2")
  42. setupFunctionalTest(t)
  43. defer teardownFunctionalTest(t)
  44. groupID := testFuncConsumerGroupID(t)
  45. // start members
  46. m1 := runTestFuncConsumerGroupMember(t, groupID, "M1", 0, nil)
  47. defer m1.Stop()
  48. m2 := runTestFuncConsumerGroupMember(t, groupID, "M2", 0, nil)
  49. defer m2.Stop()
  50. m3 := runTestFuncConsumerGroupMember(t, groupID, "M3", 0, nil)
  51. defer m3.Stop()
  52. m4 := runTestFuncConsumerGroupMember(t, groupID, "M4", 0, nil)
  53. defer m4.Stop()
  54. m1.WaitForClaims(map[string]int{"test.4": 1})
  55. m2.WaitForClaims(map[string]int{"test.4": 1})
  56. m3.WaitForClaims(map[string]int{"test.4": 1})
  57. m4.WaitForClaims(map[string]int{"test.4": 1})
  58. // start M5
  59. m5 := runTestFuncConsumerGroupMember(t, groupID, "M5", 0, nil)
  60. defer m5.Stop()
  61. m5.WaitForState(1)
  62. m5.AssertNoErrs()
  63. // assert that claims are shared among both members
  64. m4.AssertCleanShutdown()
  65. m5.WaitForState(2)
  66. m5.WaitForClaims(map[string]int{"test.4": 1})
  67. // shutdown everything
  68. m1.AssertCleanShutdown()
  69. m2.AssertCleanShutdown()
  70. m3.AssertCleanShutdown()
  71. m5.AssertCleanShutdown()
  72. }
  73. func TestFuncConsumerGroupFuzzy(t *testing.T) {
  74. checkKafkaVersion(t, "0.10.2")
  75. setupFunctionalTest(t)
  76. defer teardownFunctionalTest(t)
  77. if err := testFuncConsumerGroupFuzzySeed("test.4"); err != nil {
  78. t.Fatal(err)
  79. }
  80. groupID := testFuncConsumerGroupID(t)
  81. sink := &testFuncConsumerGroupSink{msgs: make(chan testFuncConsumerGroupMessage, 20000)}
  82. waitForMessages := func(t *testing.T, n int) {
  83. t.Helper()
  84. for i := 0; i < 600; i++ {
  85. if sink.Len() >= n {
  86. break
  87. }
  88. time.Sleep(100 * time.Millisecond)
  89. }
  90. if sz := sink.Len(); sz < n {
  91. log.Fatalf("expected to consume %d messages, but consumed %d", n, sz)
  92. }
  93. }
  94. defer runTestFuncConsumerGroupMember(t, groupID, "M1", 1500, sink).Stop()
  95. defer runTestFuncConsumerGroupMember(t, groupID, "M2", 3000, sink).Stop()
  96. defer runTestFuncConsumerGroupMember(t, groupID, "M3", 1500, sink).Stop()
  97. defer runTestFuncConsumerGroupMember(t, groupID, "M4", 200, sink).Stop()
  98. defer runTestFuncConsumerGroupMember(t, groupID, "M5", 100, sink).Stop()
  99. waitForMessages(t, 3000)
  100. defer runTestFuncConsumerGroupMember(t, groupID, "M6", 300, sink).Stop()
  101. defer runTestFuncConsumerGroupMember(t, groupID, "M7", 400, sink).Stop()
  102. defer runTestFuncConsumerGroupMember(t, groupID, "M8", 500, sink).Stop()
  103. defer runTestFuncConsumerGroupMember(t, groupID, "M9", 2000, sink).Stop()
  104. waitForMessages(t, 8000)
  105. defer runTestFuncConsumerGroupMember(t, groupID, "M10", 1000, sink).Stop()
  106. waitForMessages(t, 10000)
  107. defer runTestFuncConsumerGroupMember(t, groupID, "M11", 1000, sink).Stop()
  108. defer runTestFuncConsumerGroupMember(t, groupID, "M12", 2500, sink).Stop()
  109. waitForMessages(t, 12000)
  110. defer runTestFuncConsumerGroupMember(t, groupID, "M13", 1000, sink).Stop()
  111. waitForMessages(t, 15000)
  112. if umap := sink.Close(); len(umap) != 15000 {
  113. dupes := make(map[string][]string)
  114. for k, v := range umap {
  115. if len(v) > 1 {
  116. dupes[k] = v
  117. }
  118. }
  119. t.Fatalf("expected %d unique messages to be consumed but got %d, including %d duplicates:\n%v", 15000, len(umap), len(dupes), dupes)
  120. }
  121. }
  122. // --------------------------------------------------------------------
  // testFuncConsumerGroupID builds a group ID of the form
  // "sarama.<test name><unix nanoseconds>", so that each call made at a
  // different time yields a different ID.
  func testFuncConsumerGroupID(t *testing.T) string {
  	testName := t.Name()
  	nanos := time.Now().UnixNano()
  	return fmt.Sprintf("sarama.%s%d", testName, nanos)
  }
  126. func testFuncConsumerGroupFuzzySeed(topic string) error {
  127. client, err := NewClient(kafkaBrokers, nil)
  128. if err != nil {
  129. return err
  130. }
  131. defer func() { _ = client.Close() }()
  132. total := int64(0)
  133. for pn := int32(0); pn < 4; pn++ {
  134. newest, err := client.GetOffset(topic, pn, OffsetNewest)
  135. if err != nil {
  136. return err
  137. }
  138. oldest, err := client.GetOffset(topic, pn, OffsetOldest)
  139. if err != nil {
  140. return err
  141. }
  142. total = total + newest - oldest
  143. }
  144. if total >= 21000 {
  145. return nil
  146. }
  147. producer, err := NewAsyncProducerFromClient(client)
  148. if err != nil {
  149. return err
  150. }
  151. for i := total; i < 21000; i++ {
  152. producer.Input() <- &ProducerMessage{Topic: topic, Value: ByteEncoder([]byte("testdata"))}
  153. }
  154. return producer.Close()
  155. }
  156. type testFuncConsumerGroupMessage struct {
  157. ClientID string
  158. *ConsumerMessage
  159. }
  160. type testFuncConsumerGroupSink struct {
  161. msgs chan testFuncConsumerGroupMessage
  162. count int32
  163. }
  164. func (s *testFuncConsumerGroupSink) Len() int {
  165. if s == nil {
  166. return -1
  167. }
  168. return int(atomic.LoadInt32(&s.count))
  169. }
  170. func (s *testFuncConsumerGroupSink) Push(clientID string, m *ConsumerMessage) {
  171. if s != nil {
  172. s.msgs <- testFuncConsumerGroupMessage{ClientID: clientID, ConsumerMessage: m}
  173. atomic.AddInt32(&s.count, 1)
  174. }
  175. }
  176. func (s *testFuncConsumerGroupSink) Close() map[string][]string {
  177. close(s.msgs)
  178. res := make(map[string][]string)
  179. for msg := range s.msgs {
  180. key := fmt.Sprintf("%s-%d:%d", msg.Topic, msg.Partition, msg.Offset)
  181. res[key] = append(res[key], msg.ClientID)
  182. }
  183. return res
  184. }
  185. type testFuncConsumerGroupMember struct {
  186. ConsumerGroup
  187. clientID string
  188. claims map[string]int
  189. state int32
  190. handlers int32
  191. errs []error
  192. maxMessages int32
  193. isCapped bool
  194. sink *testFuncConsumerGroupSink
  195. t *testing.T
  196. mu sync.RWMutex
  197. }
  198. func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxMessages int32, sink *testFuncConsumerGroupSink, topics ...string) *testFuncConsumerGroupMember {
  199. t.Helper()
  200. config := NewConfig()
  201. config.ClientID = clientID
  202. config.Version = V0_10_2_0
  203. config.Consumer.Return.Errors = true
  204. config.Consumer.Offsets.Initial = OffsetOldest
  205. config.Consumer.Group.Rebalance.Timeout = 10 * time.Second
  206. group, err := NewConsumerGroup(kafkaBrokers, groupID, config)
  207. if err != nil {
  208. t.Fatal(err)
  209. return nil
  210. }
  211. if len(topics) == 0 {
  212. topics = []string{"test.4"}
  213. }
  214. member := &testFuncConsumerGroupMember{
  215. ConsumerGroup: group,
  216. clientID: clientID,
  217. claims: make(map[string]int),
  218. maxMessages: maxMessages,
  219. isCapped: maxMessages != 0,
  220. sink: sink,
  221. t: t,
  222. }
  223. go member.loop(topics)
  224. return member
  225. }
  226. func (m *testFuncConsumerGroupMember) AssertCleanShutdown() {
  227. m.t.Helper()
  228. if err := m.Close(); err != nil {
  229. m.t.Fatalf("unexpected error on Close(): %v", err)
  230. }
  231. m.WaitForState(4)
  232. m.WaitForHandlers(0)
  233. m.AssertNoErrs()
  234. }
  235. func (m *testFuncConsumerGroupMember) AssertNoErrs() {
  236. m.t.Helper()
  237. var errs []error
  238. m.mu.RLock()
  239. errs = append(errs, m.errs...)
  240. m.mu.RUnlock()
  241. if len(errs) != 0 {
  242. m.t.Fatalf("unexpected consumer errors: %v", errs)
  243. }
  244. }
  245. func (m *testFuncConsumerGroupMember) WaitForState(expected int32) {
  246. m.t.Helper()
  247. m.waitFor("state", expected, func() (interface{}, error) {
  248. return atomic.LoadInt32(&m.state), nil
  249. })
  250. }
  251. func (m *testFuncConsumerGroupMember) WaitForHandlers(expected int) {
  252. m.t.Helper()
  253. m.waitFor("handlers", expected, func() (interface{}, error) {
  254. return int(atomic.LoadInt32(&m.handlers)), nil
  255. })
  256. }
  257. func (m *testFuncConsumerGroupMember) WaitForClaims(expected map[string]int) {
  258. m.t.Helper()
  259. m.waitFor("claims", expected, func() (interface{}, error) {
  260. m.mu.RLock()
  261. claims := m.claims
  262. m.mu.RUnlock()
  263. return claims, nil
  264. })
  265. }
  266. func (m *testFuncConsumerGroupMember) Stop() { _ = m.Close() }
  267. func (m *testFuncConsumerGroupMember) Setup(s ConsumerGroupSession) error {
  268. // store claims
  269. claims := make(map[string]int)
  270. for topic, partitions := range s.Claims() {
  271. claims[topic] = len(partitions)
  272. }
  273. m.mu.Lock()
  274. m.claims = claims
  275. m.mu.Unlock()
  276. // enter post-setup state
  277. atomic.StoreInt32(&m.state, 2)
  278. return nil
  279. }
  280. func (m *testFuncConsumerGroupMember) Cleanup(s ConsumerGroupSession) error {
  281. // enter post-cleanup state
  282. atomic.StoreInt32(&m.state, 3)
  283. return nil
  284. }
  285. func (m *testFuncConsumerGroupMember) ConsumeClaim(s ConsumerGroupSession, c ConsumerGroupClaim) error {
  286. atomic.AddInt32(&m.handlers, 1)
  287. defer atomic.AddInt32(&m.handlers, -1)
  288. for msg := range c.Messages() {
  289. if n := atomic.AddInt32(&m.maxMessages, -1); m.isCapped && n < 0 {
  290. break
  291. }
  292. s.MarkMessage(msg, "")
  293. m.sink.Push(m.clientID, msg)
  294. }
  295. return nil
  296. }
  297. func (m *testFuncConsumerGroupMember) waitFor(kind string, expected interface{}, factory func() (interface{}, error)) {
  298. m.t.Helper()
  299. deadline := time.NewTimer(60 * time.Second)
  300. defer deadline.Stop()
  301. ticker := time.NewTicker(100 * time.Millisecond)
  302. defer ticker.Stop()
  303. var actual interface{}
  304. for {
  305. var err error
  306. if actual, err = factory(); err != nil {
  307. m.t.Errorf("failed retrieve value, expected %s %#v but received error %v", kind, expected, err)
  308. }
  309. if reflect.DeepEqual(expected, actual) {
  310. return
  311. }
  312. select {
  313. case <-deadline.C:
  314. m.t.Fatalf("ttl exceeded, expected %s %#v but got %#v", kind, expected, actual)
  315. return
  316. case <-ticker.C:
  317. }
  318. }
  319. }
  320. func (m *testFuncConsumerGroupMember) loop(topics []string) {
  321. defer atomic.StoreInt32(&m.state, 4)
  322. go func() {
  323. for err := range m.Errors() {
  324. _ = m.Close()
  325. m.mu.Lock()
  326. m.errs = append(m.errs, err)
  327. m.mu.Unlock()
  328. }
  329. }()
  330. ctx := context.Background()
  331. for {
  332. // set state to pre-consume
  333. atomic.StoreInt32(&m.state, 1)
  334. if err := m.Consume(ctx, topics, m); err == ErrClosedConsumerGroup {
  335. return
  336. } else if err != nil {
  337. m.mu.Lock()
  338. m.errs = append(m.errs, err)
  339. m.mu.Unlock()
  340. return
  341. }
  342. // return if capped
  343. if n := atomic.LoadInt32(&m.maxMessages); m.isCapped && n < 0 {
  344. return
  345. }
  346. }
  347. }