async_producer_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. )
// TestMessage is the string payload that the producer tests in this file
// send as the Value of every ProducerMessage.
const TestMessage = "ABC THE MESSAGE"
  10. func closeProducer(t *testing.T, p AsyncProducer) {
  11. var wg sync.WaitGroup
  12. p.AsyncClose()
  13. wg.Add(2)
  14. go func() {
  15. for _ = range p.Successes() {
  16. t.Error("Unexpected message on Successes()")
  17. }
  18. wg.Done()
  19. }()
  20. go func() {
  21. for msg := range p.Errors() {
  22. t.Error(msg.Err)
  23. }
  24. wg.Done()
  25. }()
  26. wg.Wait()
  27. }
  28. func TestAsyncProducer(t *testing.T) {
  29. seedBroker := newMockBroker(t, 1)
  30. leader := newMockBroker(t, 2)
  31. metadataResponse := new(MetadataResponse)
  32. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  33. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  34. seedBroker.Returns(metadataResponse)
  35. prodSuccess := new(ProduceResponse)
  36. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  37. leader.Returns(prodSuccess)
  38. config := NewConfig()
  39. config.Producer.Flush.Messages = 10
  40. config.Producer.Return.Successes = true
  41. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  42. if err != nil {
  43. t.Fatal(err)
  44. }
  45. for i := 0; i < 10; i++ {
  46. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  47. }
  48. for i := 0; i < 10; i++ {
  49. select {
  50. case msg := <-producer.Errors():
  51. t.Error(msg.Err)
  52. if msg.Msg.flags != 0 {
  53. t.Error("Message had flags set")
  54. }
  55. case msg := <-producer.Successes():
  56. if msg.flags != 0 {
  57. t.Error("Message had flags set")
  58. }
  59. if msg.Metadata.(int) != i {
  60. t.Error("Message metadata did not match")
  61. }
  62. }
  63. }
  64. closeProducer(t, producer)
  65. leader.Close()
  66. seedBroker.Close()
  67. }
  68. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  69. seedBroker := newMockBroker(t, 1)
  70. leader := newMockBroker(t, 2)
  71. metadataResponse := new(MetadataResponse)
  72. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  73. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  74. seedBroker.Returns(metadataResponse)
  75. prodSuccess := new(ProduceResponse)
  76. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  77. leader.Returns(prodSuccess)
  78. leader.Returns(prodSuccess)
  79. leader.Returns(prodSuccess)
  80. config := NewConfig()
  81. config.Producer.Flush.Messages = 5
  82. config.Producer.Return.Successes = true
  83. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  84. if err != nil {
  85. t.Fatal(err)
  86. }
  87. for flush := 0; flush < 3; flush++ {
  88. for i := 0; i < 5; i++ {
  89. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  90. }
  91. for i := 0; i < 5; i++ {
  92. select {
  93. case msg := <-producer.Errors():
  94. t.Error(msg.Err)
  95. if msg.Msg.flags != 0 {
  96. t.Error("Message had flags set")
  97. }
  98. case msg := <-producer.Successes():
  99. if msg.flags != 0 {
  100. t.Error("Message had flags set")
  101. }
  102. }
  103. }
  104. }
  105. closeProducer(t, producer)
  106. leader.Close()
  107. seedBroker.Close()
  108. }
  109. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  110. seedBroker := newMockBroker(t, 1)
  111. leader0 := newMockBroker(t, 2)
  112. leader1 := newMockBroker(t, 3)
  113. metadataResponse := new(MetadataResponse)
  114. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  115. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  116. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  117. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  118. seedBroker.Returns(metadataResponse)
  119. prodResponse0 := new(ProduceResponse)
  120. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  121. leader0.Returns(prodResponse0)
  122. prodResponse1 := new(ProduceResponse)
  123. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  124. leader1.Returns(prodResponse1)
  125. config := NewConfig()
  126. config.Producer.Flush.Messages = 5
  127. config.Producer.Return.Successes = true
  128. config.Producer.Partitioner = NewRoundRobinPartitioner
  129. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  130. if err != nil {
  131. t.Fatal(err)
  132. }
  133. for i := 0; i < 10; i++ {
  134. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  135. }
  136. for i := 0; i < 10; i++ {
  137. select {
  138. case msg := <-producer.Errors():
  139. t.Error(msg.Err)
  140. if msg.Msg.flags != 0 {
  141. t.Error("Message had flags set")
  142. }
  143. case msg := <-producer.Successes():
  144. if msg.flags != 0 {
  145. t.Error("Message had flags set")
  146. }
  147. }
  148. }
  149. closeProducer(t, producer)
  150. leader1.Close()
  151. leader0.Close()
  152. seedBroker.Close()
  153. }
  154. func TestAsyncProducerFailureRetry(t *testing.T) {
  155. seedBroker := newMockBroker(t, 1)
  156. leader1 := newMockBroker(t, 2)
  157. leader2 := newMockBroker(t, 3)
  158. metadataLeader1 := new(MetadataResponse)
  159. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  160. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  161. seedBroker.Returns(metadataLeader1)
  162. config := NewConfig()
  163. config.Producer.Flush.Messages = 10
  164. config.Producer.Return.Successes = true
  165. config.Producer.Retry.Backoff = 0
  166. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  167. if err != nil {
  168. t.Fatal(err)
  169. }
  170. seedBroker.Close()
  171. for i := 0; i < 10; i++ {
  172. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  173. }
  174. prodNotLeader := new(ProduceResponse)
  175. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  176. leader1.Returns(prodNotLeader)
  177. metadataLeader2 := new(MetadataResponse)
  178. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  179. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  180. leader1.Returns(metadataLeader2)
  181. prodSuccess := new(ProduceResponse)
  182. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  183. leader2.Returns(prodSuccess)
  184. for i := 0; i < 10; i++ {
  185. select {
  186. case msg := <-producer.Errors():
  187. t.Error(msg.Err)
  188. if msg.Msg.flags != 0 {
  189. t.Error("Message had flags set")
  190. }
  191. case msg := <-producer.Successes():
  192. if msg.flags != 0 {
  193. t.Error("Message had flags set")
  194. }
  195. }
  196. }
  197. leader1.Close()
  198. for i := 0; i < 10; i++ {
  199. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  200. }
  201. leader2.Returns(prodSuccess)
  202. for i := 0; i < 10; i++ {
  203. select {
  204. case msg := <-producer.Errors():
  205. t.Error(msg.Err)
  206. if msg.Msg.flags != 0 {
  207. t.Error("Message had flags set")
  208. }
  209. case msg := <-producer.Successes():
  210. if msg.flags != 0 {
  211. t.Error("Message had flags set")
  212. }
  213. }
  214. }
  215. leader2.Close()
  216. closeProducer(t, producer)
  217. }
  218. func TestAsyncProducerBrokerBounce(t *testing.T) {
  219. seedBroker := newMockBroker(t, 1)
  220. leader := newMockBroker(t, 2)
  221. leaderAddr := leader.Addr()
  222. metadataResponse := new(MetadataResponse)
  223. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  224. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  225. seedBroker.Returns(metadataResponse)
  226. config := NewConfig()
  227. config.Producer.Flush.Messages = 10
  228. config.Producer.Return.Successes = true
  229. config.Producer.Retry.Backoff = 0
  230. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. for i := 0; i < 10; i++ {
  235. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  236. }
  237. leader.Close() // producer should get EOF
  238. leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  239. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  240. prodSuccess := new(ProduceResponse)
  241. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  242. leader.Returns(prodSuccess)
  243. for i := 0; i < 10; i++ {
  244. select {
  245. case msg := <-producer.Errors():
  246. t.Error(msg.Err)
  247. if msg.Msg.flags != 0 {
  248. t.Error("Message had flags set")
  249. }
  250. case msg := <-producer.Successes():
  251. if msg.flags != 0 {
  252. t.Error("Message had flags set")
  253. }
  254. }
  255. }
  256. seedBroker.Close()
  257. leader.Close()
  258. closeProducer(t, producer)
  259. }
  260. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  261. seedBroker := newMockBroker(t, 1)
  262. leader1 := newMockBroker(t, 2)
  263. leader2 := newMockBroker(t, 3)
  264. metadataLeader1 := new(MetadataResponse)
  265. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  266. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  267. seedBroker.Returns(metadataLeader1)
  268. config := NewConfig()
  269. config.Producer.Flush.Messages = 10
  270. config.Producer.Return.Successes = true
  271. config.Producer.Retry.Max = 3
  272. config.Producer.Retry.Backoff = 0
  273. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  274. if err != nil {
  275. t.Fatal(err)
  276. }
  277. for i := 0; i < 10; i++ {
  278. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  279. }
  280. leader1.Close() // producer should get EOF
  281. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  282. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  283. // ok fine, tell it to go to leader2 finally
  284. metadataLeader2 := new(MetadataResponse)
  285. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  286. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  287. seedBroker.Returns(metadataLeader2)
  288. prodSuccess := new(ProduceResponse)
  289. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  290. leader2.Returns(prodSuccess)
  291. for i := 0; i < 10; i++ {
  292. select {
  293. case msg := <-producer.Errors():
  294. t.Error(msg.Err)
  295. if msg.Msg.flags != 0 {
  296. t.Error("Message had flags set")
  297. }
  298. case msg := <-producer.Successes():
  299. if msg.flags != 0 {
  300. t.Error("Message had flags set")
  301. }
  302. }
  303. }
  304. seedBroker.Close()
  305. leader2.Close()
  306. closeProducer(t, producer)
  307. }
  308. func TestAsyncProducerMultipleRetries(t *testing.T) {
  309. seedBroker := newMockBroker(t, 1)
  310. leader1 := newMockBroker(t, 2)
  311. leader2 := newMockBroker(t, 3)
  312. metadataLeader1 := new(MetadataResponse)
  313. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  314. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  315. seedBroker.Returns(metadataLeader1)
  316. config := NewConfig()
  317. config.Producer.Flush.Messages = 10
  318. config.Producer.Return.Successes = true
  319. config.Producer.Retry.Max = 4
  320. config.Producer.Retry.Backoff = 0
  321. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  322. if err != nil {
  323. t.Fatal(err)
  324. }
  325. for i := 0; i < 10; i++ {
  326. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  327. }
  328. prodNotLeader := new(ProduceResponse)
  329. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  330. leader1.Returns(prodNotLeader)
  331. metadataLeader2 := new(MetadataResponse)
  332. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  333. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  334. seedBroker.Returns(metadataLeader2)
  335. leader2.Returns(prodNotLeader)
  336. seedBroker.Returns(metadataLeader1)
  337. leader1.Returns(prodNotLeader)
  338. seedBroker.Returns(metadataLeader1)
  339. leader1.Returns(prodNotLeader)
  340. seedBroker.Returns(metadataLeader2)
  341. prodSuccess := new(ProduceResponse)
  342. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  343. leader2.Returns(prodSuccess)
  344. for i := 0; i < 10; i++ {
  345. select {
  346. case msg := <-producer.Errors():
  347. t.Error(msg.Err)
  348. if msg.Msg.flags != 0 {
  349. t.Error("Message had flags set")
  350. }
  351. case msg := <-producer.Successes():
  352. if msg.flags != 0 {
  353. t.Error("Message had flags set")
  354. }
  355. }
  356. }
  357. for i := 0; i < 10; i++ {
  358. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  359. }
  360. leader2.Returns(prodSuccess)
  361. for i := 0; i < 10; i++ {
  362. select {
  363. case msg := <-producer.Errors():
  364. t.Error(msg.Err)
  365. if msg.Msg.flags != 0 {
  366. t.Error("Message had flags set")
  367. }
  368. case msg := <-producer.Successes():
  369. if msg.flags != 0 {
  370. t.Error("Message had flags set")
  371. }
  372. }
  373. }
  374. seedBroker.Close()
  375. leader1.Close()
  376. leader2.Close()
  377. closeProducer(t, producer)
  378. }
  379. func TestAsyncProducerOutOfRetries(t *testing.T) {
  380. t.Skip("Enable once bug #294 is fixed.")
  381. seedBroker := newMockBroker(t, 1)
  382. leader := newMockBroker(t, 2)
  383. metadataResponse := new(MetadataResponse)
  384. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  385. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  386. seedBroker.Returns(metadataResponse)
  387. config := NewConfig()
  388. config.Producer.Flush.Messages = 10
  389. config.Producer.Return.Successes = true
  390. config.Producer.Retry.Backoff = 0
  391. config.Producer.Retry.Max = 0
  392. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  393. if err != nil {
  394. t.Fatal(err)
  395. }
  396. for i := 0; i < 10; i++ {
  397. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  398. }
  399. prodNotLeader := new(ProduceResponse)
  400. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  401. leader.Returns(prodNotLeader)
  402. for i := 0; i < 10; i++ {
  403. select {
  404. case msg := <-producer.Errors():
  405. if msg.Err != ErrNotLeaderForPartition {
  406. t.Error(msg.Err)
  407. }
  408. case <-producer.Successes():
  409. t.Error("Unexpected success")
  410. }
  411. }
  412. seedBroker.Returns(metadataResponse)
  413. for i := 0; i < 10; i++ {
  414. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  415. }
  416. prodSuccess := new(ProduceResponse)
  417. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  418. leader.Returns(prodSuccess)
  419. for i := 0; i < 10; i++ {
  420. select {
  421. case msg := <-producer.Errors():
  422. t.Error(msg.Err)
  423. case <-producer.Successes():
  424. }
  425. }
  426. leader.Close()
  427. seedBroker.Close()
  428. safeClose(t, producer)
  429. }
  430. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  431. seedBroker := newMockBroker(t, 1)
  432. leader := newMockBroker(t, 2)
  433. leaderAddr := leader.Addr()
  434. metadataResponse := new(MetadataResponse)
  435. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  436. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  437. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  438. seedBroker.Returns(metadataResponse)
  439. config := NewConfig()
  440. config.Producer.Return.Successes = true
  441. config.Producer.Retry.Backoff = 0
  442. config.Producer.Retry.Max = 1
  443. config.Producer.Partitioner = NewRoundRobinPartitioner
  444. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  445. if err != nil {
  446. t.Fatal(err)
  447. }
  448. // prime partition 0
  449. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  450. prodSuccess := new(ProduceResponse)
  451. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  452. leader.Returns(prodSuccess)
  453. select {
  454. case msg := <-producer.Errors():
  455. t.Error(msg.Err)
  456. case <-producer.Successes():
  457. }
  458. // prime partition 1
  459. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  460. prodSuccess = new(ProduceResponse)
  461. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  462. leader.Returns(prodSuccess)
  463. select {
  464. case msg := <-producer.Errors():
  465. t.Error(msg.Err)
  466. case <-producer.Successes():
  467. }
  468. // reboot the broker (the producer will get EOF on its existing connection)
  469. leader.Close()
  470. leader = newMockBrokerAddr(t, 2, leaderAddr)
  471. // send another message on partition 0 to trigger the EOF and retry
  472. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  473. // tell partition 0 to go to that broker again
  474. seedBroker.Returns(metadataResponse)
  475. // succeed this time
  476. prodSuccess = new(ProduceResponse)
  477. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  478. leader.Returns(prodSuccess)
  479. select {
  480. case msg := <-producer.Errors():
  481. t.Error(msg.Err)
  482. case <-producer.Successes():
  483. }
  484. // shutdown
  485. closeProducer(t, producer)
  486. seedBroker.Close()
  487. leader.Close()
  488. }
  489. // This example shows how to use the producer while simultaneously
  490. // reading the Errors channel to know about any failures.
  491. func ExampleAsyncProducer_select() {
  492. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  493. if err != nil {
  494. panic(err)
  495. }
  496. defer func() {
  497. if err := producer.Close(); err != nil {
  498. log.Fatalln(err)
  499. }
  500. }()
  501. // Trap SIGINT to trigger a shutdown.
  502. signals := make(chan os.Signal, 1)
  503. signal.Notify(signals, os.Interrupt)
  504. var enqueued, errors int
  505. ProducerLoop:
  506. for {
  507. select {
  508. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  509. enqueued++
  510. case err := <-producer.Errors():
  511. log.Println("Failed to produce message", err)
  512. errors++
  513. case <-signals:
  514. break ProducerLoop
  515. }
  516. }
  517. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  518. }
  519. // This example shows how to use the producer with separate goroutines
  520. // reading from the Successes and Errors channels. Note that in order
  521. // for the Successes channel to be populated, you have to set
  522. // config.Producer.Return.Successes to true.
  523. func ExampleAsyncProducer_goroutines() {
  524. config := NewConfig()
  525. config.Producer.Return.Successes = true
  526. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  527. if err != nil {
  528. panic(err)
  529. }
  530. // Trap SIGINT to trigger a graceful shutdown.
  531. signals := make(chan os.Signal, 1)
  532. signal.Notify(signals, os.Interrupt)
  533. var (
  534. wg sync.WaitGroup
  535. enqueued, successes, errors int
  536. )
  537. wg.Add(1)
  538. go func() {
  539. defer wg.Done()
  540. for _ = range producer.Successes() {
  541. successes++
  542. }
  543. }()
  544. wg.Add(1)
  545. go func() {
  546. defer wg.Done()
  547. for err := range producer.Errors() {
  548. log.Println(err)
  549. errors++
  550. }
  551. }()
  552. ProducerLoop:
  553. for {
  554. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  555. select {
  556. case producer.Input() <- message:
  557. enqueued++
  558. case <-signals:
  559. producer.AsyncClose() // Trigger a shutdown of the producer.
  560. break ProducerLoop
  561. }
  562. }
  563. wg.Wait()
  564. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  565. }