async_producer_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. const TestMessage = "ABC THE MESSAGE"
  12. func closeProducer(t *testing.T, p AsyncProducer) {
  13. var wg sync.WaitGroup
  14. p.AsyncClose()
  15. wg.Add(2)
  16. go func() {
  17. for _ = range p.Successes() {
  18. t.Error("Unexpected message on Successes()")
  19. }
  20. wg.Done()
  21. }()
  22. go func() {
  23. for msg := range p.Errors() {
  24. t.Error(msg.Err)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }
  30. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  31. for successes > 0 || errors > 0 {
  32. select {
  33. case msg := <-p.Errors():
  34. if msg.Msg.flags != 0 {
  35. t.Error("Message had flags set")
  36. }
  37. errors--
  38. if errors < 0 {
  39. t.Error(msg.Err)
  40. }
  41. case msg := <-p.Successes():
  42. if msg.flags != 0 {
  43. t.Error("Message had flags set")
  44. }
  45. successes--
  46. if successes < 0 {
  47. t.Error("Too many successes")
  48. }
  49. }
  50. }
  51. }
  52. type testPartitioner chan *int32
  53. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  54. part := <-p
  55. if part == nil {
  56. return 0, errors.New("BOOM")
  57. }
  58. return *part, nil
  59. }
  60. func (p testPartitioner) RequiresConsistency() bool {
  61. return true
  62. }
  63. func (p testPartitioner) feed(partition int32) {
  64. p <- &partition
  65. }
  66. func TestAsyncProducer(t *testing.T) {
  67. seedBroker := newMockBroker(t, 1)
  68. leader := newMockBroker(t, 2)
  69. metadataResponse := new(MetadataResponse)
  70. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  71. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  72. seedBroker.Returns(metadataResponse)
  73. prodSuccess := new(ProduceResponse)
  74. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  75. leader.Returns(prodSuccess)
  76. config := NewConfig()
  77. config.Producer.Flush.Messages = 10
  78. config.Producer.Return.Successes = true
  79. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. for i := 0; i < 10; i++ {
  84. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  85. }
  86. for i := 0; i < 10; i++ {
  87. select {
  88. case msg := <-producer.Errors():
  89. t.Error(msg.Err)
  90. if msg.Msg.flags != 0 {
  91. t.Error("Message had flags set")
  92. }
  93. case msg := <-producer.Successes():
  94. if msg.flags != 0 {
  95. t.Error("Message had flags set")
  96. }
  97. if msg.Metadata.(int) != i {
  98. t.Error("Message metadata did not match")
  99. }
  100. }
  101. }
  102. closeProducer(t, producer)
  103. leader.Close()
  104. seedBroker.Close()
  105. }
  106. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  107. seedBroker := newMockBroker(t, 1)
  108. leader := newMockBroker(t, 2)
  109. metadataResponse := new(MetadataResponse)
  110. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  111. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  112. seedBroker.Returns(metadataResponse)
  113. prodSuccess := new(ProduceResponse)
  114. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  115. leader.Returns(prodSuccess)
  116. leader.Returns(prodSuccess)
  117. leader.Returns(prodSuccess)
  118. config := NewConfig()
  119. config.Producer.Flush.Messages = 5
  120. config.Producer.Return.Successes = true
  121. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  122. if err != nil {
  123. t.Fatal(err)
  124. }
  125. for flush := 0; flush < 3; flush++ {
  126. for i := 0; i < 5; i++ {
  127. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  128. }
  129. expectResults(t, producer, 5, 0)
  130. }
  131. closeProducer(t, producer)
  132. leader.Close()
  133. seedBroker.Close()
  134. }
  135. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  136. seedBroker := newMockBroker(t, 1)
  137. leader0 := newMockBroker(t, 2)
  138. leader1 := newMockBroker(t, 3)
  139. metadataResponse := new(MetadataResponse)
  140. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  141. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  142. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  143. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  144. seedBroker.Returns(metadataResponse)
  145. prodResponse0 := new(ProduceResponse)
  146. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  147. leader0.Returns(prodResponse0)
  148. prodResponse1 := new(ProduceResponse)
  149. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  150. leader1.Returns(prodResponse1)
  151. config := NewConfig()
  152. config.Producer.Flush.Messages = 5
  153. config.Producer.Return.Successes = true
  154. config.Producer.Partitioner = NewRoundRobinPartitioner
  155. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. for i := 0; i < 10; i++ {
  160. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  161. }
  162. expectResults(t, producer, 10, 0)
  163. closeProducer(t, producer)
  164. leader1.Close()
  165. leader0.Close()
  166. seedBroker.Close()
  167. }
  168. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  169. seedBroker := newMockBroker(t, 1)
  170. leader := newMockBroker(t, 2)
  171. metadataResponse := new(MetadataResponse)
  172. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  173. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  174. seedBroker.Returns(metadataResponse)
  175. prodResponse := new(ProduceResponse)
  176. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  177. leader.Returns(prodResponse)
  178. config := NewConfig()
  179. config.Producer.Flush.Messages = 2
  180. config.Producer.Return.Successes = true
  181. config.Producer.Partitioner = func(topic string) Partitioner {
  182. p := make(testPartitioner)
  183. go func() {
  184. p.feed(0)
  185. p <- nil
  186. p <- nil
  187. p <- nil
  188. p.feed(0)
  189. }()
  190. return p
  191. }
  192. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  193. if err != nil {
  194. t.Fatal(err)
  195. }
  196. for i := 0; i < 5; i++ {
  197. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  198. }
  199. expectResults(t, producer, 2, 3)
  200. closeProducer(t, producer)
  201. leader.Close()
  202. seedBroker.Close()
  203. }
  204. func TestAsyncProducerFailureRetry(t *testing.T) {
  205. seedBroker := newMockBroker(t, 1)
  206. leader1 := newMockBroker(t, 2)
  207. leader2 := newMockBroker(t, 3)
  208. metadataLeader1 := new(MetadataResponse)
  209. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  210. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  211. seedBroker.Returns(metadataLeader1)
  212. config := NewConfig()
  213. config.Producer.Flush.Messages = 10
  214. config.Producer.Return.Successes = true
  215. config.Producer.Retry.Backoff = 0
  216. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  217. if err != nil {
  218. t.Fatal(err)
  219. }
  220. seedBroker.Close()
  221. for i := 0; i < 10; i++ {
  222. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  223. }
  224. prodNotLeader := new(ProduceResponse)
  225. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  226. leader1.Returns(prodNotLeader)
  227. metadataLeader2 := new(MetadataResponse)
  228. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  229. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  230. leader1.Returns(metadataLeader2)
  231. prodSuccess := new(ProduceResponse)
  232. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  233. leader2.Returns(prodSuccess)
  234. expectResults(t, producer, 10, 0)
  235. leader1.Close()
  236. for i := 0; i < 10; i++ {
  237. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  238. }
  239. leader2.Returns(prodSuccess)
  240. expectResults(t, producer, 10, 0)
  241. leader2.Close()
  242. closeProducer(t, producer)
  243. }
  244. // If a Kafka broker becomes unavailable and then returns back in service, then
  245. // producer reconnects to it and continues sending messages.
  246. func TestAsyncProducerBrokerBounce(t *testing.T) {
  247. // Given
  248. seedBroker := newMockBroker(t, 1)
  249. leader := newMockBroker(t, 2)
  250. leaderAddr := leader.Addr()
  251. metadataResponse := new(MetadataResponse)
  252. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  253. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  254. seedBroker.Returns(metadataResponse)
  255. prodSuccess := new(ProduceResponse)
  256. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  257. config := NewConfig()
  258. config.Producer.Flush.Messages = 1
  259. config.Producer.Return.Successes = true
  260. config.Producer.Retry.Backoff = 0
  261. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  262. if err != nil {
  263. t.Fatal(err)
  264. }
  265. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  266. leader.Returns(prodSuccess)
  267. expectResults(t, producer, 1, 0)
  268. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  269. leader.Close() // producer should get EOF
  270. leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  271. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  272. // Then: a produced message goes through the new broker connection.
  273. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  274. leader.Returns(prodSuccess)
  275. expectResults(t, producer, 1, 0)
  276. closeProducer(t, producer)
  277. seedBroker.Close()
  278. leader.Close()
  279. }
  280. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  281. seedBroker := newMockBroker(t, 1)
  282. leader1 := newMockBroker(t, 2)
  283. leader2 := newMockBroker(t, 3)
  284. metadataLeader1 := new(MetadataResponse)
  285. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  286. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  287. seedBroker.Returns(metadataLeader1)
  288. config := NewConfig()
  289. config.Producer.Flush.Messages = 10
  290. config.Producer.Return.Successes = true
  291. config.Producer.Retry.Max = 3
  292. config.Producer.Retry.Backoff = 0
  293. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  294. if err != nil {
  295. t.Fatal(err)
  296. }
  297. for i := 0; i < 10; i++ {
  298. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  299. }
  300. leader1.Close() // producer should get EOF
  301. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  302. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  303. // ok fine, tell it to go to leader2 finally
  304. metadataLeader2 := new(MetadataResponse)
  305. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  306. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  307. seedBroker.Returns(metadataLeader2)
  308. prodSuccess := new(ProduceResponse)
  309. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  310. leader2.Returns(prodSuccess)
  311. expectResults(t, producer, 10, 0)
  312. seedBroker.Close()
  313. leader2.Close()
  314. closeProducer(t, producer)
  315. }
  316. func TestAsyncProducerMultipleRetries(t *testing.T) {
  317. seedBroker := newMockBroker(t, 1)
  318. leader1 := newMockBroker(t, 2)
  319. leader2 := newMockBroker(t, 3)
  320. metadataLeader1 := new(MetadataResponse)
  321. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  322. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  323. seedBroker.Returns(metadataLeader1)
  324. config := NewConfig()
  325. config.Producer.Flush.Messages = 10
  326. config.Producer.Return.Successes = true
  327. config.Producer.Retry.Max = 4
  328. config.Producer.Retry.Backoff = 0
  329. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  330. if err != nil {
  331. t.Fatal(err)
  332. }
  333. for i := 0; i < 10; i++ {
  334. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  335. }
  336. prodNotLeader := new(ProduceResponse)
  337. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  338. leader1.Returns(prodNotLeader)
  339. metadataLeader2 := new(MetadataResponse)
  340. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  341. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  342. seedBroker.Returns(metadataLeader2)
  343. leader2.Returns(prodNotLeader)
  344. seedBroker.Returns(metadataLeader1)
  345. leader1.Returns(prodNotLeader)
  346. seedBroker.Returns(metadataLeader1)
  347. leader1.Returns(prodNotLeader)
  348. seedBroker.Returns(metadataLeader2)
  349. prodSuccess := new(ProduceResponse)
  350. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  351. leader2.Returns(prodSuccess)
  352. expectResults(t, producer, 10, 0)
  353. for i := 0; i < 10; i++ {
  354. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  355. }
  356. leader2.Returns(prodSuccess)
  357. expectResults(t, producer, 10, 0)
  358. seedBroker.Close()
  359. leader1.Close()
  360. leader2.Close()
  361. closeProducer(t, producer)
  362. }
  363. func TestAsyncProducerOutOfRetries(t *testing.T) {
  364. t.Skip("Enable once bug #294 is fixed.")
  365. seedBroker := newMockBroker(t, 1)
  366. leader := newMockBroker(t, 2)
  367. metadataResponse := new(MetadataResponse)
  368. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  369. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  370. seedBroker.Returns(metadataResponse)
  371. config := NewConfig()
  372. config.Producer.Flush.Messages = 10
  373. config.Producer.Return.Successes = true
  374. config.Producer.Retry.Backoff = 0
  375. config.Producer.Retry.Max = 0
  376. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  377. if err != nil {
  378. t.Fatal(err)
  379. }
  380. for i := 0; i < 10; i++ {
  381. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  382. }
  383. prodNotLeader := new(ProduceResponse)
  384. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  385. leader.Returns(prodNotLeader)
  386. for i := 0; i < 10; i++ {
  387. select {
  388. case msg := <-producer.Errors():
  389. if msg.Err != ErrNotLeaderForPartition {
  390. t.Error(msg.Err)
  391. }
  392. case <-producer.Successes():
  393. t.Error("Unexpected success")
  394. }
  395. }
  396. seedBroker.Returns(metadataResponse)
  397. for i := 0; i < 10; i++ {
  398. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  399. }
  400. prodSuccess := new(ProduceResponse)
  401. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  402. leader.Returns(prodSuccess)
  403. expectResults(t, producer, 10, 0)
  404. leader.Close()
  405. seedBroker.Close()
  406. safeClose(t, producer)
  407. }
  408. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  409. seedBroker := newMockBroker(t, 1)
  410. leader := newMockBroker(t, 2)
  411. leaderAddr := leader.Addr()
  412. metadataResponse := new(MetadataResponse)
  413. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  414. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  415. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  416. seedBroker.Returns(metadataResponse)
  417. config := NewConfig()
  418. config.Producer.Return.Successes = true
  419. config.Producer.Retry.Backoff = 0
  420. config.Producer.Retry.Max = 1
  421. config.Producer.Partitioner = NewRoundRobinPartitioner
  422. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  423. if err != nil {
  424. t.Fatal(err)
  425. }
  426. // prime partition 0
  427. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  428. prodSuccess := new(ProduceResponse)
  429. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  430. leader.Returns(prodSuccess)
  431. expectResults(t, producer, 1, 0)
  432. // prime partition 1
  433. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  434. prodSuccess = new(ProduceResponse)
  435. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  436. leader.Returns(prodSuccess)
  437. expectResults(t, producer, 1, 0)
  438. // reboot the broker (the producer will get EOF on its existing connection)
  439. leader.Close()
  440. leader = newMockBrokerAddr(t, 2, leaderAddr)
  441. // send another message on partition 0 to trigger the EOF and retry
  442. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  443. // tell partition 0 to go to that broker again
  444. seedBroker.Returns(metadataResponse)
  445. // succeed this time
  446. prodSuccess = new(ProduceResponse)
  447. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  448. leader.Returns(prodSuccess)
  449. expectResults(t, producer, 1, 0)
  450. // shutdown
  451. closeProducer(t, producer)
  452. seedBroker.Close()
  453. leader.Close()
  454. }
  455. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  456. seedBroker := newMockBroker(t, 1)
  457. leader := newMockBroker(t, 2)
  458. metadataResponse := new(MetadataResponse)
  459. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  460. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  461. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  462. seedBroker.Returns(metadataResponse)
  463. config := NewConfig()
  464. config.Producer.Flush.Messages = 5
  465. config.Producer.Return.Successes = true
  466. config.Producer.Retry.Backoff = 0
  467. config.Producer.Retry.Max = 1
  468. config.Producer.Partitioner = NewManualPartitioner
  469. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  470. if err != nil {
  471. t.Fatal(err)
  472. }
  473. // prime partitions
  474. for p := int32(0); p < 2; p++ {
  475. for i := 0; i < 5; i++ {
  476. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  477. }
  478. prodSuccess := new(ProduceResponse)
  479. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  480. leader.Returns(prodSuccess)
  481. expectResults(t, producer, 5, 0)
  482. }
  483. // send more messages on partition 0
  484. for i := 0; i < 5; i++ {
  485. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  486. }
  487. prodNotLeader := new(ProduceResponse)
  488. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  489. leader.Returns(prodNotLeader)
  490. time.Sleep(50 * time.Millisecond)
  491. leader.SetHandlerByMap(map[string]MockResponse{
  492. "ProduceRequest": newMockProduceResponse(t).
  493. SetError("my_topic", 0, ErrNoError),
  494. })
  495. // tell partition 0 to go to that broker again
  496. seedBroker.Returns(metadataResponse)
  497. // succeed this time
  498. expectResults(t, producer, 5, 0)
  499. // put five more through
  500. for i := 0; i < 5; i++ {
  501. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  502. }
  503. expectResults(t, producer, 5, 0)
  504. // shutdown
  505. closeProducer(t, producer)
  506. seedBroker.Close()
  507. leader.Close()
  508. }
  509. func TestAsyncProducerRetryShutdown(t *testing.T) {
  510. seedBroker := newMockBroker(t, 1)
  511. leader := newMockBroker(t, 2)
  512. metadataLeader := new(MetadataResponse)
  513. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  514. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  515. seedBroker.Returns(metadataLeader)
  516. config := NewConfig()
  517. config.Producer.Flush.Messages = 10
  518. config.Producer.Return.Successes = true
  519. config.Producer.Retry.Backoff = 0
  520. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  521. if err != nil {
  522. t.Fatal(err)
  523. }
  524. for i := 0; i < 10; i++ {
  525. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  526. }
  527. producer.AsyncClose()
  528. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  529. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  530. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  531. t.Error(err)
  532. }
  533. prodNotLeader := new(ProduceResponse)
  534. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  535. leader.Returns(prodNotLeader)
  536. seedBroker.Returns(metadataLeader)
  537. prodSuccess := new(ProduceResponse)
  538. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  539. leader.Returns(prodSuccess)
  540. expectResults(t, producer, 10, 0)
  541. seedBroker.Close()
  542. leader.Close()
  543. // wait for the async-closed producer to shut down fully
  544. for err := range producer.Errors() {
  545. t.Error(err)
  546. }
  547. }
  548. // This example shows how to use the producer while simultaneously
  549. // reading the Errors channel to know about any failures.
  550. func ExampleAsyncProducer_select() {
  551. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  552. if err != nil {
  553. panic(err)
  554. }
  555. defer func() {
  556. if err := producer.Close(); err != nil {
  557. log.Fatalln(err)
  558. }
  559. }()
  560. // Trap SIGINT to trigger a shutdown.
  561. signals := make(chan os.Signal, 1)
  562. signal.Notify(signals, os.Interrupt)
  563. var enqueued, errors int
  564. ProducerLoop:
  565. for {
  566. select {
  567. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  568. enqueued++
  569. case err := <-producer.Errors():
  570. log.Println("Failed to produce message", err)
  571. errors++
  572. case <-signals:
  573. break ProducerLoop
  574. }
  575. }
  576. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  577. }
  578. // This example shows how to use the producer with separate goroutines
  579. // reading from the Successes and Errors channels. Note that in order
  580. // for the Successes channel to be populated, you have to set
  581. // config.Producer.Return.Successes to true.
  582. func ExampleAsyncProducer_goroutines() {
  583. config := NewConfig()
  584. config.Producer.Return.Successes = true
  585. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  586. if err != nil {
  587. panic(err)
  588. }
  589. // Trap SIGINT to trigger a graceful shutdown.
  590. signals := make(chan os.Signal, 1)
  591. signal.Notify(signals, os.Interrupt)
  592. var (
  593. wg sync.WaitGroup
  594. enqueued, successes, errors int
  595. )
  596. wg.Add(1)
  597. go func() {
  598. defer wg.Done()
  599. for _ = range producer.Successes() {
  600. successes++
  601. }
  602. }()
  603. wg.Add(1)
  604. go func() {
  605. defer wg.Done()
  606. for err := range producer.Errors() {
  607. log.Println(err)
  608. errors++
  609. }
  610. }()
  611. ProducerLoop:
  612. for {
  613. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  614. select {
  615. case producer.Input() <- message:
  616. enqueued++
  617. case <-signals:
  618. producer.AsyncClose() // Trigger a shutdown of the producer.
  619. break ProducerLoop
  620. }
  621. }
  622. wg.Wait()
  623. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  624. }