client_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699
  1. package sarama
  2. import (
  3. "io"
  4. "sync"
  5. "sync/atomic"
  6. "testing"
  7. "time"
  8. )
  9. func safeClose(t testing.TB, c io.Closer) {
  10. err := c.Close()
  11. if err != nil {
  12. t.Error(err)
  13. }
  14. }
  15. func TestSimpleClient(t *testing.T) {
  16. seedBroker := NewMockBroker(t, 1)
  17. seedBroker.Returns(new(MetadataResponse))
  18. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  19. if err != nil {
  20. t.Fatal(err)
  21. }
  22. seedBroker.Close()
  23. safeClose(t, client)
  24. }
  25. func TestCachedPartitions(t *testing.T) {
  26. seedBroker := NewMockBroker(t, 1)
  27. replicas := []int32{3, 1, 5}
  28. isr := []int32{5, 1}
  29. metadataResponse := new(MetadataResponse)
  30. metadataResponse.AddBroker("localhost:12345", 2)
  31. metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, ErrNoError)
  32. metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, ErrLeaderNotAvailable)
  33. seedBroker.Returns(metadataResponse)
  34. config := NewConfig()
  35. config.Metadata.Retry.Max = 0
  36. c, err := NewClient([]string{seedBroker.Addr()}, config)
  37. if err != nil {
  38. t.Fatal(err)
  39. }
  40. client := c.(*client)
  41. // Verify they aren't cached the same
  42. allP := client.cachedPartitionsResults["my_topic"][allPartitions]
  43. writeP := client.cachedPartitionsResults["my_topic"][writablePartitions]
  44. if len(allP) == len(writeP) {
  45. t.Fatal("Invalid lengths!")
  46. }
  47. tmp := client.cachedPartitionsResults["my_topic"]
  48. // Verify we actually use the cache at all!
  49. tmp[allPartitions] = []int32{1, 2, 3, 4}
  50. client.cachedPartitionsResults["my_topic"] = tmp
  51. if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
  52. t.Fatal("Not using the cache!")
  53. }
  54. seedBroker.Close()
  55. safeClose(t, client)
  56. }
  57. func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
  58. seedBroker := NewMockBroker(t, 1)
  59. replicas := []int32{seedBroker.BrokerID()}
  60. metadataResponse := new(MetadataResponse)
  61. metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
  62. metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, ErrNoError)
  63. metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, ErrNoError)
  64. seedBroker.Returns(metadataResponse)
  65. config := NewConfig()
  66. config.Metadata.Retry.Max = 0
  67. client, err := NewClient([]string{seedBroker.Addr()}, config)
  68. if err != nil {
  69. t.Fatal(err)
  70. }
  71. metadataResponse = new(MetadataResponse)
  72. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  73. seedBroker.Returns(metadataResponse)
  74. partitions, err := client.Partitions("unknown")
  75. if err != ErrUnknownTopicOrPartition {
  76. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  77. }
  78. if partitions != nil {
  79. t.Errorf("Should return nil as partition list, found %v", partitions)
  80. }
  81. // Should still use the cache of a known topic
  82. partitions, err = client.Partitions("my_topic")
  83. if err != nil {
  84. t.Errorf("Expected no error, found %v", err)
  85. }
  86. metadataResponse = new(MetadataResponse)
  87. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  88. seedBroker.Returns(metadataResponse)
  89. // Should not use cache for unknown topic
  90. partitions, err = client.Partitions("unknown")
  91. if err != ErrUnknownTopicOrPartition {
  92. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  93. }
  94. if partitions != nil {
  95. t.Errorf("Should return nil as partition list, found %v", partitions)
  96. }
  97. seedBroker.Close()
  98. safeClose(t, client)
  99. }
  100. func TestClientSeedBrokers(t *testing.T) {
  101. seedBroker := NewMockBroker(t, 1)
  102. metadataResponse := new(MetadataResponse)
  103. metadataResponse.AddBroker("localhost:12345", 2)
  104. seedBroker.Returns(metadataResponse)
  105. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  106. if err != nil {
  107. t.Fatal(err)
  108. }
  109. seedBroker.Close()
  110. safeClose(t, client)
  111. }
  112. func TestClientMetadata(t *testing.T) {
  113. seedBroker := NewMockBroker(t, 1)
  114. leader := NewMockBroker(t, 5)
  115. replicas := []int32{3, 1, 5}
  116. isr := []int32{5, 1}
  117. metadataResponse := new(MetadataResponse)
  118. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  119. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
  120. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
  121. seedBroker.Returns(metadataResponse)
  122. config := NewConfig()
  123. config.Metadata.Retry.Max = 0
  124. client, err := NewClient([]string{seedBroker.Addr()}, config)
  125. if err != nil {
  126. t.Fatal(err)
  127. }
  128. topics, err := client.Topics()
  129. if err != nil {
  130. t.Error(err)
  131. } else if len(topics) != 1 || topics[0] != "my_topic" {
  132. t.Error("Client returned incorrect topics:", topics)
  133. }
  134. parts, err := client.Partitions("my_topic")
  135. if err != nil {
  136. t.Error(err)
  137. } else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
  138. t.Error("Client returned incorrect partitions for my_topic:", parts)
  139. }
  140. parts, err = client.WritablePartitions("my_topic")
  141. if err != nil {
  142. t.Error(err)
  143. } else if len(parts) != 1 || parts[0] != 0 {
  144. t.Error("Client returned incorrect writable partitions for my_topic:", parts)
  145. }
  146. tst, err := client.Leader("my_topic", 0)
  147. if err != nil {
  148. t.Error(err)
  149. } else if tst.ID() != 5 {
  150. t.Error("Leader for my_topic had incorrect ID.")
  151. }
  152. replicas, err = client.Replicas("my_topic", 0)
  153. if err != nil {
  154. t.Error(err)
  155. } else if replicas[0] != 3 {
  156. t.Error("Incorrect (or sorted) replica")
  157. } else if replicas[1] != 1 {
  158. t.Error("Incorrect (or sorted) replica")
  159. } else if replicas[2] != 5 {
  160. t.Error("Incorrect (or sorted) replica")
  161. }
  162. isr, err = client.InSyncReplicas("my_topic", 0)
  163. if err != nil {
  164. t.Error(err)
  165. } else if len(isr) != 2 {
  166. t.Error("Client returned incorrect ISRs for partition:", isr)
  167. } else if isr[0] != 5 {
  168. t.Error("Incorrect (or sorted) ISR:", isr)
  169. } else if isr[1] != 1 {
  170. t.Error("Incorrect (or sorted) ISR:", isr)
  171. }
  172. leader.Close()
  173. seedBroker.Close()
  174. safeClose(t, client)
  175. }
  176. func TestClientGetOffset(t *testing.T) {
  177. seedBroker := NewMockBroker(t, 1)
  178. leader := NewMockBroker(t, 2)
  179. leaderAddr := leader.Addr()
  180. metadata := new(MetadataResponse)
  181. metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, ErrNoError)
  182. metadata.AddBroker(leaderAddr, leader.BrokerID())
  183. seedBroker.Returns(metadata)
  184. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  185. if err != nil {
  186. t.Fatal(err)
  187. }
  188. offsetResponse := new(OffsetResponse)
  189. offsetResponse.AddTopicPartition("foo", 0, 123)
  190. leader.Returns(offsetResponse)
  191. offset, err := client.GetOffset("foo", 0, OffsetNewest)
  192. if err != nil {
  193. t.Error(err)
  194. }
  195. if offset != 123 {
  196. t.Error("Unexpected offset, got ", offset)
  197. }
  198. leader.Close()
  199. seedBroker.Returns(metadata)
  200. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  201. offsetResponse = new(OffsetResponse)
  202. offsetResponse.AddTopicPartition("foo", 0, 456)
  203. leader.Returns(offsetResponse)
  204. offset, err = client.GetOffset("foo", 0, OffsetNewest)
  205. if err != nil {
  206. t.Error(err)
  207. }
  208. if offset != 456 {
  209. t.Error("Unexpected offset, got ", offset)
  210. }
  211. seedBroker.Close()
  212. leader.Close()
  213. safeClose(t, client)
  214. }
  215. func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
  216. seedBroker := NewMockBroker(t, 1)
  217. metadataResponse1 := new(MetadataResponse)
  218. seedBroker.Returns(metadataResponse1)
  219. retryCount := int32(0)
  220. config := NewConfig()
  221. config.Metadata.Retry.Max = 1
  222. config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
  223. atomic.AddInt32(&retryCount, 1)
  224. return 0
  225. }
  226. client, err := NewClient([]string{seedBroker.Addr()}, config)
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. metadataUnknownTopic := new(MetadataResponse)
  231. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  232. seedBroker.Returns(metadataUnknownTopic)
  233. seedBroker.Returns(metadataUnknownTopic)
  234. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  235. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  236. }
  237. safeClose(t, client)
  238. seedBroker.Close()
  239. actualRetryCount := atomic.LoadInt32(&retryCount)
  240. if actualRetryCount != 1 {
  241. t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
  242. }
  243. }
  244. func TestClientReceivingUnknownTopic(t *testing.T) {
  245. seedBroker := NewMockBroker(t, 1)
  246. metadataResponse1 := new(MetadataResponse)
  247. seedBroker.Returns(metadataResponse1)
  248. config := NewConfig()
  249. config.Metadata.Retry.Max = 1
  250. config.Metadata.Retry.Backoff = 0
  251. client, err := NewClient([]string{seedBroker.Addr()}, config)
  252. if err != nil {
  253. t.Fatal(err)
  254. }
  255. metadataUnknownTopic := new(MetadataResponse)
  256. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  257. seedBroker.Returns(metadataUnknownTopic)
  258. seedBroker.Returns(metadataUnknownTopic)
  259. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  260. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  261. }
  262. // If we are asking for the leader of a partition of the non-existing topic.
  263. // we will request metadata again.
  264. seedBroker.Returns(metadataUnknownTopic)
  265. seedBroker.Returns(metadataUnknownTopic)
  266. if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
  267. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  268. }
  269. safeClose(t, client)
  270. seedBroker.Close()
  271. }
  272. func TestClientReceivingPartialMetadata(t *testing.T) {
  273. seedBroker := NewMockBroker(t, 1)
  274. leader := NewMockBroker(t, 5)
  275. metadataResponse1 := new(MetadataResponse)
  276. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  277. seedBroker.Returns(metadataResponse1)
  278. config := NewConfig()
  279. config.Metadata.Retry.Max = 0
  280. client, err := NewClient([]string{seedBroker.Addr()}, config)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
  285. metadataPartial := new(MetadataResponse)
  286. metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
  287. metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError)
  288. metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable)
  289. seedBroker.Returns(metadataPartial)
  290. if err := client.RefreshMetadata("new_topic"); err != nil {
  291. t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
  292. }
  293. // Even though the metadata was incomplete, we should be able to get the leader of a partition
  294. // for which we did get a useful response, without doing additional requests.
  295. partition0Leader, err := client.Leader("new_topic", 0)
  296. if err != nil {
  297. t.Error(err)
  298. } else if partition0Leader.Addr() != leader.Addr() {
  299. t.Error("Unexpected leader returned", partition0Leader.Addr())
  300. }
  301. // If we are asking for the leader of a partition that didn't have a leader before,
  302. // we will do another metadata request.
  303. seedBroker.Returns(metadataPartial)
  304. // Still no leader for the partition, so asking for it should return an error.
  305. _, err = client.Leader("new_topic", 1)
  306. if err != ErrLeaderNotAvailable {
  307. t.Error("Expected ErrLeaderNotAvailable, got", err)
  308. }
  309. safeClose(t, client)
  310. seedBroker.Close()
  311. leader.Close()
  312. }
  313. func TestClientRefreshBehaviour(t *testing.T) {
  314. seedBroker := NewMockBroker(t, 1)
  315. leader := NewMockBroker(t, 5)
  316. metadataResponse1 := new(MetadataResponse)
  317. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  318. seedBroker.Returns(metadataResponse1)
  319. metadataResponse2 := new(MetadataResponse)
  320. metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, ErrNoError)
  321. seedBroker.Returns(metadataResponse2)
  322. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  323. if err != nil {
  324. t.Fatal(err)
  325. }
  326. parts, err := client.Partitions("my_topic")
  327. if err != nil {
  328. t.Error(err)
  329. } else if len(parts) != 1 || parts[0] != 0xb {
  330. t.Error("Client returned incorrect partitions for my_topic:", parts)
  331. }
  332. tst, err := client.Leader("my_topic", 0xb)
  333. if err != nil {
  334. t.Error(err)
  335. } else if tst.ID() != 5 {
  336. t.Error("Leader for my_topic had incorrect ID.")
  337. }
  338. leader.Close()
  339. seedBroker.Close()
  340. safeClose(t, client)
  341. }
  342. func TestClientResurrectDeadSeeds(t *testing.T) {
  343. initialSeed := NewMockBroker(t, 0)
  344. emptyMetadata := new(MetadataResponse)
  345. initialSeed.Returns(emptyMetadata)
  346. conf := NewConfig()
  347. conf.Metadata.Retry.Backoff = 0
  348. conf.Metadata.RefreshFrequency = 0
  349. c, err := NewClient([]string{initialSeed.Addr()}, conf)
  350. if err != nil {
  351. t.Fatal(err)
  352. }
  353. initialSeed.Close()
  354. client := c.(*client)
  355. seed1 := NewMockBroker(t, 1)
  356. seed2 := NewMockBroker(t, 2)
  357. seed3 := NewMockBroker(t, 3)
  358. addr1 := seed1.Addr()
  359. addr2 := seed2.Addr()
  360. addr3 := seed3.Addr()
  361. // Overwrite the seed brokers with a fixed ordering to make this test deterministic.
  362. safeClose(t, client.seedBrokers[0])
  363. client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
  364. client.deadSeeds = []*Broker{}
  365. wg := sync.WaitGroup{}
  366. wg.Add(1)
  367. go func() {
  368. if err := client.RefreshMetadata(); err != nil {
  369. t.Error(err)
  370. }
  371. wg.Done()
  372. }()
  373. seed1.Close()
  374. seed2.Close()
  375. seed1 = NewMockBrokerAddr(t, 1, addr1)
  376. seed2 = NewMockBrokerAddr(t, 2, addr2)
  377. seed3.Close()
  378. seed1.Close()
  379. seed2.Returns(emptyMetadata)
  380. wg.Wait()
  381. if len(client.seedBrokers) != 2 {
  382. t.Error("incorrect number of live seeds")
  383. }
  384. if len(client.deadSeeds) != 1 {
  385. t.Error("incorrect number of dead seeds")
  386. }
  387. safeClose(t, c)
  388. }
  389. func TestClientController(t *testing.T) {
  390. seedBroker := NewMockBroker(t, 1)
  391. defer seedBroker.Close()
  392. controllerBroker := NewMockBroker(t, 2)
  393. defer controllerBroker.Close()
  394. seedBroker.SetHandlerByMap(map[string]MockResponse{
  395. "MetadataRequest": NewMockMetadataResponse(t).
  396. SetController(controllerBroker.BrokerID()).
  397. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  398. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
  399. })
  400. cfg := NewConfig()
  401. // test kafka version greater than 0.10.0.0
  402. cfg.Version = V0_10_0_0
  403. client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
  404. if err != nil {
  405. t.Fatal(err)
  406. }
  407. defer safeClose(t, client1)
  408. broker, err := client1.Controller()
  409. if err != nil {
  410. t.Fatal(err)
  411. }
  412. if broker.Addr() != controllerBroker.Addr() {
  413. t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
  414. }
  415. // test kafka version earlier than 0.10.0.0
  416. cfg.Version = V0_9_0_1
  417. client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. defer safeClose(t, client2)
  422. if _, err = client2.Controller(); err != ErrUnsupportedVersion {
  423. t.Errorf("Expected Contoller() to return %s, found %s", ErrUnsupportedVersion, err)
  424. }
  425. }
  426. func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
  427. seedBroker := NewMockBroker(t, 1)
  428. staleCoordinator := NewMockBroker(t, 2)
  429. freshCoordinator := NewMockBroker(t, 3)
  430. replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
  431. metadataResponse1 := new(MetadataResponse)
  432. metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
  433. metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
  434. metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
  435. seedBroker.Returns(metadataResponse1)
  436. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  437. if err != nil {
  438. t.Fatal(err)
  439. }
  440. coordinatorResponse1 := new(ConsumerMetadataResponse)
  441. coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
  442. seedBroker.Returns(coordinatorResponse1)
  443. coordinatorResponse2 := new(ConsumerMetadataResponse)
  444. coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
  445. coordinatorResponse2.CoordinatorHost = "127.0.0.1"
  446. coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()
  447. seedBroker.Returns(coordinatorResponse2)
  448. broker, err := client.Coordinator("my_group")
  449. if err != nil {
  450. t.Error(err)
  451. }
  452. if staleCoordinator.Addr() != broker.Addr() {
  453. t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
  454. }
  455. if staleCoordinator.BrokerID() != broker.ID() {
  456. t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
  457. }
  458. // Grab the cached value
  459. broker2, err := client.Coordinator("my_group")
  460. if err != nil {
  461. t.Error(err)
  462. }
  463. if broker2.Addr() != broker.Addr() {
  464. t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
  465. }
  466. coordinatorResponse3 := new(ConsumerMetadataResponse)
  467. coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
  468. coordinatorResponse3.CoordinatorHost = "127.0.0.1"
  469. coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
  470. seedBroker.Returns(coordinatorResponse3)
  471. // Refresh the locally cahced value because it's stale
  472. if err := client.RefreshCoordinator("my_group"); err != nil {
  473. t.Error(err)
  474. }
  475. // Grab the fresh value
  476. broker3, err := client.Coordinator("my_group")
  477. if err != nil {
  478. t.Error(err)
  479. }
  480. if broker3.Addr() != freshCoordinator.Addr() {
  481. t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
  482. }
  483. freshCoordinator.Close()
  484. staleCoordinator.Close()
  485. seedBroker.Close()
  486. safeClose(t, client)
  487. }
  488. func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
  489. seedBroker := NewMockBroker(t, 1)
  490. coordinator := NewMockBroker(t, 2)
  491. metadataResponse1 := new(MetadataResponse)
  492. seedBroker.Returns(metadataResponse1)
  493. config := NewConfig()
  494. config.Metadata.Retry.Max = 1
  495. config.Metadata.Retry.Backoff = 0
  496. client, err := NewClient([]string{seedBroker.Addr()}, config)
  497. if err != nil {
  498. t.Fatal(err)
  499. }
  500. coordinatorResponse1 := new(ConsumerMetadataResponse)
  501. coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
  502. seedBroker.Returns(coordinatorResponse1)
  503. metadataResponse2 := new(MetadataResponse)
  504. metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
  505. seedBroker.Returns(metadataResponse2)
  506. replicas := []int32{coordinator.BrokerID()}
  507. metadataResponse3 := new(MetadataResponse)
  508. metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
  509. seedBroker.Returns(metadataResponse3)
  510. coordinatorResponse2 := new(ConsumerMetadataResponse)
  511. coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
  512. coordinatorResponse2.CoordinatorHost = "127.0.0.1"
  513. coordinatorResponse2.CoordinatorPort = coordinator.Port()
  514. seedBroker.Returns(coordinatorResponse2)
  515. broker, err := client.Coordinator("my_group")
  516. if err != nil {
  517. t.Error(err)
  518. }
  519. if coordinator.Addr() != broker.Addr() {
  520. t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
  521. }
  522. if coordinator.BrokerID() != broker.ID() {
  523. t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
  524. }
  525. coordinator.Close()
  526. seedBroker.Close()
  527. safeClose(t, client)
  528. }
  529. func TestClientAutorefreshShutdownRace(t *testing.T) {
  530. seedBroker := NewMockBroker(t, 1)
  531. metadataResponse := new(MetadataResponse)
  532. seedBroker.Returns(metadataResponse)
  533. conf := NewConfig()
  534. conf.Metadata.RefreshFrequency = 100 * time.Millisecond
  535. client, err := NewClient([]string{seedBroker.Addr()}, conf)
  536. if err != nil {
  537. t.Fatal(err)
  538. }
  539. // Wait for the background refresh to kick in
  540. time.Sleep(110 * time.Millisecond)
  541. done := make(chan none)
  542. go func() {
  543. // Close the client
  544. if err := client.Close(); err != nil {
  545. t.Fatal(err)
  546. }
  547. close(done)
  548. }()
  549. // Wait for the Close to kick in
  550. time.Sleep(10 * time.Millisecond)
  551. // Then return some metadata to the still-running background thread
  552. leader := NewMockBroker(t, 2)
  553. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  554. metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, ErrNoError)
  555. seedBroker.Returns(metadataResponse)
  556. <-done
  557. seedBroker.Close()
  558. // give the update time to happen so we get a panic if it's still running (which it shouldn't)
  559. time.Sleep(10 * time.Millisecond)
  560. }