client_test.go 26 KB


  1. package sarama
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. )
  10. func safeClose(t testing.TB, c io.Closer) {
  11. err := c.Close()
  12. if err != nil {
  13. t.Error(err)
  14. }
  15. }
  16. func TestSimpleClient(t *testing.T) {
  17. seedBroker := NewMockBroker(t, 1)
  18. seedBroker.Returns(new(MetadataResponse))
  19. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. seedBroker.Close()
  24. safeClose(t, client)
  25. }
  26. func TestCachedPartitions(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. replicas := []int32{3, 1, 5}
  29. isr := []int32{5, 1}
  30. metadataResponse := new(MetadataResponse)
  31. metadataResponse.AddBroker("localhost:12345", 2)
  32. metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, []int32{}, ErrNoError)
  33. metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable)
  34. seedBroker.Returns(metadataResponse)
  35. config := NewConfig()
  36. config.Metadata.Retry.Max = 0
  37. c, err := NewClient([]string{seedBroker.Addr()}, config)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. client := c.(*client)
  42. // Verify they aren't cached the same
  43. allP := client.cachedPartitionsResults["my_topic"][allPartitions]
  44. writeP := client.cachedPartitionsResults["my_topic"][writablePartitions]
  45. if len(allP) == len(writeP) {
  46. t.Fatal("Invalid lengths!")
  47. }
  48. tmp := client.cachedPartitionsResults["my_topic"]
  49. // Verify we actually use the cache at all!
  50. tmp[allPartitions] = []int32{1, 2, 3, 4}
  51. client.cachedPartitionsResults["my_topic"] = tmp
  52. if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
  53. t.Fatal("Not using the cache!")
  54. }
  55. seedBroker.Close()
  56. safeClose(t, client)
  57. }
  58. func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
  59. seedBroker := NewMockBroker(t, 1)
  60. replicas := []int32{seedBroker.BrokerID()}
  61. metadataResponse := new(MetadataResponse)
  62. metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
  63. metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  64. metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  65. seedBroker.Returns(metadataResponse)
  66. config := NewConfig()
  67. config.Metadata.Retry.Max = 0
  68. client, err := NewClient([]string{seedBroker.Addr()}, config)
  69. if err != nil {
  70. t.Fatal(err)
  71. }
  72. metadataResponse = new(MetadataResponse)
  73. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  74. seedBroker.Returns(metadataResponse)
  75. partitions, err := client.Partitions("unknown")
  76. if err != ErrUnknownTopicOrPartition {
  77. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  78. }
  79. if partitions != nil {
  80. t.Errorf("Should return nil as partition list, found %v", partitions)
  81. }
  82. // Should still use the cache of a known topic
  83. partitions, err = client.Partitions("my_topic")
  84. if err != nil {
  85. t.Errorf("Expected no error, found %v", err)
  86. }
  87. metadataResponse = new(MetadataResponse)
  88. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  89. seedBroker.Returns(metadataResponse)
  90. // Should not use cache for unknown topic
  91. partitions, err = client.Partitions("unknown")
  92. if err != ErrUnknownTopicOrPartition {
  93. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  94. }
  95. if partitions != nil {
  96. t.Errorf("Should return nil as partition list, found %v", partitions)
  97. }
  98. seedBroker.Close()
  99. safeClose(t, client)
  100. }
  101. func TestClientSeedBrokers(t *testing.T) {
  102. seedBroker := NewMockBroker(t, 1)
  103. metadataResponse := new(MetadataResponse)
  104. metadataResponse.AddBroker("localhost:12345", 2)
  105. seedBroker.Returns(metadataResponse)
  106. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  107. if err != nil {
  108. t.Fatal(err)
  109. }
  110. seedBroker.Close()
  111. safeClose(t, client)
  112. }
  113. func TestClientMetadata(t *testing.T) {
  114. seedBroker := NewMockBroker(t, 1)
  115. leader := NewMockBroker(t, 5)
  116. replicas := []int32{3, 1, 5}
  117. isr := []int32{5, 1}
  118. metadataResponse := new(MetadataResponse)
  119. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  120. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
  121. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable)
  122. seedBroker.Returns(metadataResponse)
  123. config := NewConfig()
  124. config.Metadata.Retry.Max = 0
  125. client, err := NewClient([]string{seedBroker.Addr()}, config)
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. topics, err := client.Topics()
  130. if err != nil {
  131. t.Error(err)
  132. } else if len(topics) != 1 || topics[0] != "my_topic" {
  133. t.Error("Client returned incorrect topics:", topics)
  134. }
  135. parts, err := client.Partitions("my_topic")
  136. if err != nil {
  137. t.Error(err)
  138. } else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
  139. t.Error("Client returned incorrect partitions for my_topic:", parts)
  140. }
  141. parts, err = client.WritablePartitions("my_topic")
  142. if err != nil {
  143. t.Error(err)
  144. } else if len(parts) != 1 || parts[0] != 0 {
  145. t.Error("Client returned incorrect writable partitions for my_topic:", parts)
  146. }
  147. tst, err := client.Leader("my_topic", 0)
  148. if err != nil {
  149. t.Error(err)
  150. } else if tst.ID() != 5 {
  151. t.Error("Leader for my_topic had incorrect ID.")
  152. }
  153. replicas, err = client.Replicas("my_topic", 0)
  154. if err != nil {
  155. t.Error(err)
  156. } else if replicas[0] != 3 {
  157. t.Error("Incorrect (or sorted) replica")
  158. } else if replicas[1] != 1 {
  159. t.Error("Incorrect (or sorted) replica")
  160. } else if replicas[2] != 5 {
  161. t.Error("Incorrect (or sorted) replica")
  162. }
  163. isr, err = client.InSyncReplicas("my_topic", 0)
  164. if err != nil {
  165. t.Error(err)
  166. } else if len(isr) != 2 {
  167. t.Error("Client returned incorrect ISRs for partition:", isr)
  168. } else if isr[0] != 5 {
  169. t.Error("Incorrect (or sorted) ISR:", isr)
  170. } else if isr[1] != 1 {
  171. t.Error("Incorrect (or sorted) ISR:", isr)
  172. }
  173. leader.Close()
  174. seedBroker.Close()
  175. safeClose(t, client)
  176. }
  177. func TestClientMetadataWithOfflineReplicas(t *testing.T) {
  178. seedBroker := NewMockBroker(t, 1)
  179. leader := NewMockBroker(t, 5)
  180. replicas := []int32{1, 2, 3}
  181. isr := []int32{1, 2}
  182. offlineReplicas := []int32{3}
  183. metadataResponse := new(MetadataResponse)
  184. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  185. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, offlineReplicas, ErrNoError)
  186. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
  187. metadataResponse.Version = 5
  188. seedBroker.Returns(metadataResponse)
  189. config := NewConfig()
  190. config.Version = V1_0_0_0
  191. config.Metadata.Retry.Max = 0
  192. client, err := NewClient([]string{seedBroker.Addr()}, config)
  193. if err != nil {
  194. t.Fatal(err)
  195. }
  196. topics, err := client.Topics()
  197. if err != nil {
  198. t.Error(err)
  199. } else if len(topics) != 1 || topics[0] != "my_topic" {
  200. t.Error("Client returned incorrect topics:", topics)
  201. }
  202. parts, err := client.Partitions("my_topic")
  203. if err != nil {
  204. t.Error(err)
  205. } else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
  206. t.Error("Client returned incorrect partitions for my_topic:", parts)
  207. }
  208. parts, err = client.WritablePartitions("my_topic")
  209. if err != nil {
  210. t.Error(err)
  211. } else if len(parts) != 2 {
  212. t.Error("Client returned incorrect writable partitions for my_topic:", parts)
  213. }
  214. tst, err := client.Leader("my_topic", 0)
  215. if err != nil {
  216. t.Error(err)
  217. } else if tst.ID() != 5 {
  218. t.Error("Leader for my_topic had incorrect ID.")
  219. }
  220. replicas, err = client.Replicas("my_topic", 0)
  221. if err != nil {
  222. t.Error(err)
  223. } else if replicas[0] != 1 {
  224. t.Error("Incorrect (or sorted) replica")
  225. } else if replicas[1] != 2 {
  226. t.Error("Incorrect (or sorted) replica")
  227. } else if replicas[2] != 3 {
  228. t.Error("Incorrect (or sorted) replica")
  229. }
  230. isr, err = client.InSyncReplicas("my_topic", 0)
  231. if err != nil {
  232. t.Error(err)
  233. } else if len(isr) != 2 {
  234. t.Error("Client returned incorrect ISRs for partition:", isr)
  235. } else if isr[0] != 1 {
  236. t.Error("Incorrect (or sorted) ISR:", isr)
  237. } else if isr[1] != 2 {
  238. t.Error("Incorrect (or sorted) ISR:", isr)
  239. }
  240. offlineReplicas, err = client.OfflineReplicas("my_topic", 0)
  241. if err != nil {
  242. t.Error(err)
  243. } else if len(offlineReplicas) != 1 {
  244. t.Error("Client returned incorrect offline replicas for partition:", offlineReplicas)
  245. } else if offlineReplicas[0] != 3 {
  246. t.Error("Incorrect offline replica:", offlineReplicas)
  247. }
  248. leader.Close()
  249. seedBroker.Close()
  250. safeClose(t, client)
  251. }
  252. func TestClientGetOffset(t *testing.T) {
  253. seedBroker := NewMockBroker(t, 1)
  254. leader := NewMockBroker(t, 2)
  255. leaderAddr := leader.Addr()
  256. metadata := new(MetadataResponse)
  257. metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  258. metadata.AddBroker(leaderAddr, leader.BrokerID())
  259. seedBroker.Returns(metadata)
  260. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  261. if err != nil {
  262. t.Fatal(err)
  263. }
  264. offsetResponse := new(OffsetResponse)
  265. offsetResponse.AddTopicPartition("foo", 0, 123)
  266. leader.Returns(offsetResponse)
  267. offset, err := client.GetOffset("foo", 0, OffsetNewest)
  268. if err != nil {
  269. t.Error(err)
  270. }
  271. if offset != 123 {
  272. t.Error("Unexpected offset, got ", offset)
  273. }
  274. leader.Close()
  275. seedBroker.Returns(metadata)
  276. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  277. offsetResponse = new(OffsetResponse)
  278. offsetResponse.AddTopicPartition("foo", 0, 456)
  279. leader.Returns(offsetResponse)
  280. offset, err = client.GetOffset("foo", 0, OffsetNewest)
  281. if err != nil {
  282. t.Error(err)
  283. }
  284. if offset != 456 {
  285. t.Error("Unexpected offset, got ", offset)
  286. }
  287. seedBroker.Close()
  288. leader.Close()
  289. safeClose(t, client)
  290. }
  291. func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
  292. seedBroker := NewMockBroker(t, 1)
  293. metadataResponse1 := new(MetadataResponse)
  294. seedBroker.Returns(metadataResponse1)
  295. retryCount := int32(0)
  296. config := NewConfig()
  297. config.Metadata.Retry.Max = 1
  298. config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
  299. atomic.AddInt32(&retryCount, 1)
  300. return 0
  301. }
  302. client, err := NewClient([]string{seedBroker.Addr()}, config)
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. metadataUnknownTopic := new(MetadataResponse)
  307. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  308. seedBroker.Returns(metadataUnknownTopic)
  309. seedBroker.Returns(metadataUnknownTopic)
  310. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  311. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  312. }
  313. safeClose(t, client)
  314. seedBroker.Close()
  315. actualRetryCount := atomic.LoadInt32(&retryCount)
  316. if actualRetryCount != 1 {
  317. t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
  318. }
  319. }
  320. func TestClientReceivingUnknownTopic(t *testing.T) {
  321. seedBroker := NewMockBroker(t, 1)
  322. metadataResponse1 := new(MetadataResponse)
  323. seedBroker.Returns(metadataResponse1)
  324. config := NewConfig()
  325. config.Metadata.Retry.Max = 1
  326. config.Metadata.Retry.Backoff = 0
  327. client, err := NewClient([]string{seedBroker.Addr()}, config)
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. metadataUnknownTopic := new(MetadataResponse)
  332. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  333. seedBroker.Returns(metadataUnknownTopic)
  334. seedBroker.Returns(metadataUnknownTopic)
  335. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  336. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  337. }
  338. // If we are asking for the leader of a partition of the non-existing topic.
  339. // we will request metadata again.
  340. seedBroker.Returns(metadataUnknownTopic)
  341. seedBroker.Returns(metadataUnknownTopic)
  342. if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
  343. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  344. }
  345. safeClose(t, client)
  346. seedBroker.Close()
  347. }
  348. func TestClientReceivingPartialMetadata(t *testing.T) {
  349. seedBroker := NewMockBroker(t, 1)
  350. leader := NewMockBroker(t, 5)
  351. metadataResponse1 := new(MetadataResponse)
  352. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  353. seedBroker.Returns(metadataResponse1)
  354. config := NewConfig()
  355. config.Metadata.Retry.Max = 0
  356. client, err := NewClient([]string{seedBroker.Addr()}, config)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
  361. metadataPartial := new(MetadataResponse)
  362. metadataPartial.AddBroker(seedBroker.Addr(), 1)
  363. metadataPartial.AddBroker(leader.Addr(), 5)
  364. metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
  365. metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
  366. metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
  367. seedBroker.Returns(metadataPartial)
  368. if err := client.RefreshMetadata("new_topic"); err != nil {
  369. t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
  370. }
  371. // Even though the metadata was incomplete, we should be able to get the leader of a partition
  372. // for which we did get a useful response, without doing additional requests.
  373. partition0Leader, err := client.Leader("new_topic", 0)
  374. if err != nil {
  375. t.Error(err)
  376. } else if partition0Leader.Addr() != leader.Addr() {
  377. t.Error("Unexpected leader returned", partition0Leader.Addr())
  378. }
  379. // If we are asking for the leader of a partition that didn't have a leader before,
  380. // we will do another metadata request.
  381. seedBroker.Returns(metadataPartial)
  382. // Still no leader for the partition, so asking for it should return an error.
  383. _, err = client.Leader("new_topic", 1)
  384. if err != ErrLeaderNotAvailable {
  385. t.Error("Expected ErrLeaderNotAvailable, got", err)
  386. }
  387. safeClose(t, client)
  388. seedBroker.Close()
  389. leader.Close()
  390. }
  391. func TestClientRefreshBehaviour(t *testing.T) {
  392. seedBroker := NewMockBroker(t, 1)
  393. leader := NewMockBroker(t, 5)
  394. metadataResponse1 := new(MetadataResponse)
  395. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  396. seedBroker.Returns(metadataResponse1)
  397. metadataResponse2 := new(MetadataResponse)
  398. metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
  399. metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
  400. seedBroker.Returns(metadataResponse2)
  401. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  402. if err != nil {
  403. t.Fatal(err)
  404. }
  405. parts, err := client.Partitions("my_topic")
  406. if err != nil {
  407. t.Error(err)
  408. } else if len(parts) != 1 || parts[0] != 0xb {
  409. t.Error("Client returned incorrect partitions for my_topic:", parts)
  410. }
  411. tst, err := client.Leader("my_topic", 0xb)
  412. if err != nil {
  413. t.Error(err)
  414. } else if tst.ID() != 5 {
  415. t.Error("Leader for my_topic had incorrect ID.")
  416. }
  417. leader.Close()
  418. seedBroker.Close()
  419. safeClose(t, client)
  420. }
  421. func TestClientRefreshBrokers(t *testing.T) {
  422. initialSeed := NewMockBroker(t, 0)
  423. leader := NewMockBroker(t, 5)
  424. metadataResponse1 := new(MetadataResponse)
  425. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  426. metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
  427. initialSeed.Returns(metadataResponse1)
  428. c, err := NewClient([]string{initialSeed.Addr()}, nil)
  429. client := c.(*client)
  430. if err != nil {
  431. t.Fatal(err)
  432. }
  433. if len(client.Brokers()) != 2 {
  434. t.Error("Meta broker is not 2")
  435. }
  436. newSeedBrokers := []string{"localhost:12345"}
  437. _ = client.RefreshBrokers(newSeedBrokers)
  438. if client.seedBrokers[0].addr != newSeedBrokers[0] {
  439. t.Error("Seed broker not updated")
  440. }
  441. if len(client.Brokers()) != 0 {
  442. t.Error("Old brokers not closed")
  443. }
  444. }
  445. func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
  446. seedBroker := NewMockBroker(t, 1)
  447. leader := NewMockBroker(t, 5)
  448. metadataResponse1 := new(MetadataResponse)
  449. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  450. metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
  451. seedBroker.Returns(metadataResponse1)
  452. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  453. if err != nil {
  454. t.Fatal(err)
  455. }
  456. if len(client.Brokers()) != 2 {
  457. t.Error("Meta broker is not 2")
  458. }
  459. metadataResponse2 := new(MetadataResponse)
  460. metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
  461. seedBroker.Returns(metadataResponse2)
  462. if err := client.RefreshMetadata(); err != nil {
  463. t.Error(err)
  464. }
  465. if len(client.Brokers()) != 1 {
  466. t.Error("Meta broker is not 1")
  467. }
  468. }
  469. func TestClientResurrectDeadSeeds(t *testing.T) {
  470. initialSeed := NewMockBroker(t, 0)
  471. emptyMetadata := new(MetadataResponse)
  472. initialSeed.Returns(emptyMetadata)
  473. conf := NewConfig()
  474. conf.Metadata.Retry.Backoff = 0
  475. conf.Metadata.RefreshFrequency = 0
  476. c, err := NewClient([]string{initialSeed.Addr()}, conf)
  477. if err != nil {
  478. t.Fatal(err)
  479. }
  480. initialSeed.Close()
  481. client := c.(*client)
  482. seed1 := NewMockBroker(t, 1)
  483. seed2 := NewMockBroker(t, 2)
  484. seed3 := NewMockBroker(t, 3)
  485. addr1 := seed1.Addr()
  486. addr2 := seed2.Addr()
  487. addr3 := seed3.Addr()
  488. // Overwrite the seed brokers with a fixed ordering to make this test deterministic.
  489. safeClose(t, client.seedBrokers[0])
  490. client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
  491. client.deadSeeds = []*Broker{}
  492. wg := sync.WaitGroup{}
  493. wg.Add(1)
  494. go func() {
  495. if err := client.RefreshMetadata(); err != nil {
  496. t.Error(err)
  497. }
  498. wg.Done()
  499. }()
  500. seed1.Close()
  501. seed2.Close()
  502. seed1 = NewMockBrokerAddr(t, 1, addr1)
  503. seed2 = NewMockBrokerAddr(t, 2, addr2)
  504. seed3.Close()
  505. seed1.Close()
  506. seed2.Returns(emptyMetadata)
  507. wg.Wait()
  508. if len(client.seedBrokers) != 2 {
  509. t.Error("incorrect number of live seeds")
  510. }
  511. if len(client.deadSeeds) != 1 {
  512. t.Error("incorrect number of dead seeds")
  513. }
  514. safeClose(t, c)
  515. }
  516. func TestClientController(t *testing.T) {
  517. seedBroker := NewMockBroker(t, 1)
  518. defer seedBroker.Close()
  519. controllerBroker := NewMockBroker(t, 2)
  520. defer controllerBroker.Close()
  521. seedBroker.SetHandlerByMap(map[string]MockResponse{
  522. "MetadataRequest": NewMockMetadataResponse(t).
  523. SetController(controllerBroker.BrokerID()).
  524. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  525. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
  526. })
  527. cfg := NewConfig()
  528. // test kafka version greater than 0.10.0.0
  529. cfg.Version = V0_10_0_0
  530. client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
  531. if err != nil {
  532. t.Fatal(err)
  533. }
  534. defer safeClose(t, client1)
  535. broker, err := client1.Controller()
  536. if err != nil {
  537. t.Fatal(err)
  538. }
  539. if broker.Addr() != controllerBroker.Addr() {
  540. t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
  541. }
  542. // test kafka version earlier than 0.10.0.0
  543. cfg.Version = V0_9_0_1
  544. client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
  545. if err != nil {
  546. t.Fatal(err)
  547. }
  548. defer safeClose(t, client2)
  549. if _, err = client2.Controller(); err != ErrUnsupportedVersion {
  550. t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err)
  551. }
  552. }
  553. func TestClientMetadataTimeout(t *testing.T) {
  554. for _, timeout := range []time.Duration{
  555. 250 * time.Millisecond, // Will cut the first retry pass
  556. 500 * time.Millisecond, // Will cut the second retry pass
  557. 750 * time.Millisecond, // Will cut the third retry pass
  558. 900 * time.Millisecond, // Will stop after the three retries
  559. } {
  560. t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) {
  561. // Use a responsive broker to create a working client
  562. initialSeed := NewMockBroker(t, 0)
  563. emptyMetadata := new(MetadataResponse)
  564. initialSeed.Returns(emptyMetadata)
  565. conf := NewConfig()
  566. // Speed up the metadata request failure because of a read timeout
  567. conf.Net.ReadTimeout = 100 * time.Millisecond
  568. // Disable backoff and refresh
  569. conf.Metadata.Retry.Backoff = 0
  570. conf.Metadata.RefreshFrequency = 0
  571. // But configure a "global" timeout
  572. conf.Metadata.Timeout = timeout
  573. c, err := NewClient([]string{initialSeed.Addr()}, conf)
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. initialSeed.Close()
  578. client := c.(*client)
  579. // Start seed brokers that do not reply to anything and therefore a read
  580. // on the TCP connection will timeout to simulate unresponsive brokers
  581. seed1 := NewMockBroker(t, 1)
  582. defer seed1.Close()
  583. seed2 := NewMockBroker(t, 2)
  584. defer seed2.Close()
  585. // Overwrite the seed brokers with a fixed ordering to make this test deterministic
  586. safeClose(t, client.seedBrokers[0])
  587. client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())}
  588. client.deadSeeds = []*Broker{}
  589. // Start refreshing metadata in the background
  590. errChan := make(chan error)
  591. go func() {
  592. errChan <- c.RefreshMetadata()
  593. }()
  594. // Check that the refresh fails fast enough (less than twice the configured timeout)
  595. // instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms
  596. maxRefreshDuration := 2 * timeout
  597. select {
  598. case err := <-errChan:
  599. if err == nil {
  600. t.Fatal("Expected failed RefreshMetadata, got nil")
  601. }
  602. if err != ErrOutOfBrokers {
  603. t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err)
  604. }
  605. case <-time.After(maxRefreshDuration):
  606. t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration)
  607. }
  608. safeClose(t, c)
  609. })
  610. }
  611. }
  612. func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
  613. seedBroker := NewMockBroker(t, 1)
  614. staleCoordinator := NewMockBroker(t, 2)
  615. freshCoordinator := NewMockBroker(t, 3)
  616. replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
  617. metadataResponse1 := new(MetadataResponse)
  618. metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
  619. metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
  620. metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  621. seedBroker.Returns(metadataResponse1)
  622. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  623. if err != nil {
  624. t.Fatal(err)
  625. }
  626. coordinatorResponse1 := new(ConsumerMetadataResponse)
  627. coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
  628. seedBroker.Returns(coordinatorResponse1)
  629. coordinatorResponse2 := new(ConsumerMetadataResponse)
  630. coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
  631. coordinatorResponse2.CoordinatorHost = "127.0.0.1"
  632. coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()
  633. seedBroker.Returns(coordinatorResponse2)
  634. broker, err := client.Coordinator("my_group")
  635. if err != nil {
  636. t.Error(err)
  637. }
  638. if staleCoordinator.Addr() != broker.Addr() {
  639. t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
  640. }
  641. if staleCoordinator.BrokerID() != broker.ID() {
  642. t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
  643. }
  644. // Grab the cached value
  645. broker2, err := client.Coordinator("my_group")
  646. if err != nil {
  647. t.Error(err)
  648. }
  649. if broker2.Addr() != broker.Addr() {
  650. t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
  651. }
  652. coordinatorResponse3 := new(ConsumerMetadataResponse)
  653. coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
  654. coordinatorResponse3.CoordinatorHost = "127.0.0.1"
  655. coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
  656. seedBroker.Returns(coordinatorResponse3)
  657. // Refresh the locally cahced value because it's stale
  658. if err := client.RefreshCoordinator("my_group"); err != nil {
  659. t.Error(err)
  660. }
  661. // Grab the fresh value
  662. broker3, err := client.Coordinator("my_group")
  663. if err != nil {
  664. t.Error(err)
  665. }
  666. if broker3.Addr() != freshCoordinator.Addr() {
  667. t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
  668. }
  669. freshCoordinator.Close()
  670. staleCoordinator.Close()
  671. seedBroker.Close()
  672. safeClose(t, client)
  673. }
  674. func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
  675. seedBroker := NewMockBroker(t, 1)
  676. coordinator := NewMockBroker(t, 2)
  677. metadataResponse1 := new(MetadataResponse)
  678. seedBroker.Returns(metadataResponse1)
  679. config := NewConfig()
  680. config.Metadata.Retry.Max = 1
  681. config.Metadata.Retry.Backoff = 0
  682. client, err := NewClient([]string{seedBroker.Addr()}, config)
  683. if err != nil {
  684. t.Fatal(err)
  685. }
  686. coordinatorResponse1 := new(ConsumerMetadataResponse)
  687. coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
  688. seedBroker.Returns(coordinatorResponse1)
  689. metadataResponse2 := new(MetadataResponse)
  690. metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
  691. seedBroker.Returns(metadataResponse2)
  692. replicas := []int32{coordinator.BrokerID()}
  693. metadataResponse3 := new(MetadataResponse)
  694. metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  695. seedBroker.Returns(metadataResponse3)
  696. coordinatorResponse2 := new(ConsumerMetadataResponse)
  697. coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
  698. coordinatorResponse2.CoordinatorHost = "127.0.0.1"
  699. coordinatorResponse2.CoordinatorPort = coordinator.Port()
  700. seedBroker.Returns(coordinatorResponse2)
  701. broker, err := client.Coordinator("my_group")
  702. if err != nil {
  703. t.Error(err)
  704. }
  705. if coordinator.Addr() != broker.Addr() {
  706. t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
  707. }
  708. if coordinator.BrokerID() != broker.ID() {
  709. t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
  710. }
  711. coordinator.Close()
  712. seedBroker.Close()
  713. safeClose(t, client)
  714. }
  715. func TestClientAutorefreshShutdownRace(t *testing.T) {
  716. seedBroker := NewMockBroker(t, 1)
  717. metadataResponse := new(MetadataResponse)
  718. seedBroker.Returns(metadataResponse)
  719. conf := NewConfig()
  720. conf.Metadata.RefreshFrequency = 100 * time.Millisecond
  721. client, err := NewClient([]string{seedBroker.Addr()}, conf)
  722. if err != nil {
  723. t.Fatal(err)
  724. }
  725. // Wait for the background refresh to kick in
  726. time.Sleep(110 * time.Millisecond)
  727. errCh := make(chan error, 1)
  728. go func() {
  729. // Close the client
  730. errCh <- client.Close()
  731. close(errCh)
  732. }()
  733. // Wait for the Close to kick in
  734. time.Sleep(10 * time.Millisecond)
  735. // Then return some metadata to the still-running background thread
  736. leader := NewMockBroker(t, 2)
  737. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  738. metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, []int32{}, ErrNoError)
  739. seedBroker.Returns(metadataResponse)
  740. err = <-errCh
  741. if err != nil {
  742. t.Fatalf("goroutine client.Close():%s", err)
  743. }
  744. seedBroker.Close()
  745. // give the update time to happen so we get a panic if it's still running (which it shouldn't)
  746. time.Sleep(10 * time.Millisecond)
  747. }