package sarama

import (
	"io"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)
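
// safeClose closes c and reports any error via the test's Error method, so
// broker and client teardown failures surface without aborting the test.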
func safeClose(t testing.TB, c io.Closer) {
	err := c.Close()
	if err != nil {
		t.Error(err)
	}
}
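
// TestSimpleClient checks that a client can be created against a single mock
// broker returning an empty metadata response, and then shut down cleanly.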
func TestSimpleClient(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	seedBroker.Returns(new(MetadataResponse))
	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()
	safeClose(t, client)
}
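
// TestCachedPartitions verifies that the client caches the "all" and
// "writable" partition lists separately and serves subsequent lookups from
// that cache rather than from fresh metadata.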
func TestCachedPartitions(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	replicas := []int32{3, 1, 5}
	isr := []int32{5, 1}
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker("localhost:12345", 2)
	metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, []int32{}, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable)
	seedBroker.Returns(metadataResponse)
	config := NewConfig()
	config.Metadata.Retry.Max = 0
	c, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	client := c.(*client)
	// Verify they aren't cached the same
	allP := client.cachedPartitionsResults["my_topic"][allPartitions]
	writeP := client.cachedPartitionsResults["my_topic"][writablePartitions]
	if len(allP) == len(writeP) {
		t.Fatal("Invalid lengths!")
	}
	tmp := client.cachedPartitionsResults["my_topic"]
	// Verify we actually use the cache at all!
	tmp[allPartitions] = []int32{1, 2, 3, 4}
	client.cachedPartitionsResults["my_topic"] = tmp
	if len(client.cachedPartitions("my_topic", allPartitions)) != 4 {
		t.Fatal("Not using the cache!")
	}
	seedBroker.Close()
	safeClose(t, client)
}
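
// TestClientDoesntCachePartitionsForTopicsWithErrors confirms that partition
// lookups for a topic that returns an error are never answered from cache,
// while lookups for a healthy topic continue to be served from cache.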
func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	replicas := []int32{seedBroker.BrokerID()}
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	seedBroker.Returns(metadataResponse)
	config := NewConfig()
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataResponse)
	partitions, err := client.Partitions("unknown")
	if err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}
	if partitions != nil {
		t.Errorf("Should return nil as partition list, found %v", partitions)
	}
	// Should still use the cache of a known topic
	partitions, err = client.Partitions("my_topic")
	if err != nil {
		t.Errorf("Expected no error, found %v", err)
	}
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataResponse)
	// Should not use cache for unknown topic
	partitions, err = client.Partitions("unknown")
	if err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}
	if partitions != nil {
		t.Errorf("Should return nil as partition list, found %v", partitions)
	}
	seedBroker.Close()
	safeClose(t, client)
}
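
// TestClientSeedBrokers ensures the client starts up even when the seed
// broker's metadata only lists another broker that is not reachable.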
func TestClientSeedBrokers(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker("localhost:12345", 2)
	seedBroker.Returns(metadataResponse)
	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()
	safeClose(t, client)
}
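
// TestClientMetadata exercises the metadata accessors: Topics, Partitions,
// WritablePartitions, Leader, Replicas and InSyncReplicas, checking that
// replica and ISR lists come back in broker order, not sorted.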
func TestClientMetadata(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)
	replicas := []int32{3, 1, 5}
	isr := []int32{5, 1}
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable)
	seedBroker.Returns(metadataResponse)
	config := NewConfig()
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	topics, err := client.Topics()
	if err != nil {
		t.Error(err)
	} else if len(topics) != 1 || topics[0] != "my_topic" {
		t.Error("Client returned incorrect topics:", topics)
	}
	parts, err := client.Partitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
		t.Error("Client returned incorrect partitions for my_topic:", parts)
	}
	parts, err = client.WritablePartitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 1 || parts[0] != 0 {
		t.Error("Client returned incorrect writable partitions for my_topic:", parts)
	}
	tst, err := client.Leader("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if tst.ID() != 5 {
		t.Error("Leader for my_topic had incorrect ID.")
	}
	replicas, err = client.Replicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if replicas[0] != 3 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[1] != 1 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[2] != 5 {
		t.Error("Incorrect (or sorted) replica")
	}
	isr, err = client.InSyncReplicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if len(isr) != 2 {
		t.Error("Client returned incorrect ISRs for partition:", isr)
	} else if isr[0] != 5 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	} else if isr[1] != 1 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	}
	leader.Close()
	seedBroker.Close()
	safeClose(t, client)
}
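
// TestClientMetadataWithOfflineReplicas covers the v5 metadata path (Kafka
// 1.0.0+), where partitions may report offline replicas, and checks that
// OfflineReplicas returns them.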
func TestClientMetadataWithOfflineReplicas(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)
	replicas := []int32{1, 2, 3}
	isr := []int32{1, 2}
	offlineReplicas := []int32{3}
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, offlineReplicas, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
	metadataResponse.Version = 5
	seedBroker.Returns(metadataResponse)
	config := NewConfig()
	config.Version = V1_0_0_0
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	topics, err := client.Topics()
	if err != nil {
		t.Error(err)
	} else if len(topics) != 1 || topics[0] != "my_topic" {
		t.Error("Client returned incorrect topics:", topics)
	}
	parts, err := client.Partitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
		t.Error("Client returned incorrect partitions for my_topic:", parts)
	}
	parts, err = client.WritablePartitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 2 {
		t.Error("Client returned incorrect writable partitions for my_topic:", parts)
	}
	tst, err := client.Leader("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if tst.ID() != 5 {
		t.Error("Leader for my_topic had incorrect ID.")
	}
	replicas, err = client.Replicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if replicas[0] != 1 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[1] != 2 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[2] != 3 {
		t.Error("Incorrect (or sorted) replica")
	}
	isr, err = client.InSyncReplicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if len(isr) != 2 {
		t.Error("Client returned incorrect ISRs for partition:", isr)
	} else if isr[0] != 1 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	} else if isr[1] != 2 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	}
	offlineReplicas, err = client.OfflineReplicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if len(offlineReplicas) != 1 {
		t.Error("Client returned incorrect offline replicas for partition:", offlineReplicas)
	} else if offlineReplicas[0] != 3 {
		t.Error("Incorrect offline replica:", offlineReplicas)
	}
	leader.Close()
	seedBroker.Close()
	safeClose(t, client)
}
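
// TestClientGetOffset fetches the newest offset from the partition leader,
// then fetches again after the leader has been restarted at the same address.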
func TestClientGetOffset(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)
	leaderAddr := leader.Addr()
	metadata := new(MetadataResponse)
	metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
	metadata.AddBroker(leaderAddr, leader.BrokerID())
	seedBroker.Returns(metadata)
	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	offsetResponse := new(OffsetResponse)
	offsetResponse.AddTopicPartition("foo", 0, 123)
	leader.Returns(offsetResponse)
	offset, err := client.GetOffset("foo", 0, OffsetNewest)
	if err != nil {
		t.Error(err)
	}
	if offset != 123 {
		t.Error("Unexpected offset, got ", offset)
	}
	leader.Close()
	seedBroker.Returns(metadata)
	leader = NewMockBrokerAddr(t, 2, leaderAddr)
	offsetResponse = new(OffsetResponse)
	offsetResponse.AddTopicPartition("foo", 0, 456)
	leader.Returns(offsetResponse)
	offset, err = client.GetOffset("foo", 0, OffsetNewest)
	if err != nil {
		t.Error(err)
	}
	if offset != 456 {
		t.Error("Unexpected offset, got ", offset)
	}
	seedBroker.Close()
	leader.Close()
	safeClose(t, client)
}
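
// TestClientReceivingUnknownTopicWithBackoffFunc checks that a custom
// Metadata.Retry.BackoffFunc is invoked exactly once when metadata for an
// unknown topic is refreshed with a single retry allowed.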
func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	metadataResponse1 := new(MetadataResponse)
	seedBroker.Returns(metadataResponse1)
	retryCount := int32(0)
	config := NewConfig()
	config.Metadata.Retry.Max = 1
	config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		atomic.AddInt32(&retryCount, 1)
		return 0
	}
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	metadataUnknownTopic := new(MetadataResponse)
	metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataUnknownTopic)
	seedBroker.Returns(metadataUnknownTopic)
	if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
		t.Error("ErrUnknownTopicOrPartition expected, got", err)
	}
	safeClose(t, client)
	seedBroker.Close()
	actualRetryCount := atomic.LoadInt32(&retryCount)
	if actualRetryCount != 1 {
		t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
	}
}
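
// TestClientReceivingUnknownTopic verifies that refreshing metadata for a
// topic that does not exist returns ErrUnknownTopicOrPartition, and that a
// later Leader lookup triggers a fresh metadata request rather than reusing
// the failed result.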
func TestClientReceivingUnknownTopic(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	metadataResponse1 := new(MetadataResponse)
	seedBroker.Returns(metadataResponse1)
	config := NewConfig()
	config.Metadata.Retry.Max = 1
	config.Metadata.Retry.Backoff = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	metadataUnknownTopic := new(MetadataResponse)
	metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataUnknownTopic)
	seedBroker.Returns(metadataUnknownTopic)
	if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
		t.Error("ErrUnknownTopicOrPartition expected, got", err)
	}
	// If we ask for the leader of a partition of the non-existing topic,
	// we will request metadata again.
	seedBroker.Returns(metadataUnknownTopic)
	seedBroker.Returns(metadataUnknownTopic)
	if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, got", err)
	}
	safeClose(t, client)
	seedBroker.Close()
}
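
// TestClientReceivingPartialMetadata checks behaviour when metadata for a
// topic is only partially available: partitions with a known leader are
// usable immediately, while a leaderless partition triggers another metadata
// request and ultimately returns ErrLeaderNotAvailable.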
func TestClientReceivingPartialMetadata(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)
	metadataResponse1 := new(MetadataResponse)
	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
	seedBroker.Returns(metadataResponse1)
	config := NewConfig()
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
	metadataPartial := new(MetadataResponse)
	metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
	seedBroker.Returns(metadataPartial)
	if err := client.RefreshMetadata("new_topic"); err != nil {
		t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
	}
	// Even though the metadata was incomplete, we should be able to get the leader of a partition
	// for which we did get a useful response, without doing additional requests.
	partition0Leader, err := client.Leader("new_topic", 0)
	if err != nil {
		t.Error(err)
	} else if partition0Leader.Addr() != leader.Addr() {
		t.Error("Unexpected leader returned", partition0Leader.Addr())
	}
	// If we are asking for the leader of a partition that didn't have a leader before,
	// we will do another metadata request.
	seedBroker.Returns(metadataPartial)
	// Still no leader for the partition, so asking for it should return an error.
	_, err = client.Leader("new_topic", 1)
	if err != ErrLeaderNotAvailable {
		t.Error("Expected ErrLeaderNotAvailable, got", err)
	}
	safeClose(t, client)
	seedBroker.Close()
	leader.Close()
}
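
// TestClientRefreshBehaviour verifies that partition and leader lookups for a
// topic missing from the initial metadata trigger a refresh that picks up the
// topic from the next metadata response.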
func TestClientRefreshBehaviour(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)
	metadataResponse1 := new(MetadataResponse)
	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
	seedBroker.Returns(metadataResponse1)
	metadataResponse2 := new(MetadataResponse)
	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse2)
	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	parts, err := client.Partitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 1 || parts[0] != 0xb {
		t.Error("Client returned incorrect partitions for my_topic:", parts)
	}
	tst, err := client.Leader("my_topic", 0xb)
	if err != nil {
		t.Error(err)
	} else if tst.ID() != 5 {
		t.Error("Leader for my_topic had incorrect ID.")
	}
	leader.Close()
	seedBroker.Close()
	safeClose(t, client)
}
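
// TestClientResurrectDeadSeeds checks that when all seed brokers fail, the
// client resurrects the dead seeds and retries until one of them answers a
// metadata request, leaving the live and dead seed lists consistent.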
func TestClientResurrectDeadSeeds(t *testing.T) {
	initialSeed := NewMockBroker(t, 0)
	emptyMetadata := new(MetadataResponse)
	initialSeed.Returns(emptyMetadata)
	conf := NewConfig()
	conf.Metadata.Retry.Backoff = 0
	conf.Metadata.RefreshFrequency = 0
	c, err := NewClient([]string{initialSeed.Addr()}, conf)
	if err != nil {
		t.Fatal(err)
	}
	initialSeed.Close()
	client := c.(*client)
	seed1 := NewMockBroker(t, 1)
	seed2 := NewMockBroker(t, 2)
	seed3 := NewMockBroker(t, 3)
	addr1 := seed1.Addr()
	addr2 := seed2.Addr()
	addr3 := seed3.Addr()
	// Overwrite the seed brokers with a fixed ordering to make this test deterministic.
	safeClose(t, client.seedBrokers[0])
	client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
	client.deadSeeds = []*Broker{}
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		if err := client.RefreshMetadata(); err != nil {
			t.Error(err)
		}
		wg.Done()
	}()
	seed1.Close()
	seed2.Close()
	seed1 = NewMockBrokerAddr(t, 1, addr1)
	seed2 = NewMockBrokerAddr(t, 2, addr2)
	seed3.Close()
	seed1.Close()
	seed2.Returns(emptyMetadata)
	wg.Wait()
	if len(client.seedBrokers) != 2 {
		t.Error("incorrect number of live seeds")
	}
	if len(client.deadSeeds) != 1 {
		t.Error("incorrect number of dead seeds")
	}
	safeClose(t, c)
}
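
// TestClientController verifies that Controller returns the controller broker
// on Kafka versions that support the controller ID in metadata (0.10.0.0 and
// later) and ErrUnsupportedVersion on older versions.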
func TestClientController(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()
	controllerBroker := NewMockBroker(t, 2)
	defer controllerBroker.Close()
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(controllerBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
	})
	cfg := NewConfig()
	// test with a kafka version of 0.10.0.0 or later
	cfg.Version = V0_10_0_0
	client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, client1)
	broker, err := client1.Controller()
	if err != nil {
		t.Fatal(err)
	}
	if broker.Addr() != controllerBroker.Addr() {
		t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
	}
	// test with a kafka version earlier than 0.10.0.0
	cfg.Version = V0_9_0_1
	client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, client2)
	if _, err = client2.Controller(); err != ErrUnsupportedVersion {
		t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err)
	}
}
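
// TestClientCoordinatorWithConsumerOffsetsTopic covers coordinator lookup
// when the __consumer_offsets topic already exists: the first successful
// answer is cached, and RefreshCoordinator replaces a stale coordinator with
// the fresh one.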
func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	staleCoordinator := NewMockBroker(t, 2)
	freshCoordinator := NewMockBroker(t, 3)
	replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
	metadataResponse1 := new(MetadataResponse)
	metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
	metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
	metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	seedBroker.Returns(metadataResponse1)
	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	coordinatorResponse1 := new(ConsumerMetadataResponse)
	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
	seedBroker.Returns(coordinatorResponse1)
	coordinatorResponse2 := new(ConsumerMetadataResponse)
	coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
	coordinatorResponse2.CoordinatorHost = "127.0.0.1"
	coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()
	seedBroker.Returns(coordinatorResponse2)
	broker, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}
	if staleCoordinator.Addr() != broker.Addr() {
		t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
	}
	if staleCoordinator.BrokerID() != broker.ID() {
		t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
	}
	// Grab the cached value
	broker2, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}
	if broker2.Addr() != broker.Addr() {
		t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
	}
	coordinatorResponse3 := new(ConsumerMetadataResponse)
	coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
	coordinatorResponse3.CoordinatorHost = "127.0.0.1"
	coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
	seedBroker.Returns(coordinatorResponse3)
	// Refresh the locally cached value because it's stale
	if err := client.RefreshCoordinator("my_group"); err != nil {
		t.Error(err)
	}
	// Grab the fresh value
	broker3, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}
	if broker3.Addr() != freshCoordinator.Addr() {
		t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
	}
	freshCoordinator.Close()
	staleCoordinator.Close()
	seedBroker.Close()
	safeClose(t, client)
}
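
// TestClientCoordinatorWithoutConsumerOffsetsTopic covers coordinator lookup
// when the __consumer_offsets topic does not exist yet: after the initial
// coordinator-not-available error the client refreshes metadata and retries
// until the coordinator is returned.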
func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	coordinator := NewMockBroker(t, 2)
	metadataResponse1 := new(MetadataResponse)
	seedBroker.Returns(metadataResponse1)
	config := NewConfig()
	config.Metadata.Retry.Max = 1
	config.Metadata.Retry.Backoff = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	coordinatorResponse1 := new(ConsumerMetadataResponse)
	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
	seedBroker.Returns(coordinatorResponse1)
	metadataResponse2 := new(MetadataResponse)
	metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataResponse2)
	replicas := []int32{coordinator.BrokerID()}
	metadataResponse3 := new(MetadataResponse)
	metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	seedBroker.Returns(metadataResponse3)
	coordinatorResponse2 := new(ConsumerMetadataResponse)
	coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
	coordinatorResponse2.CoordinatorHost = "127.0.0.1"
	coordinatorResponse2.CoordinatorPort = coordinator.Port()
	seedBroker.Returns(coordinatorResponse2)
	broker, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}
	if coordinator.Addr() != broker.Addr() {
		t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
	}
	if coordinator.BrokerID() != broker.ID() {
		t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
	}
	coordinator.Close()
	seedBroker.Close()
	safeClose(t, client)
}
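
// TestClientAutorefreshShutdownRace closes the client while the background
// metadata refresh is in flight, to catch races between shutdown and the
// refresher goroutine.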
func TestClientAutorefreshShutdownRace(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	metadataResponse := new(MetadataResponse)
	seedBroker.Returns(metadataResponse)
	conf := NewConfig()
	conf.Metadata.RefreshFrequency = 100 * time.Millisecond
	client, err := NewClient([]string{seedBroker.Addr()}, conf)
	if err != nil {
		t.Fatal(err)
	}
	// Wait for the background refresh to kick in
	time.Sleep(110 * time.Millisecond)
	done := make(chan none)
	go func() {
		// Close the client; report failures with t.Error rather than t.Fatal,
		// since Fatal must not be called from a non-test goroutine.
		if err := client.Close(); err != nil {
			t.Error(err)
		}
		close(done)
	}()
	// Wait for the Close to kick in
	time.Sleep(10 * time.Millisecond)
	// Then return some metadata to the still-running background thread
	leader := NewMockBroker(t, 2)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, []int32{}, ErrNoError)
	seedBroker.Returns(metadataResponse)
	<-done
	seedBroker.Close()
	// give the update time to happen so we get a panic if it's still running (which it shouldn't)
	time.Sleep(10 * time.Millisecond)
}