async_producer_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
// TestMessage is the payload most tests in this file use as the Value of the
// ProducerMessages they send.
const TestMessage = "ABC THE MESSAGE"
  11. func closeProducer(t *testing.T, p AsyncProducer) {
  12. var wg sync.WaitGroup
  13. p.AsyncClose()
  14. wg.Add(2)
  15. go func() {
  16. for _ = range p.Successes() {
  17. t.Error("Unexpected message on Successes()")
  18. }
  19. wg.Done()
  20. }()
  21. go func() {
  22. for msg := range p.Errors() {
  23. t.Error(msg.Err)
  24. }
  25. wg.Done()
  26. }()
  27. wg.Wait()
  28. }
  29. func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
  30. for i := 0; i < successes; i++ {
  31. select {
  32. case msg := <-p.Errors():
  33. t.Error(msg.Err)
  34. if msg.Msg.flags != 0 {
  35. t.Error("Message had flags set")
  36. }
  37. case msg := <-p.Successes():
  38. if msg.flags != 0 {
  39. t.Error("Message had flags set")
  40. }
  41. }
  42. }
  43. }
  44. func TestAsyncProducer(t *testing.T) {
  45. seedBroker := newMockBroker(t, 1)
  46. leader := newMockBroker(t, 2)
  47. metadataResponse := new(MetadataResponse)
  48. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  49. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  50. seedBroker.Returns(metadataResponse)
  51. prodSuccess := new(ProduceResponse)
  52. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  53. leader.Returns(prodSuccess)
  54. config := NewConfig()
  55. config.Producer.Flush.Messages = 10
  56. config.Producer.Return.Successes = true
  57. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. for i := 0; i < 10; i++ {
  62. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  63. }
  64. for i := 0; i < 10; i++ {
  65. select {
  66. case msg := <-producer.Errors():
  67. t.Error(msg.Err)
  68. if msg.Msg.flags != 0 {
  69. t.Error("Message had flags set")
  70. }
  71. case msg := <-producer.Successes():
  72. if msg.flags != 0 {
  73. t.Error("Message had flags set")
  74. }
  75. if msg.Metadata.(int) != i {
  76. t.Error("Message metadata did not match")
  77. }
  78. }
  79. }
  80. closeProducer(t, producer)
  81. leader.Close()
  82. seedBroker.Close()
  83. }
  84. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  85. seedBroker := newMockBroker(t, 1)
  86. leader := newMockBroker(t, 2)
  87. metadataResponse := new(MetadataResponse)
  88. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  89. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  90. seedBroker.Returns(metadataResponse)
  91. prodSuccess := new(ProduceResponse)
  92. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  93. leader.Returns(prodSuccess)
  94. leader.Returns(prodSuccess)
  95. leader.Returns(prodSuccess)
  96. config := NewConfig()
  97. config.Producer.Flush.Messages = 5
  98. config.Producer.Return.Successes = true
  99. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  100. if err != nil {
  101. t.Fatal(err)
  102. }
  103. for flush := 0; flush < 3; flush++ {
  104. for i := 0; i < 5; i++ {
  105. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  106. }
  107. expectSuccesses(t, producer, 5)
  108. }
  109. closeProducer(t, producer)
  110. leader.Close()
  111. seedBroker.Close()
  112. }
  113. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  114. seedBroker := newMockBroker(t, 1)
  115. leader0 := newMockBroker(t, 2)
  116. leader1 := newMockBroker(t, 3)
  117. metadataResponse := new(MetadataResponse)
  118. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  119. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  120. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  121. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  122. seedBroker.Returns(metadataResponse)
  123. prodResponse0 := new(ProduceResponse)
  124. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  125. leader0.Returns(prodResponse0)
  126. prodResponse1 := new(ProduceResponse)
  127. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  128. leader1.Returns(prodResponse1)
  129. config := NewConfig()
  130. config.Producer.Flush.Messages = 5
  131. config.Producer.Return.Successes = true
  132. config.Producer.Partitioner = NewRoundRobinPartitioner
  133. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  134. if err != nil {
  135. t.Fatal(err)
  136. }
  137. for i := 0; i < 10; i++ {
  138. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  139. }
  140. expectSuccesses(t, producer, 10)
  141. closeProducer(t, producer)
  142. leader1.Close()
  143. leader0.Close()
  144. seedBroker.Close()
  145. }
// TestAsyncProducerFailureRetry covers a leadership change reported through a
// produce response: leader1 answers the first batch with
// ErrNotLeaderForPartition and then serves metadata naming leader2, and the
// test expects all ten messages to be reported as successes anyway. A second
// batch, sent after leader1 has been closed, must succeed as well.
func TestAsyncProducerFailureRetry(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader1 := newMockBroker(t, 2)
	leader2 := newMockBroker(t, 3)

	// Initial metadata: partition 0 of my_topic is led by leader1.
	metadataLeader1 := new(MetadataResponse)
	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader1)

	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	// The seed broker is closed before any message is sent; the follow-up
	// metadata below is queued on leader1 instead.
	seedBroker.Close()

	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	// leader1 first rejects the batch ...
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader1.Returns(prodNotLeader)

	// ... and then serves metadata that names leader2 as the partition leader.
	metadataLeader2 := new(MetadataResponse)
	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
	leader1.Returns(metadataLeader2)

	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader2.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)
	leader1.Close()

	// A second batch is acknowledged by leader2 with leader1 already closed.
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	leader2.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)

	leader2.Close()
	closeProducer(t, producer)
}
// TestAsyncProducerBrokerBounce closes the partition leader after ten
// messages have been handed to the producer, restarts a mock broker with the
// same ID on the same address, and expects all ten messages to succeed once
// the seed broker has served the same metadata again.
func TestAsyncProducerBrokerBounce(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)
	// Remember the address so the replacement broker can reuse it.
	leaderAddr := leader.Addr()

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	leader.Close()                               // producer should get EOF
	leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again

	// The restarted broker acknowledges the batch.
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)
	seedBroker.Close()
	leader.Close()

	closeProducer(t, producer)
}
// TestAsyncProducerBrokerBounceWithStaleMetadata closes leader1 after ten
// messages have been handed to the producer, has the seed broker keep naming
// the closed leader1 in two further metadata responses, and only then has it
// name leader2. With Retry.Max set to 3 the test expects all ten messages to
// succeed on leader2.
func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader1 := newMockBroker(t, 2)
	leader2 := newMockBroker(t, 3)

	// Initial metadata: partition 0 of my_topic is led by leader1.
	metadataLeader1 := new(MetadataResponse)
	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader1)

	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 3
	config.Producer.Retry.Backoff = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	leader1.Close()                     // producer should get EOF
	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down

	// ok fine, tell it to go to leader2 finally
	metadataLeader2 := new(MetadataResponse)
	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader2)

	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader2.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)
	seedBroker.Close()
	leader2.Close()

	closeProducer(t, producer)
}
// TestAsyncProducerMultipleRetries queues four ErrNotLeaderForPartition
// produce responses in a row (on leader1, leader2, leader1 and leader1), each
// followed by a metadata response from the seed broker, before leader2
// finally acknowledges the batch. With Retry.Max set to 4 the test expects
// all ten messages to succeed, and a second batch to succeed on leader2
// afterwards.
func TestAsyncProducerMultipleRetries(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader1 := newMockBroker(t, 2)
	leader2 := newMockBroker(t, 3)

	// Initial metadata: partition 0 of my_topic is led by leader1.
	metadataLeader1 := new(MetadataResponse)
	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader1)

	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 4
	config.Producer.Retry.Backoff = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	// Rejection 1: leader1 rejects, metadata then names leader2.
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader1.Returns(prodNotLeader)

	metadataLeader2 := new(MetadataResponse)
	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader2)
	// Rejection 2: leader2 rejects, metadata names leader1 again.
	leader2.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader1)
	// Rejection 3: leader1 rejects, metadata still names leader1.
	leader1.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader1)
	// Rejection 4: leader1 rejects, metadata names leader2.
	leader1.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader2)

	// leader2 finally accepts the batch.
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader2.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)

	// A second batch is acknowledged by leader2 straight away.
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	leader2.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)

	seedBroker.Close()
	leader1.Close()
	leader2.Close()
	closeProducer(t, producer)
}
// TestAsyncProducerOutOfRetries is currently skipped (see the t.Skip message
// referencing bug #294), so nothing below the Skip call runs.
//
// With Retry.Max set to 0, the body expects a batch rejected with
// ErrNotLeaderForPartition to come back as ten errors carrying exactly that
// error, and a subsequent batch to succeed after the seed broker has served
// metadata again.
func TestAsyncProducerOutOfRetries(t *testing.T) {
	t.Skip("Enable once bug #294 is fixed.")

	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	config.Producer.Retry.Max = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}

	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader.Returns(prodNotLeader)

	// Every message of the rejected batch must be returned on Errors with
	// ErrNotLeaderForPartition; any success is a failure of the test.
	for i := 0; i < 10; i++ {
		select {
		case msg := <-producer.Errors():
			if msg.Err != ErrNotLeaderForPartition {
				t.Error(msg.Err)
			}
		case <-producer.Successes():
			t.Error("Unexpected success")
		}
	}

	seedBroker.Returns(metadataResponse)

	// A fresh batch is accepted by the same leader.
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}

	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)

	expectSuccesses(t, producer, 10)

	leader.Close()
	seedBroker.Close()
	safeClose(t, producer)
}
// TestAsyncProducerRetryWithReferenceOpen uses the round-robin partitioner
// with two partitions that share one leader. It sends one message per
// partition, restarts the leader on the same address, and then expects a
// further message to succeed after the seed broker has served the metadata
// again (Retry.Max is 1).
//
// NOTE(review): judging by the test name, the scenario of interest is
// presumably a retry happening on one partition while the other partition
// still references the same broker — confirm against the producer
// implementation.
func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)
	// Remember the address so the replacement broker can reuse it.
	leaderAddr := leader.Addr()

	// Both partitions of my_topic are led by the same broker.
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	config.Producer.Retry.Max = 1
	config.Producer.Partitioner = NewRoundRobinPartitioner
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// prime partition 0
	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 1)

	// prime partition 1
	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	prodSuccess = new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 1)

	// reboot the broker (the producer will get EOF on its existing connection)
	leader.Close()
	leader = newMockBrokerAddr(t, 2, leaderAddr)

	// send another message on partition 0 to trigger the EOF and retry
	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}

	// tell partition 0 to go to that broker again
	seedBroker.Returns(metadataResponse)

	// succeed this time
	prodSuccess = new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 1)

	// shutdown
	closeProducer(t, producer)
	seedBroker.Close()
	leader.Close()
}
// TestAsyncProducerFlusherRetryCondition uses the manual partitioner with two
// partitions that share one leader. After a successful batch of five on each
// partition, a batch on partition 0 is rejected with
// ErrNotLeaderForPartition; the test expects it to succeed once the seed
// broker has served the same metadata again (Retry.Max is 1), and expects one
// more batch on partition 0 to succeed afterwards.
func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	// Both partitions of my_topic are led by the same broker.
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Producer.Flush.Messages = 5
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	config.Producer.Retry.Max = 1
	config.Producer.Partitioner = NewManualPartitioner
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// prime partitions
	for p := int32(0); p < 2; p++ {
		for i := 0; i < 5; i++ {
			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
		}
		prodSuccess := new(ProduceResponse)
		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
		leader.Returns(prodSuccess)
		expectSuccesses(t, producer, 5)
	}

	// send more messages on partition 0
	for i := 0; i < 5; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
	}
	// the leader rejects this batch
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader.Returns(prodNotLeader)

	// tell partition 0 to go to that broker again
	seedBroker.Returns(metadataResponse)

	// succeed this time
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 5)

	// put five more through
	for i := 0; i < 5; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
	}
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 5)

	// shutdown
	closeProducer(t, producer)
	seedBroker.Close()
	leader.Close()
}
// TestAsyncProducerRetryShutdown calls AsyncClose while ten messages are
// still outstanding. A message sent after the shutdown has started must come
// back on Errors with ErrShuttingDown, while the ten earlier messages are
// expected to succeed even though the leader first rejects them with
// ErrNotLeaderForPartition. The test then waits for Errors to be closed and
// fails on anything further received from it.
func TestAsyncProducerRetryShutdown(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataLeader := new(MetadataResponse)
	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader)

	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	producer.AsyncClose()
	time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in

	// A message submitted after shutdown has begun must be rejected.
	producer.Input() <- &ProducerMessage{Topic: "FOO"}
	if err := <-producer.Errors(); err.Err != ErrShuttingDown {
		t.Error(err)
	}

	// The leader rejects the outstanding batch once ...
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader.Returns(prodNotLeader)

	// ... the seed broker serves the same metadata again ...
	seedBroker.Returns(metadataLeader)

	// ... and the leader then accepts the batch.
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)
	expectSuccesses(t, producer, 10)

	seedBroker.Close()
	leader.Close()

	// wait for the async-closed producer to shut down fully
	for err := range producer.Errors() {
		t.Error(err)
	}
}
  483. // This example shows how to use the producer while simultaneously
  484. // reading the Errors channel to know about any failures.
  485. func ExampleAsyncProducer_select() {
  486. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  487. if err != nil {
  488. panic(err)
  489. }
  490. defer func() {
  491. if err := producer.Close(); err != nil {
  492. log.Fatalln(err)
  493. }
  494. }()
  495. // Trap SIGINT to trigger a shutdown.
  496. signals := make(chan os.Signal, 1)
  497. signal.Notify(signals, os.Interrupt)
  498. var enqueued, errors int
  499. ProducerLoop:
  500. for {
  501. select {
  502. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  503. enqueued++
  504. case err := <-producer.Errors():
  505. log.Println("Failed to produce message", err)
  506. errors++
  507. case <-signals:
  508. break ProducerLoop
  509. }
  510. }
  511. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  512. }
  513. // This example shows how to use the producer with separate goroutines
  514. // reading from the Successes and Errors channels. Note that in order
  515. // for the Successes channel to be populated, you have to set
  516. // config.Producer.Return.Successes to true.
  517. func ExampleAsyncProducer_goroutines() {
  518. config := NewConfig()
  519. config.Producer.Return.Successes = true
  520. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  521. if err != nil {
  522. panic(err)
  523. }
  524. // Trap SIGINT to trigger a graceful shutdown.
  525. signals := make(chan os.Signal, 1)
  526. signal.Notify(signals, os.Interrupt)
  527. var (
  528. wg sync.WaitGroup
  529. enqueued, successes, errors int
  530. )
  531. wg.Add(1)
  532. go func() {
  533. defer wg.Done()
  534. for _ = range producer.Successes() {
  535. successes++
  536. }
  537. }()
  538. wg.Add(1)
  539. go func() {
  540. defer wg.Done()
  541. for err := range producer.Errors() {
  542. log.Println(err)
  543. errors++
  544. }
  545. }()
  546. ProducerLoop:
  547. for {
  548. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  549. select {
  550. case producer.Input() <- message:
  551. enqueued++
  552. case <-signals:
  553. producer.AsyncClose() // Trigger a shutdown of the producer.
  554. break ProducerLoop
  555. }
  556. }
  557. wg.Wait()
  558. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  559. }