// client_test.go
  1. package sarama
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "sync/atomic"
  7. "testing"
  8. "time"
  9. )
  10. func safeClose(t testing.TB, c io.Closer) {
  11. err := c.Close()
  12. if err != nil {
  13. t.Error(err)
  14. }
  15. }
  16. func TestSimpleClient(t *testing.T) {
  17. seedBroker := NewMockBroker(t, 1)
  18. seedBroker.Returns(new(MetadataResponse))
  19. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. seedBroker.Close()
  24. safeClose(t, client)
  25. }
  26. func TestCachedPartitions(t *testing.T) {
  27. seedBroker := NewMockBroker(t, 1)
  28. replicas := []int32{3, 1, 5}
  29. isr := []int32{5, 1}
  30. metadataResponse := new(MetadataResponse)
  31. metadataResponse.AddBroker("localhost:12345", 2)
  32. metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, []int32{}, ErrNoError)
  33. metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable)
  34. seedBroker.Returns(metadataResponse)
  35. config := NewConfig()
  36. config.Metadata.Retry.Max = 0
  37. c, err := NewClient([]string{seedBroker.Addr()}, config)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. client := c.(*client)
  42. // Verify they aren't cached the same
  43. allP := client.cachedPartitionsResults["my_topic"][allPartitions]
  44. writeP := client.cachedPartitionsResults["my_topic"][writablePartitions]
  45. if len(allP) == len(writeP) {
  46. t.Fatal("Invalid lengths!")
  47. }
  48. tmp := client.cachedPartitionsResults["my_topic"]
  49. // Verify we actually use the cache at all!
  50. tmp[allPartitions] = []int32{1, 2, 3, 4}
  51. client.cachedPartitionsResults["my_topic"] = tmp
  52. if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
  53. t.Fatal("Not using the cache!")
  54. }
  55. seedBroker.Close()
  56. safeClose(t, client)
  57. }
// TestClientDoesntCachePartitionsForTopicsWithErrors verifies that a
// topic-level metadata error (ErrUnknownTopicOrPartition) is not cached:
// every lookup of the erroring topic triggers a fresh metadata request,
// while lookups of a healthy topic keep being served from the cache.
func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)

	replicas := []int32{seedBroker.BrokerID()}

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// First lookup of the unknown topic: the mock answers with an error.
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataResponse)

	partitions, err := client.Partitions("unknown")
	if err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}
	if partitions != nil {
		t.Errorf("Should return nil as partition list, found %v", partitions)
	}

	// Should still use the cache of a known topic (note: no mock response is
	// queued here, so this lookup must not hit the broker at all).
	partitions, err = client.Partitions("my_topic")
	if err != nil {
		t.Errorf("Expected no error, found %v", err)
	}

	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataResponse)

	// Should not use cache for unknown topic: the second lookup consumes the
	// freshly queued response and surfaces the same error again.
	partitions, err = client.Partitions("unknown")
	if err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}
	if partitions != nil {
		t.Errorf("Should return nil as partition list, found %v", partitions)
	}

	seedBroker.Close()
	safeClose(t, client)
}
  101. func TestClientSeedBrokers(t *testing.T) {
  102. seedBroker := NewMockBroker(t, 1)
  103. metadataResponse := new(MetadataResponse)
  104. metadataResponse.AddBroker("localhost:12345", 2)
  105. seedBroker.Returns(metadataResponse)
  106. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  107. if err != nil {
  108. t.Fatal(err)
  109. }
  110. seedBroker.Close()
  111. safeClose(t, client)
  112. }
// TestClientMetadata verifies basic metadata plumbing: topic listing,
// partition listing (all vs. writable), leader resolution, and that
// Replicas/InSyncReplicas return broker IDs in the order the broker sent
// them (not sorted).
func TestClientMetadata(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)

	replicas := []int32{3, 1, 5}
	isr := []int32{5, 1}

	// Partition 0 is healthy; partition 1 reports ErrLeaderNotAvailable and
	// should therefore be excluded from the writable set.
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	topics, err := client.Topics()
	if err != nil {
		t.Error(err)
	} else if len(topics) != 1 || topics[0] != "my_topic" {
		t.Error("Client returned incorrect topics:", topics)
	}

	parts, err := client.Partitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
		t.Error("Client returned incorrect partitions for my_topic:", parts)
	}

	// Only partition 0 has a live leader, so it alone is writable.
	parts, err = client.WritablePartitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 1 || parts[0] != 0 {
		t.Error("Client returned incorrect writable partitions for my_topic:", parts)
	}

	tst, err := client.Leader("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if tst.ID() != 5 {
		t.Error("Leader for my_topic had incorrect ID.")
	}

	// Replica list must come back in broker order {3, 1, 5}, not sorted.
	replicas, err = client.Replicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if replicas[0] != 3 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[1] != 1 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[2] != 5 {
		t.Error("Incorrect (or sorted) replica")
	}

	// ISR list must come back in broker order {5, 1}, not sorted.
	isr, err = client.InSyncReplicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if len(isr) != 2 {
		t.Error("Client returned incorrect ISRs for partition:", isr)
	} else if isr[0] != 5 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	} else if isr[1] != 1 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, client)
}
// TestClientMetadataWithOfflineReplicas checks that offline-replica
// information (carried in metadata response version 5, which is why
// config.Version is set to V1_0_0_0 below) is parsed and exposed via
// OfflineReplicas alongside the usual replica/ISR data.
func TestClientMetadataWithOfflineReplicas(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)

	replicas := []int32{1, 2, 3}
	isr := []int32{1, 2}
	offlineReplicas := []int32{3}

	// Partition 0 reports replica 3 as offline; partition 1 reports none.
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, offlineReplicas, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
	metadataResponse.Version = 5
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Version = V1_0_0_0
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	topics, err := client.Topics()
	if err != nil {
		t.Error(err)
	} else if len(topics) != 1 || topics[0] != "my_topic" {
		t.Error("Client returned incorrect topics:", topics)
	}

	parts, err := client.Partitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
		t.Error("Client returned incorrect partitions for my_topic:", parts)
	}

	// Both partitions have a live leader, so both are writable.
	parts, err = client.WritablePartitions("my_topic")
	if err != nil {
		t.Error(err)
	} else if len(parts) != 2 {
		t.Error("Client returned incorrect writable partitions for my_topic:", parts)
	}

	tst, err := client.Leader("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if tst.ID() != 5 {
		t.Error("Leader for my_topic had incorrect ID.")
	}

	// Replica and ISR lists must preserve the broker-supplied ordering.
	replicas, err = client.Replicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if replicas[0] != 1 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[1] != 2 {
		t.Error("Incorrect (or sorted) replica")
	} else if replicas[2] != 3 {
		t.Error("Incorrect (or sorted) replica")
	}

	isr, err = client.InSyncReplicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if len(isr) != 2 {
		t.Error("Client returned incorrect ISRs for partition:", isr)
	} else if isr[0] != 1 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	} else if isr[1] != 2 {
		t.Error("Incorrect (or sorted) ISR:", isr)
	}

	offlineReplicas, err = client.OfflineReplicas("my_topic", 0)
	if err != nil {
		t.Error(err)
	} else if len(offlineReplicas) != 1 {
		t.Error("Client returned incorrect offline replicas for partition:", offlineReplicas)
	} else if offlineReplicas[0] != 3 {
		t.Error("Incorrect offline replica:", offlineReplicas)
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, client)
}
// TestClientGetOffset verifies that GetOffset asks the partition leader for
// the newest offset, and that after the leader is torn down and restarted
// on the same address the client recovers (consuming a fresh metadata
// response from the seed broker) and returns the new offset.
func TestClientGetOffset(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)
	leaderAddr := leader.Addr()

	metadata := new(MetadataResponse)
	metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
	metadata.AddBroker(leaderAddr, leader.BrokerID())
	seedBroker.Returns(metadata)

	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponse := new(OffsetResponse)
	offsetResponse.AddTopicPartition("foo", 0, 123)
	leader.Returns(offsetResponse)

	offset, err := client.GetOffset("foo", 0, OffsetNewest)
	if err != nil {
		t.Error(err)
	}
	if offset != 123 {
		t.Error("Unexpected offset, got ", offset)
	}

	// Kill the leader and bring it back on the same address with a different
	// offset; the seed broker is queued to answer the re-fetch of metadata.
	leader.Close()
	seedBroker.Returns(metadata)

	leader = NewMockBrokerAddr(t, 2, leaderAddr)
	offsetResponse = new(OffsetResponse)
	offsetResponse.AddTopicPartition("foo", 0, 456)
	leader.Returns(offsetResponse)

	offset, err = client.GetOffset("foo", 0, OffsetNewest)
	if err != nil {
		t.Error(err)
	}
	if offset != 456 {
		t.Error("Unexpected offset, got ", offset)
	}

	seedBroker.Close()
	leader.Close()
	safeClose(t, client)
}
  291. func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
  292. seedBroker := NewMockBroker(t, 1)
  293. metadataResponse1 := new(MetadataResponse)
  294. seedBroker.Returns(metadataResponse1)
  295. retryCount := int32(0)
  296. config := NewConfig()
  297. config.Metadata.Retry.Max = 1
  298. config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
  299. atomic.AddInt32(&retryCount, 1)
  300. return 0
  301. }
  302. client, err := NewClient([]string{seedBroker.Addr()}, config)
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. metadataUnknownTopic := new(MetadataResponse)
  307. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  308. seedBroker.Returns(metadataUnknownTopic)
  309. seedBroker.Returns(metadataUnknownTopic)
  310. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  311. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  312. }
  313. safeClose(t, client)
  314. seedBroker.Close()
  315. actualRetryCount := atomic.LoadInt32(&retryCount)
  316. if actualRetryCount != 1 {
  317. t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
  318. }
  319. }
  320. func TestClientReceivingUnknownTopic(t *testing.T) {
  321. seedBroker := NewMockBroker(t, 1)
  322. metadataResponse1 := new(MetadataResponse)
  323. seedBroker.Returns(metadataResponse1)
  324. config := NewConfig()
  325. config.Metadata.Retry.Max = 1
  326. config.Metadata.Retry.Backoff = 0
  327. client, err := NewClient([]string{seedBroker.Addr()}, config)
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. metadataUnknownTopic := new(MetadataResponse)
  332. metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
  333. seedBroker.Returns(metadataUnknownTopic)
  334. seedBroker.Returns(metadataUnknownTopic)
  335. if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
  336. t.Error("ErrUnknownTopicOrPartition expected, got", err)
  337. }
  338. // If we are asking for the leader of a partition of the non-existing topic.
  339. // we will request metadata again.
  340. seedBroker.Returns(metadataUnknownTopic)
  341. seedBroker.Returns(metadataUnknownTopic)
  342. if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
  343. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  344. }
  345. safeClose(t, client)
  346. seedBroker.Close()
  347. }
// TestClientReceivingPartialMetadata checks that a metadata response
// carrying a topic-level ErrLeaderNotAvailable is treated as partial
// rather than fatal: partitions that did get a leader are usable from
// cache, while leaderless partitions trigger another metadata request and
// surface ErrLeaderNotAvailable to the caller.
func TestClientReceivingPartialMetadata(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 5)

	metadataResponse1 := new(MetadataResponse)
	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
	seedBroker.Returns(metadataResponse1)

	config := NewConfig()
	config.Metadata.Retry.Max = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}

	// Partition 0 has a leader; partition 1 does not (leader ID -1).
	metadataPartial := new(MetadataResponse)
	metadataPartial.AddBroker(seedBroker.Addr(), 1)
	metadataPartial.AddBroker(leader.Addr(), 5)
	metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
	seedBroker.Returns(metadataPartial)

	if err := client.RefreshMetadata("new_topic"); err != nil {
		t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
	}

	// Even though the metadata was incomplete, we should be able to get the leader of a partition
	// for which we did get a useful response, without doing additional requests.
	partition0Leader, err := client.Leader("new_topic", 0)
	if err != nil {
		t.Error(err)
	} else if partition0Leader.Addr() != leader.Addr() {
		t.Error("Unexpected leader returned", partition0Leader.Addr())
	}

	// If we are asking for the leader of a partition that didn't have a leader before,
	// we will do another metadata request.
	seedBroker.Returns(metadataPartial)

	// Still no leader for the partition, so asking for it should return an error.
	_, err = client.Leader("new_topic", 1)
	if err != ErrLeaderNotAvailable {
		t.Error("Expected ErrLeaderNotAvailable, got", err)
	}

	safeClose(t, client)
	seedBroker.Close()
	leader.Close()
}
  391. func TestClientRefreshBehaviour(t *testing.T) {
  392. seedBroker := NewMockBroker(t, 1)
  393. leader := NewMockBroker(t, 5)
  394. metadataResponse1 := new(MetadataResponse)
  395. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  396. seedBroker.Returns(metadataResponse1)
  397. metadataResponse2 := new(MetadataResponse)
  398. metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
  399. metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
  400. seedBroker.Returns(metadataResponse2)
  401. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  402. if err != nil {
  403. t.Fatal(err)
  404. }
  405. parts, err := client.Partitions("my_topic")
  406. if err != nil {
  407. t.Error(err)
  408. } else if len(parts) != 1 || parts[0] != 0xb {
  409. t.Error("Client returned incorrect partitions for my_topic:", parts)
  410. }
  411. tst, err := client.Leader("my_topic", 0xb)
  412. if err != nil {
  413. t.Error(err)
  414. } else if tst.ID() != 5 {
  415. t.Error("Leader for my_topic had incorrect ID.")
  416. }
  417. leader.Close()
  418. seedBroker.Close()
  419. safeClose(t, client)
  420. }
  421. func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
  422. seedBroker := NewMockBroker(t, 1)
  423. leader := NewMockBroker(t, 5)
  424. metadataResponse1 := new(MetadataResponse)
  425. metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
  426. metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
  427. seedBroker.Returns(metadataResponse1)
  428. client, err := NewClient([]string{seedBroker.Addr()}, nil)
  429. if err != nil {
  430. t.Fatal(err)
  431. }
  432. if len(client.Brokers()) != 2 {
  433. t.Error("Meta broker is not 2")
  434. }
  435. metadataResponse2 := new(MetadataResponse)
  436. metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
  437. seedBroker.Returns(metadataResponse2)
  438. if err := client.RefreshMetadata(); err != nil {
  439. t.Error(err)
  440. }
  441. if len(client.Brokers()) != 1 {
  442. t.Error("Meta broker is not 1")
  443. }
  444. }
// TestClientResurrectDeadSeeds exercises the seed-broker resurrection path:
// when seeds fail during a metadata refresh, the client should retry seeds
// it had previously declared dead, ending up with the reachable ones back
// in the live list and only the unreachable one marked dead.
func TestClientResurrectDeadSeeds(t *testing.T) {
	initialSeed := NewMockBroker(t, 0)
	emptyMetadata := new(MetadataResponse)
	initialSeed.Returns(emptyMetadata)

	conf := NewConfig()
	conf.Metadata.Retry.Backoff = 0
	conf.Metadata.RefreshFrequency = 0
	c, err := NewClient([]string{initialSeed.Addr()}, conf)
	if err != nil {
		t.Fatal(err)
	}
	initialSeed.Close()

	client := c.(*client)

	seed1 := NewMockBroker(t, 1)
	seed2 := NewMockBroker(t, 2)
	seed3 := NewMockBroker(t, 3)
	addr1 := seed1.Addr()
	addr2 := seed2.Addr()
	addr3 := seed3.Addr()

	// Overwrite the seed brokers with a fixed ordering to make this test deterministic.
	safeClose(t, client.seedBrokers[0])
	client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
	client.deadSeeds = []*Broker{}

	// Run the refresh in the background while we kill and resurrect seeds
	// underneath it.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		if err := client.RefreshMetadata(); err != nil {
			t.Error(err)
		}
		wg.Done()
	}()

	// Kill seeds 1 and 2, bring both back on their original addresses, then
	// kill seed 3 and seed 1 again; only seed 2 answers the metadata request.
	seed1.Close()
	seed2.Close()
	seed1 = NewMockBrokerAddr(t, 1, addr1)
	seed2 = NewMockBrokerAddr(t, 2, addr2)
	seed3.Close()
	seed1.Close()
	seed2.Returns(emptyMetadata)

	wg.Wait()

	// After the refresh: two seeds live, one dead.
	if len(client.seedBrokers) != 2 {
		t.Error("incorrect number of live seeds")
	}
	if len(client.deadSeeds) != 1 {
		t.Error("incorrect number of dead seeds")
	}

	safeClose(t, c)
}
// TestClientController verifies that Controller() returns the cluster
// controller broker when the configured Kafka version supports controller
// metadata (>= 0.10.0.0), and ErrUnsupportedVersion for older versions.
func TestClientController(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	defer seedBroker.Close()
	controllerBroker := NewMockBroker(t, 2)
	defer controllerBroker.Close()

	// Answer every metadata request with a response naming broker 2 as the
	// controller.
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(controllerBroker.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
			SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
	})

	cfg := NewConfig()

	// test kafka version greater than 0.10.0.0
	cfg.Version = V0_10_0_0
	client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, client1)
	broker, err := client1.Controller()
	if err != nil {
		t.Fatal(err)
	}
	if broker.Addr() != controllerBroker.Addr() {
		t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
	}

	// test kafka version earlier than 0.10.0.0
	cfg.Version = V0_9_0_1
	client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, client2)
	if _, err = client2.Controller(); err != ErrUnsupportedVersion {
		t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err)
	}
}
// TestClientMetadataTimeout checks that Metadata.Timeout bounds the total
// duration of a metadata refresh across all brokers and retries: with only
// unresponsive seeds, the refresh must fail with ErrOutOfBrokers within
// roughly the configured timeout rather than the sum of per-attempt read
// timeouts.
func TestClientMetadataTimeout(t *testing.T) {
	for _, timeout := range []time.Duration{
		250 * time.Millisecond, // Will cut the first retry pass
		500 * time.Millisecond, // Will cut the second retry pass
		750 * time.Millisecond, // Will cut the third retry pass
		900 * time.Millisecond, // Will stop after the three retries
	} {
		t.Run(fmt.Sprintf("timeout=%v", timeout), func(t *testing.T) {
			// Use a responsive broker to create a working client
			initialSeed := NewMockBroker(t, 0)
			emptyMetadata := new(MetadataResponse)
			initialSeed.Returns(emptyMetadata)

			conf := NewConfig()
			// Speed up the metadata request failure because of a read timeout
			conf.Net.ReadTimeout = 100 * time.Millisecond
			// Disable backoff and refresh
			conf.Metadata.Retry.Backoff = 0
			conf.Metadata.RefreshFrequency = 0
			// But configure a "global" timeout
			conf.Metadata.Timeout = timeout
			c, err := NewClient([]string{initialSeed.Addr()}, conf)
			if err != nil {
				t.Fatal(err)
			}
			initialSeed.Close()

			client := c.(*client)

			// Start seed brokers that do not reply to anything and therefore a read
			// on the TCP connection will timeout to simulate unresponsive brokers
			seed1 := NewMockBroker(t, 1)
			defer seed1.Close()
			seed2 := NewMockBroker(t, 2)
			defer seed2.Close()

			// Overwrite the seed brokers with a fixed ordering to make this test deterministic
			safeClose(t, client.seedBrokers[0])
			client.seedBrokers = []*Broker{NewBroker(seed1.Addr()), NewBroker(seed2.Addr())}
			client.deadSeeds = []*Broker{}

			// Start refreshing metadata in the background
			errChan := make(chan error)
			go func() {
				errChan <- c.RefreshMetadata()
			}()

			// Check that the refresh fails fast enough (less than twice the configured timeout)
			// instead of at least: 100 ms * 2 brokers * 3 retries = 800 ms
			maxRefreshDuration := 2 * timeout
			select {
			case err := <-errChan:
				if err == nil {
					t.Fatal("Expected failed RefreshMetadata, got nil")
				}
				if err != ErrOutOfBrokers {
					t.Error("Expected failed RefreshMetadata with ErrOutOfBrokers, got:", err)
				}
			case <-time.After(maxRefreshDuration):
				t.Fatalf("RefreshMetadata did not fail fast enough after waiting for %v", maxRefreshDuration)
			}

			safeClose(t, c)
		})
	}
}
// TestClientCoordinatorWithConsumerOffsetsTopic verifies Coordinator():
// the first lookup retries past ErrConsumerCoordinatorNotAvailable, the
// second lookup is served from cache, and RefreshCoordinator replaces the
// cached broker with the freshly reported one.
func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	staleCoordinator := NewMockBroker(t, 2)
	freshCoordinator := NewMockBroker(t, 3)

	replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
	metadataResponse1 := new(MetadataResponse)
	metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
	metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
	metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	seedBroker.Returns(metadataResponse1)

	client, err := NewClient([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// The first answer is "not available", so the lookup must retry...
	coordinatorResponse1 := new(ConsumerMetadataResponse)
	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
	seedBroker.Returns(coordinatorResponse1)

	// ...and the retry is answered with the stale coordinator's address.
	coordinatorResponse2 := new(ConsumerMetadataResponse)
	coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
	coordinatorResponse2.CoordinatorHost = "127.0.0.1"
	coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()
	seedBroker.Returns(coordinatorResponse2)

	broker, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}

	if staleCoordinator.Addr() != broker.Addr() {
		t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
	}
	if staleCoordinator.BrokerID() != broker.ID() {
		t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
	}

	// Grab the cached value (no mock response queued for this lookup)
	broker2, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}

	if broker2.Addr() != broker.Addr() {
		t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
	}

	coordinatorResponse3 := new(ConsumerMetadataResponse)
	coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
	coordinatorResponse3.CoordinatorHost = "127.0.0.1"
	coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
	seedBroker.Returns(coordinatorResponse3)

	// Refresh the locally cached value because it's stale
	if err := client.RefreshCoordinator("my_group"); err != nil {
		t.Error(err)
	}

	// Grab the fresh value
	broker3, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}

	if broker3.Addr() != freshCoordinator.Addr() {
		t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
	}

	freshCoordinator.Close()
	staleCoordinator.Close()
	seedBroker.Close()
	safeClose(t, client)
}
// TestClientCoordinatorWithoutConsumerOffsetsTopic checks the lookup path
// when the coordinator is unavailable and the __consumer_offsets topic is
// initially unknown: the client refreshes metadata for that topic and then
// retries the coordinator lookup successfully.
func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	coordinator := NewMockBroker(t, 2)

	metadataResponse1 := new(MetadataResponse)
	seedBroker.Returns(metadataResponse1)

	config := NewConfig()
	config.Metadata.Retry.Max = 1
	config.Metadata.Retry.Backoff = 0
	client, err := NewClient([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// The coordinator lookup fails while the offsets topic is unknown...
	coordinatorResponse1 := new(ConsumerMetadataResponse)
	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
	seedBroker.Returns(coordinatorResponse1)

	metadataResponse2 := new(MetadataResponse)
	metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
	seedBroker.Returns(metadataResponse2)

	// ...then the topic appears in metadata and the retried lookup succeeds.
	replicas := []int32{coordinator.BrokerID()}
	metadataResponse3 := new(MetadataResponse)
	metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
	seedBroker.Returns(metadataResponse3)

	coordinatorResponse2 := new(ConsumerMetadataResponse)
	coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
	coordinatorResponse2.CoordinatorHost = "127.0.0.1"
	coordinatorResponse2.CoordinatorPort = coordinator.Port()
	seedBroker.Returns(coordinatorResponse2)

	broker, err := client.Coordinator("my_group")
	if err != nil {
		t.Error(err)
	}

	if coordinator.Addr() != broker.Addr() {
		t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
	}
	if coordinator.BrokerID() != broker.ID() {
		t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
	}

	coordinator.Close()
	seedBroker.Close()
	safeClose(t, client)
}
  691. func TestClientAutorefreshShutdownRace(t *testing.T) {
  692. seedBroker := NewMockBroker(t, 1)
  693. metadataResponse := new(MetadataResponse)
  694. seedBroker.Returns(metadataResponse)
  695. conf := NewConfig()
  696. conf.Metadata.RefreshFrequency = 100 * time.Millisecond
  697. client, err := NewClient([]string{seedBroker.Addr()}, conf)
  698. if err != nil {
  699. t.Fatal(err)
  700. }
  701. // Wait for the background refresh to kick in
  702. time.Sleep(110 * time.Millisecond)
  703. errCh := make(chan error, 1)
  704. go func() {
  705. // Close the client
  706. errCh <- client.Close()
  707. close(errCh)
  708. }()
  709. // Wait for the Close to kick in
  710. time.Sleep(10 * time.Millisecond)
  711. // Then return some metadata to the still-running background thread
  712. leader := NewMockBroker(t, 2)
  713. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  714. metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, []int32{}, ErrNoError)
  715. seedBroker.Returns(metadataResponse)
  716. err = <-errCh
  717. if err != nil {
  718. t.Fatalf("goroutine client.Close():%s", err)
  719. }
  720. seedBroker.Close()
  721. // give the update time to happen so we get a panic if it's still running (which it shouldn't)
  722. time.Sleep(10 * time.Millisecond)
  723. }