client_test.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858
  1. package sarama
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. )
  10. func safeClose(t testing.TB, c io.Closer) {
  11. err := c.Close()
  12. if err != nil {
  13. t.Error(err)
  14. }
  15. }
  16. func TestSimpleClient(t *testing.T) {
  17. seedBroker := NewMockBroker(t, 1)
  18. seedBroker.Returns(new(MetadataResponse))
  19. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. seedBroker.Close()
  24. safeClose(t, client)
  25. }
  26. func TestCachedPartitions(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. replicas := []int32{3, 1, 5}
  29. isr := []int32{5, 1}
  30. metadataResponse := new(MetadataResponse)
  31. metadataResponse.AddBroker("localhost:12345", 2)
  32. metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, []int32{}, ErrNoError)
  33. metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable)
  34. seedBroker.Returns(metadataResponse)
  35. config := NewConfig()
  36. config.Metadata.Retry.Max = 0
  37. c, err := NewClient([]string{seedBroker.Addr()}, config)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. client := c.(*client)
  42. // Verify they aren't cached the same
  43. allP := client.cachedPartitionsResults["my_topic"][allPartitions]
  44. writeP := client.cachedPartitionsResults["my_topic"][writablePartitions]
  45. if len(allP) == len(writeP) {
  46. t.Fatal("Invalid lengths!")
  47. }
  48. tmp := client.cachedPartitionsResults["my_topic"]
  49. // Verify we actually use the cache at all!
  50. tmp[allPartitions] = []int32{1, 2, 3, 4}
  51. client.cachedPartitionsResults["my_topic"] = tmp
  52. if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
  53. t.Fatal("Not using the cache!")
  54. }
  55. seedBroker.Close()
  56. safeClose(t, client)
  57. }
  58. func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
  59. seedBroker := NewMockBroker(t, 1)
  60. replicas := []int32{seedBroker.BrokerID()}
  61. metadataResponse := new(MetadataResponse)
  62. metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
  63. metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  64. metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  65. seedBroker.Returns(metadataResponse)
  66. config := NewConfig()
  67. config.Metadata.Retry.Max = 0
  68. client, err := NewClient([]string{seedBroker.Addr()}, config)
  69. if err != nil {
  70. t.Fatal(err)
  71. }
  72. metadataResponse = new(MetadataResponse)
  73. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  74. seedBroker.Returns(metadataResponse)
  75. partitions, err := client.Partitions("unknown")
  76. if err != ErrUnknownTopicOrPartition {
  77. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  78. }
  79. if partitions != nil {
  80. t.Errorf("Should return nil as partition list, found %v", partitions)
  81. }
  82. // Should still use the cache of a known topic
  83. partitions, err = client.Partitions("my_topic")
  84. if err != nil {
  85. t.Errorf("Expected no error, found %v", err)
  86. }
  87. metadataResponse = new(MetadataResponse)
  88. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  89. seedBroker.Returns(metadataResponse)
  90. // Should not use cache for unknown topic
  91. partitions, err = client.Partitions("unknown")
  92. if err != ErrUnknownTopicOrPartition {
  93. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  94. }
  95. if partitions != nil {
  96. t.Errorf("Should return nil as partition list, found %v", partitions)
  97. }
  98. seedBroker.Close()
  99. safeClose(t, client)
  100. }
  101. func TestClientSeedBrokers(t *testing.T) {
  102. seedBroker := NewMockBroker(t, 1)
  103. metadataResponse := new(MetadataResponse)
  104. metadataResponse.AddBroker("localhost:12345", 2)
  105. seedBroker.Returns(metadataResponse)
  106. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  107. if err != nil {
  108. t.Fatal(err)
  109. }
  110. seedBroker.Close()
  111. safeClose(t, client)
  112. }
  113. func TestClientMetadata(t *testing.T) {
  114. seedBroker := NewMockBroker(t, 1)
  115. leader := NewMockBroker(t, 5)
  116. replicas := []int32{3, 1, 5}
  117. isr := []int32{5, 1}
  118. metadataResponse := new(MetadataResponse)
  119. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  120. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
  121. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable)
  122. seedBroker.Returns(metadataResponse)
  123. config := NewConfig()
  124. config.Metadata.Retry.Max = 0
  125. client, err := NewClient([]string{seedBroker.Addr()}, config)
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. topics, err := client.Topics()
  130. if err != nil {
  131. t.Error(err)
  132. } else if len(topics) != 1 || topics[0] != "my_topic" {
  133. t.Error("Client returned incorrect topics:", topics)
  134. }
  135. parts, err := client.Partitions("my_topic")
  136. if err != nil {
  137. t.Error(err)
  138. } else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
  139. t.Error("Client returned incorrect partitions for my_topic:", parts)
  140. }
  141. parts, err = client.WritablePartitions("my_topic")
  142. if err != nil {
  143. t.Error(err)
  144. } else if len(parts) != 1 || parts[0] != 0 {
  145. t.Error("Client returned incorrect writable partitions for my_topic:", parts)
  146. }
  147. tst, err := client.Leader("my_topic", 0)
  148. if err != nil {
  149. t.Error(err)
  150. } else if tst.ID() != 5 {
  151. t.Error("Leader for my_topic had incorrect ID.")
  152. }
  153. replicas, err = client.Replicas("my_topic", 0)
  154. if err != nil {
  155. t.Error(err)
  156. } else if replicas[0] != 3 {
  157. t.Error("Incorrect (or sorted) replica")
  158. } else if replicas[1] != 1 {
  159. t.Error("Incorrect (or sorted) replica")
  160. } else if replicas[2] != 5 {
  161. t.Error("Incorrect (or sorted) replica")
  162. }
  163. isr, err = client.InSyncReplicas("my_topic", 0)
  164. if err != nil {
  165. t.Error(err)
  166. } else if len(isr) != 2 {
  167. t.Error("Client returned incorrect ISRs for partition:", isr)
  168. } else if isr[0] != 5 {
  169. t.Error("Incorrect (or sorted) ISR:", isr)
  170. } else if isr[1] != 1 {
  171. t.Error("Incorrect (or sorted) ISR:", isr)
  172. }
  173. leader.Close()
  174. seedBroker.Close()
  175. safeClose(t, client)
  176. }
  177. func TestClientMetadataWithOfflineReplicas(t *testing.T) {
  178. seedBroker := NewMockBroker(t, 1)
  179. leader := NewMockBroker(t, 5)
  180. replicas := []int32{1, 2, 3}
  181. isr := []int32{1, 2}
  182. offlineReplicas := []int32{3}
  183. metadataResponse := new(MetadataResponse)
  184. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  185. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, offlineReplicas, ErrNoError)
  186. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
  187. metadataResponse.Version = 5
  188. seedBroker.Returns(metadataResponse)
  189. config := NewConfig()
  190. config.Version = V1_0_0_0
  191. config.Metadata.Retry.Max = 0
  192. client, err := NewClient([]string{seedBroker.Addr()}, config)
  193. if err != nil {
  194. t.Fatal(err)
  195. }
  196. topics, err := client.Topics()
  197. if err != nil {
  198. t.Error(err)
  199. } else if len(topics) != 1 || topics[0] != "my_topic" {
  200. t.Error("Client returned incorrect topics:", topics)
  201. }
  202. parts, err := client.Partitions("my_topic")
  203. if err != nil {
  204. t.Error(err)
  205. } else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
  206. t.Error("Client returned incorrect partitions for my_topic:", parts)
  207. }
  208. parts, err = client.WritablePartitions("my_topic")
  209. if err != nil {
  210. t.Error(err)
  211. } else if len(parts) != 2 {
  212. t.Error("Client returned incorrect writable partitions for my_topic:", parts)
  213. }
  214. tst, err := client.Leader("my_topic", 0)
  215. if err != nil {
  216. t.Error(err)
  217. } else if tst.ID() != 5 {
  218. t.Error("Leader for my_topic had incorrect ID.")
  219. }
  220. replicas, err = client.Replicas("my_topic", 0)
  221. if err != nil {
  222. t.Error(err)
  223. } else if replicas[0] != 1 {
  224. t.Error("Incorrect (or sorted) replica")
  225. } else if replicas[1] != 2 {
  226. t.Error("Incorrect (or sorted) replica")
  227. } else if replicas[2] != 3 {
  228. t.Error("Incorrect (or sorted) replica")
  229. }
  230. isr, err = client.InSyncReplicas("my_topic", 0)
  231. if err != nil {
  232. t.Error(err)
  233. } else if len(isr) != 2 {
  234. t.Error("Client returned incorrect ISRs for partition:", isr)
  235. } else if isr[0] != 1 {
  236. t.Error("Incorrect (or sorted) ISR:", isr)
  237. } else if isr[1] != 2 {
  238. t.Error("Incorrect (or sorted) ISR:", isr)
  239. }
  240. offlineReplicas, err = client.OfflineReplicas("my_topic", 0)
  241. if err != nil {
  242. t.Error(err)
  243. } else if len(offlineReplicas) != 1 {
  244. t.Error("Client returned incorrect offline replicas for partition:", offlineReplicas)
  245. } else if offlineReplicas[0] != 3 {
  246. t.Error("Incorrect offline replica:", offlineReplicas)
  247. }
  248. leader.Close()
  249. seedBroker.Close()
  250. safeClose(t, client)
  251. }
  252. func TestClientGetOffset(t *testing.T) {
  253. seedBroker := NewMockBroker(t, 1)
  254. leader := NewMockBroker(t, 2)
  255. leaderAddr := leader.Addr()
  256. metadata := new(MetadataResponse)
  257. metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  258. metadata.AddBroker(leaderAddr, leader.BrokerID())
  259. seedBroker.Returns(metadata)
  260. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  261. if err != nil {
  262. t.Fatal(err)
  263. }
  264. offsetResponse := new(OffsetResponse)
  265. offsetResponse.AddTopicPartition("foo", 0, 123)
  266. leader.Returns(offsetResponse)
  267. offset, err := client.GetOffset("foo", 0, OffsetNewest)
  268. if err != nil {
  269. t.Error(err)
  270. }
  271. if offset != 123 {
  272. t.Error("Unexpected offset, got ", offset)
  273. }
  274. leader.Close()
  275. seedBroker.Returns(metadata)
  276. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  277. offsetResponse = new(OffsetResponse)
  278. offsetResponse.AddTopicPartition("foo", 0, 456)
  279. leader.Returns(offsetResponse)
  280. offset, err = client.GetOffset("foo", 0, OffsetNewest)
  281. if err != nil {
  282. t.Error(err)
  283. }
  284. if offset != 456 {
  285. t.Error("Unexpected offset, got ", offset)
  286. }
  287. seedBroker.Close()
  288. leader.Close()
  289. safeClose(t, client)
  290. }
  291. func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
  292. seedBroker := NewMockBroker(t, 1)
  293. metadataResponse1 := new(MetadataResponse)
  294. seedBroker.Returns(metadataResponse1)
  295. retryCount := int32(0)
  296. config := NewConfig()
  297. config.Metadata.Retry.Max = 1
  298. config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
  299. atomic.AddInt32(&retryCount, 1)
  300. return 0
  301. }
  302. client, err := NewClient([]string{seedBroker.Addr()}, config)
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. metadataUnknownTopic := new(MetadataResponse)
  307. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  308. seedBroker.Returns(metadataUnknownTopic)
  309. seedBroker.Returns(metadataUnknownTopic)
  310. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  311. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  312. }
  313. safeClose(t, client)
  314. seedBroker.Close()
  315. actualRetryCount := atomic.LoadInt32(&retryCount)
  316. if actualRetryCount != 1 {
  317. t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
  318. }
  319. }
  320. func TestClientReceivingUnknownTopic(t *testing.T) {
  321. seedBroker := NewMockBroker(t, 1)
  322. metadataResponse1 := new(MetadataResponse)
  323. seedBroker.Returns(metadataResponse1)
  324. config := NewConfig()
  325. config.Metadata.Retry.Max = 1
  326. config.Metadata.Retry.Backoff = 0
  327. client, err := NewClient([]string{seedBroker.Addr()}, config)
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. metadataUnknownTopic := new(MetadataResponse)
  332. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  333. seedBroker.Returns(metadataUnknownTopic)
  334. seedBroker.Returns(metadataUnknownTopic)
  335. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  336. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  337. }
  338. // If we are asking for the leader of a partition of the non-existing topic.
  339. // we will request metadata again.
  340. seedBroker.Returns(metadataUnknownTopic)
  341. seedBroker.Returns(metadataUnknownTopic)
  342. if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
  343. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  344. }
  345. safeClose(t, client)
  346. seedBroker.Close()
  347. }
  348. func TestClientReceivingPartialMetadata(t *testing.T) {
  349. seedBroker := NewMockBroker(t, 1)
  350. leader := NewMockBroker(t, 5)
  351. metadataResponse1 := new(MetadataResponse)
  352. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  353. seedBroker.Returns(metadataResponse1)
  354. config := NewConfig()
  355. config.Metadata.Retry.Max = 0
  356. client, err := NewClient([]string{seedBroker.Addr()}, config)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
  361. metadataPartial := new(MetadataResponse)
  362. metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
  363. metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
  364. metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
  365. seedBroker.Returns(metadataPartial)
  366. if err := client.RefreshMetadata("new_topic"); err != nil {
  367. t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
  368. }
  369. // Even though the metadata was incomplete, we should be able to get the leader of a partition
  370. // for which we did get a useful response, without doing additional requests.
  371. partition0Leader, err := client.Leader("new_topic", 0)
  372. if err != nil {
  373. t.Error(err)
  374. } else if partition0Leader.Addr() != leader.Addr() {
  375. t.Error("Unexpected leader returned", partition0Leader.Addr())
  376. }
  377. // If we are asking for the leader of a partition that didn't have a leader before,
  378. // we will do another metadata request.
  379. seedBroker.Returns(metadataPartial)
  380. // Still no leader for the partition, so asking for it should return an error.
  381. _, err = client.Leader("new_topic", 1)
  382. if err != ErrLeaderNotAvailable {
  383. t.Error("Expected ErrLeaderNotAvailable, got", err)
  384. }
  385. safeClose(t, client)
  386. seedBroker.Close()
  387. leader.Close()
  388. }
  389. func TestClientRefreshBehaviour(t *testing.T) {
  390. seedBroker := NewMockBroker(t, 1)
  391. leader := NewMockBroker(t, 5)
  392. metadataResponse1 := new(MetadataResponse)
  393. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  394. seedBroker.Returns(metadataResponse1)
  395. metadataResponse2 := new(MetadataResponse)
  396. metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
  397. seedBroker.Returns(metadataResponse2)
  398. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  399. if err != nil {
  400. t.Fatal(err)
  401. }
  402. parts, err := client.Partitions("my_topic")
  403. if err != nil {
  404. t.Error(err)
  405. } else if len(parts) != 1 || parts[0] != 0xb {
  406. t.Error("Client returned incorrect partitions for my_topic:", parts)
  407. }
  408. tst, err := client.Leader("my_topic", 0xb)
  409. if err != nil {
  410. t.Error(err)
  411. } else if tst.ID() != 5 {
  412. t.Error("Leader for my_topic had incorrect ID.")
  413. }
  414. leader.Close()
  415. seedBroker.Close()
  416. safeClose(t, client)
  417. }
  418. func TestClientResurrectDeadSeeds(t *testing.T) {
  419. initialSeed := NewMockBroker(t, 0)
  420. emptyMetadata := new(MetadataResponse)
  421. initialSeed.Returns(emptyMetadata)
  422. conf := NewConfig()
  423. conf.Metadata.Retry.Backoff = 0
  424. conf.Metadata.RefreshFrequency = 0
  425. c, err := NewClient([]string{initialSeed.Addr()}, conf)
  426. if err != nil {
  427. t.Fatal(err)
  428. }
  429. initialSeed.Close()
  430. client := c.(*client)
  431. seed1 := NewMockBroker(t, 1)
  432. seed2 := NewMockBroker(t, 2)
  433. seed3 := NewMockBroker(t, 3)
  434. addr1 := seed1.Addr()
  435. addr2 := seed2.Addr()
  436. addr3 := seed3.Addr()
  437. // Overwrite the seed brokers with a fixed ordering to make this test deterministic.
  438. safeClose(t, client.seedBrokers[0])
  439. client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
  440. client.deadSeeds = []*Broker{}
  441. wg := sync.WaitGroup{}
  442. wg.Add(1)
  443. go func() {
  444. if err := client.RefreshMetadata(); err != nil {
  445. t.Error(err)
  446. }
  447. wg.Done()
  448. }()
  449. seed1.Close()
  450. seed2.Close()
  451. seed1 = NewMockBrokerAddr(t, 1, addr1)
  452. seed2 = NewMockBrokerAddr(t, 2, addr2)
  453. seed3.Close()
  454. seed1.Close()
  455. seed2.Returns(emptyMetadata)
  456. wg.Wait()
  457. if len(client.seedBrokers) != 2 {
  458. t.Error("incorrect number of live seeds")
  459. }
  460. if len(client.deadSeeds) != 1 {
  461. t.Error("incorrect number of dead seeds")
  462. }
  463. safeClose(t, c)
  464. }
  465. func TestClientController(t *testing.T) {
  466. seedBroker := NewMockBroker(t, 1)
  467. defer seedBroker.Close()
  468. controllerBroker := NewMockBroker(t, 2)
  469. defer controllerBroker.Close()
  470. seedBroker.SetHandlerByMap(map[string]MockResponse{
  471. "MetadataRequest": NewMockMetadataResponse(t).
  472. SetController(controllerBroker.BrokerID()).
  473. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  474. SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
  475. })
  476. cfg := NewConfig()
  477. // test kafka version greater than 0.10.0.0
  478. cfg.Version = V0_10_0_0
  479. client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
  480. if err != nil {
  481. t.Fatal(err)
  482. }
  483. defer safeClose(t, client1)
  484. broker, err := client1.Controller()
  485. if err != nil {
  486. t.Fatal(err)
  487. }
  488. if broker.Addr() != controllerBroker.Addr() {
  489. t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
  490. }
  491. // test kafka version earlier than 0.10.0.0
  492. cfg.Version = V0_9_0_1
  493. client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
  494. if err != nil {
  495. t.Fatal(err)
  496. }
  497. defer safeClose(t, client2)
  498. if _, err = client2.Controller(); err != ErrUnsupportedVersion {
  499. t.Errorf("Expected Contoller() to return %s, found %s", ErrUnsupportedVersion, err)
  500. }
  501. }
  502. func TestClientMetadataTimeout(t *testing.T) {
  503. for _, timeout := range []time.Duration{
  504. 250 * time.Millisecond, // Will cut the first retry pass
  505. 500 * time.Millisecond, // Will cut the second retry pass
  506. 750 * time.Millisecond, // Will cut the third retry pass
  507. 900 * time.Millisecond, // Will stop after the three retries
  508. } {
  509. t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) {
  510. // Use a responsive broker to create a working client
  511. initialSeed := NewMockBroker(t, 0)
  512. emptyMetadata := new(MetadataResponse)
  513. initialSeed.Returns(emptyMetadata)
  514. conf := NewConfig()
  515. // Speed up the metadata request failure because of a read timeout
  516. conf.Net.ReadTimeout = 100 * time.Millisecond
  517. // Disable backoff and refresh
  518. conf.Metadata.Retry.Backoff = 0
  519. conf.Metadata.RefreshFrequency = 0
  520. // But configure a "global" timeout
  521. conf.Metadata.Timeout = timeout
  522. c, err := NewClient([]string{initialSeed.Addr()}, conf)
  523. if err != nil {
  524. t.Fatal(err)
  525. }
  526. initialSeed.Close()
  527. client := c.(*client)
  528. // Start seed brokers that do not reply to anything and therefore a read
  529. // on the TCP connection will timeout to simulate unresponsive brokers
  530. seed1 := NewMockBroker(t, 1)
  531. defer seed1.Close()
  532. seed2 := NewMockBroker(t, 2)
  533. defer seed2.Close()
  534. // Overwrite the seed brokers with a fixed ordering to make this test deterministic
  535. safeClose(t, client.seedBrokers[0])
  536. client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())}
  537. client.deadSeeds = []*Broker{}
  538. // Start refreshing metadata in the background
  539. errChan := make(chan error)
  540. start := time.Now()
  541. go func() {
  542. errChan <- c.RefreshMetadata()
  543. }()
  544. // Check that the refresh fails fast enough (less than twice the configured timeout)
  545. // instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms
  546. maxRefreshDuration := 2 * timeout
  547. select {
  548. case err := <-errChan:
  549. t.Logf("Got err: %v after waiting for: %v", err, time.Since(start))
  550. if err == nil {
  551. t.Fatal("Expected failed RefreshMetadata, got nil")
  552. }
  553. if err != ErrOutOfBrokers {
  554. t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err)
  555. }
  556. case <-time.After(maxRefreshDuration):
  557. t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration)
  558. }
  559. safeClose(t, c)
  560. })
  561. }
  562. }
  563. func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
  564. seedBroker := NewMockBroker(t, 1)
  565. staleCoordinator := NewMockBroker(t, 2)
  566. freshCoordinator := NewMockBroker(t, 3)
  567. replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
  568. metadataResponse1 := new(MetadataResponse)
  569. metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
  570. metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
  571. metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  572. seedBroker.Returns(metadataResponse1)
  573. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. coordinatorResponse1 := new(ConsumerMetadataResponse)
  578. coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
  579. seedBroker.Returns(coordinatorResponse1)
  580. coordinatorResponse2 := new(ConsumerMetadataResponse)
  581. coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
  582. coordinatorResponse2.CoordinatorHost = "127.0.0.1"
  583. coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()
  584. seedBroker.Returns(coordinatorResponse2)
  585. broker, err := client.Coordinator("my_group")
  586. if err != nil {
  587. t.Error(err)
  588. }
  589. if staleCoordinator.Addr() != broker.Addr() {
  590. t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
  591. }
  592. if staleCoordinator.BrokerID() != broker.ID() {
  593. t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
  594. }
  595. // Grab the cached value
  596. broker2, err := client.Coordinator("my_group")
  597. if err != nil {
  598. t.Error(err)
  599. }
  600. if broker2.Addr() != broker.Addr() {
  601. t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
  602. }
  603. coordinatorResponse3 := new(ConsumerMetadataResponse)
  604. coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
  605. coordinatorResponse3.CoordinatorHost = "127.0.0.1"
  606. coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
  607. seedBroker.Returns(coordinatorResponse3)
  608. // Refresh the locally cahced value because it's stale
  609. if err := client.RefreshCoordinator("my_group"); err != nil {
  610. t.Error(err)
  611. }
  612. // Grab the fresh value
  613. broker3, err := client.Coordinator("my_group")
  614. if err != nil {
  615. t.Error(err)
  616. }
  617. if broker3.Addr() != freshCoordinator.Addr() {
  618. t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
  619. }
  620. freshCoordinator.Close()
  621. staleCoordinator.Close()
  622. seedBroker.Close()
  623. safeClose(t, client)
  624. }
  625. func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
  626. seedBroker := NewMockBroker(t, 1)
  627. coordinator := NewMockBroker(t, 2)
  628. metadataResponse1 := new(MetadataResponse)
  629. seedBroker.Returns(metadataResponse1)
  630. config := NewConfig()
  631. config.Metadata.Retry.Max = 1
  632. config.Metadata.Retry.Backoff = 0
  633. client, err := NewClient([]string{seedBroker.Addr()}, config)
  634. if err != nil {
  635. t.Fatal(err)
  636. }
  637. coordinatorResponse1 := new(ConsumerMetadataResponse)
  638. coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
  639. seedBroker.Returns(coordinatorResponse1)
  640. metadataResponse2 := new(MetadataResponse)
  641. metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
  642. seedBroker.Returns(metadataResponse2)
  643. replicas := []int32{coordinator.BrokerID()}
  644. metadataResponse3 := new(MetadataResponse)
  645. metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
  646. seedBroker.Returns(metadataResponse3)
  647. coordinatorResponse2 := new(ConsumerMetadataResponse)
  648. coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
  649. coordinatorResponse2.CoordinatorHost = "127.0.0.1"
  650. coordinatorResponse2.CoordinatorPort = coordinator.Port()
  651. seedBroker.Returns(coordinatorResponse2)
  652. broker, err := client.Coordinator("my_group")
  653. if err != nil {
  654. t.Error(err)
  655. }
  656. if coordinator.Addr() != broker.Addr() {
  657. t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
  658. }
  659. if coordinator.BrokerID() != broker.ID() {
  660. t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
  661. }
  662. coordinator.Close()
  663. seedBroker.Close()
  664. safeClose(t, client)
  665. }
  666. func TestClientAutorefreshShutdownRace(t *testing.T) {
  667. seedBroker := NewMockBroker(t, 1)
  668. metadataResponse := new(MetadataResponse)
  669. seedBroker.Returns(metadataResponse)
  670. conf := NewConfig()
  671. conf.Metadata.RefreshFrequency = 100 * time.Millisecond
  672. client, err := NewClient([]string{seedBroker.Addr()}, conf)
  673. if err != nil {
  674. t.Fatal(err)
  675. }
  676. // Wait for the background refresh to kick in
  677. time.Sleep(110 * time.Millisecond)
  678. errCh := make(chan error, 1)
  679. go func() {
  680. // Close the client
  681. errCh <- client.Close()
  682. close(errCh)
  683. }()
  684. // Wait for the Close to kick in
  685. time.Sleep(10 * time.Millisecond)
  686. // Then return some metadata to the still-running background thread
  687. leader := NewMockBroker(t, 2)
  688. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  689. metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, []int32{}, ErrNoError)
  690. seedBroker.Returns(metadataResponse)
  691. err = <-errCh
  692. if err != nil {
  693. t.Fatalf("goroutine client.Close():%s", err)
  694. }
  695. seedBroker.Close()
  696. // give the update time to happen so we get a panic if it's still running (which it shouldn't)
  697. time.Sleep(10 * time.Millisecond)
  698. }