async_producer_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. )
  9. const TestMessage = "ABC THE MESSAGE"
  10. func closeProducer(t *testing.T, p AsyncProducer) {
  11. var wg sync.WaitGroup
  12. p.AsyncClose()
  13. wg.Add(2)
  14. go func() {
  15. for _ = range p.Successes() {
  16. t.Error("Unexpected message on Successes()")
  17. }
  18. wg.Done()
  19. }()
  20. go func() {
  21. for msg := range p.Errors() {
  22. t.Error(msg.Err)
  23. }
  24. wg.Done()
  25. }()
  26. wg.Wait()
  27. }
  28. func TestSyncProducer(t *testing.T) {
  29. seedBroker := newMockBroker(t, 1)
  30. leader := newMockBroker(t, 2)
  31. metadataResponse := new(MetadataResponse)
  32. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  33. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  34. seedBroker.Returns(metadataResponse)
  35. prodSuccess := new(ProduceResponse)
  36. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  37. for i := 0; i < 10; i++ {
  38. leader.Returns(prodSuccess)
  39. }
  40. producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
  41. if err != nil {
  42. t.Fatal(err)
  43. }
  44. for i := 0; i < 10; i++ {
  45. partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
  46. if partition != 0 {
  47. t.Error("Unexpected partition")
  48. }
  49. if offset != 0 {
  50. t.Error("Unexpected offset")
  51. }
  52. if err != nil {
  53. t.Error(err)
  54. }
  55. }
  56. safeClose(t, producer)
  57. leader.Close()
  58. seedBroker.Close()
  59. }
  60. func TestConcurrentSyncProducer(t *testing.T) {
  61. seedBroker := newMockBroker(t, 1)
  62. leader := newMockBroker(t, 2)
  63. metadataResponse := new(MetadataResponse)
  64. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  65. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  66. seedBroker.Returns(metadataResponse)
  67. prodSuccess := new(ProduceResponse)
  68. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  69. leader.Returns(prodSuccess)
  70. config := NewConfig()
  71. config.Producer.Flush.Messages = 100
  72. producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
  73. if err != nil {
  74. t.Fatal(err)
  75. }
  76. wg := sync.WaitGroup{}
  77. for i := 0; i < 100; i++ {
  78. wg.Add(1)
  79. go func() {
  80. partition, _, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
  81. if partition != 0 {
  82. t.Error("Unexpected partition")
  83. }
  84. if err != nil {
  85. t.Error(err)
  86. }
  87. wg.Done()
  88. }()
  89. }
  90. wg.Wait()
  91. safeClose(t, producer)
  92. leader.Close()
  93. seedBroker.Close()
  94. }
  95. func TestProducer(t *testing.T) {
  96. seedBroker := newMockBroker(t, 1)
  97. leader := newMockBroker(t, 2)
  98. metadataResponse := new(MetadataResponse)
  99. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  100. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  101. seedBroker.Returns(metadataResponse)
  102. prodSuccess := new(ProduceResponse)
  103. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  104. leader.Returns(prodSuccess)
  105. config := NewConfig()
  106. config.Producer.Flush.Messages = 10
  107. config.Producer.Return.Successes = true
  108. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. for i := 0; i < 10; i++ {
  113. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  114. }
  115. for i := 0; i < 10; i++ {
  116. select {
  117. case msg := <-producer.Errors():
  118. t.Error(msg.Err)
  119. if msg.Msg.flags != 0 {
  120. t.Error("Message had flags set")
  121. }
  122. case msg := <-producer.Successes():
  123. if msg.flags != 0 {
  124. t.Error("Message had flags set")
  125. }
  126. if msg.Metadata.(int) != i {
  127. t.Error("Message metadata did not match")
  128. }
  129. }
  130. }
  131. closeProducer(t, producer)
  132. leader.Close()
  133. seedBroker.Close()
  134. }
  135. func TestProducerMultipleFlushes(t *testing.T) {
  136. seedBroker := newMockBroker(t, 1)
  137. leader := newMockBroker(t, 2)
  138. metadataResponse := new(MetadataResponse)
  139. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  140. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  141. seedBroker.Returns(metadataResponse)
  142. prodSuccess := new(ProduceResponse)
  143. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  144. leader.Returns(prodSuccess)
  145. leader.Returns(prodSuccess)
  146. leader.Returns(prodSuccess)
  147. config := NewConfig()
  148. config.Producer.Flush.Messages = 5
  149. config.Producer.Return.Successes = true
  150. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. for flush := 0; flush < 3; flush++ {
  155. for i := 0; i < 5; i++ {
  156. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  157. }
  158. for i := 0; i < 5; i++ {
  159. select {
  160. case msg := <-producer.Errors():
  161. t.Error(msg.Err)
  162. if msg.Msg.flags != 0 {
  163. t.Error("Message had flags set")
  164. }
  165. case msg := <-producer.Successes():
  166. if msg.flags != 0 {
  167. t.Error("Message had flags set")
  168. }
  169. }
  170. }
  171. }
  172. closeProducer(t, producer)
  173. leader.Close()
  174. seedBroker.Close()
  175. }
  176. func TestProducerMultipleBrokers(t *testing.T) {
  177. seedBroker := newMockBroker(t, 1)
  178. leader0 := newMockBroker(t, 2)
  179. leader1 := newMockBroker(t, 3)
  180. metadataResponse := new(MetadataResponse)
  181. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  182. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  183. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  184. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  185. seedBroker.Returns(metadataResponse)
  186. prodResponse0 := new(ProduceResponse)
  187. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  188. leader0.Returns(prodResponse0)
  189. prodResponse1 := new(ProduceResponse)
  190. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  191. leader1.Returns(prodResponse1)
  192. config := NewConfig()
  193. config.Producer.Flush.Messages = 5
  194. config.Producer.Return.Successes = true
  195. config.Producer.Partitioner = NewRoundRobinPartitioner
  196. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  197. if err != nil {
  198. t.Fatal(err)
  199. }
  200. for i := 0; i < 10; i++ {
  201. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  202. }
  203. for i := 0; i < 10; i++ {
  204. select {
  205. case msg := <-producer.Errors():
  206. t.Error(msg.Err)
  207. if msg.Msg.flags != 0 {
  208. t.Error("Message had flags set")
  209. }
  210. case msg := <-producer.Successes():
  211. if msg.flags != 0 {
  212. t.Error("Message had flags set")
  213. }
  214. }
  215. }
  216. closeProducer(t, producer)
  217. leader1.Close()
  218. leader0.Close()
  219. seedBroker.Close()
  220. }
  221. func TestProducerFailureRetry(t *testing.T) {
  222. seedBroker := newMockBroker(t, 1)
  223. leader1 := newMockBroker(t, 2)
  224. leader2 := newMockBroker(t, 3)
  225. metadataLeader1 := new(MetadataResponse)
  226. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  227. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  228. seedBroker.Returns(metadataLeader1)
  229. config := NewConfig()
  230. config.Producer.Flush.Messages = 10
  231. config.Producer.Return.Successes = true
  232. config.Producer.Retry.Backoff = 0
  233. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  234. if err != nil {
  235. t.Fatal(err)
  236. }
  237. seedBroker.Close()
  238. for i := 0; i < 10; i++ {
  239. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  240. }
  241. prodNotLeader := new(ProduceResponse)
  242. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  243. leader1.Returns(prodNotLeader)
  244. metadataLeader2 := new(MetadataResponse)
  245. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  246. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  247. leader1.Returns(metadataLeader2)
  248. prodSuccess := new(ProduceResponse)
  249. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  250. leader2.Returns(prodSuccess)
  251. for i := 0; i < 10; i++ {
  252. select {
  253. case msg := <-producer.Errors():
  254. t.Error(msg.Err)
  255. if msg.Msg.flags != 0 {
  256. t.Error("Message had flags set")
  257. }
  258. case msg := <-producer.Successes():
  259. if msg.flags != 0 {
  260. t.Error("Message had flags set")
  261. }
  262. }
  263. }
  264. leader1.Close()
  265. for i := 0; i < 10; i++ {
  266. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  267. }
  268. leader2.Returns(prodSuccess)
  269. for i := 0; i < 10; i++ {
  270. select {
  271. case msg := <-producer.Errors():
  272. t.Error(msg.Err)
  273. if msg.Msg.flags != 0 {
  274. t.Error("Message had flags set")
  275. }
  276. case msg := <-producer.Successes():
  277. if msg.flags != 0 {
  278. t.Error("Message had flags set")
  279. }
  280. }
  281. }
  282. leader2.Close()
  283. closeProducer(t, producer)
  284. }
  285. func TestProducerBrokerBounce(t *testing.T) {
  286. seedBroker := newMockBroker(t, 1)
  287. leader := newMockBroker(t, 2)
  288. leaderAddr := leader.Addr()
  289. metadataResponse := new(MetadataResponse)
  290. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  291. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  292. seedBroker.Returns(metadataResponse)
  293. config := NewConfig()
  294. config.Producer.Flush.Messages = 10
  295. config.Producer.Return.Successes = true
  296. config.Producer.Retry.Backoff = 0
  297. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. for i := 0; i < 10; i++ {
  302. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  303. }
  304. leader.Close() // producer should get EOF
  305. leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  306. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  307. prodSuccess := new(ProduceResponse)
  308. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  309. leader.Returns(prodSuccess)
  310. for i := 0; i < 10; i++ {
  311. select {
  312. case msg := <-producer.Errors():
  313. t.Error(msg.Err)
  314. if msg.Msg.flags != 0 {
  315. t.Error("Message had flags set")
  316. }
  317. case msg := <-producer.Successes():
  318. if msg.flags != 0 {
  319. t.Error("Message had flags set")
  320. }
  321. }
  322. }
  323. seedBroker.Close()
  324. leader.Close()
  325. closeProducer(t, producer)
  326. }
  327. func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  328. seedBroker := newMockBroker(t, 1)
  329. leader1 := newMockBroker(t, 2)
  330. leader2 := newMockBroker(t, 3)
  331. metadataLeader1 := new(MetadataResponse)
  332. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  333. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  334. seedBroker.Returns(metadataLeader1)
  335. config := NewConfig()
  336. config.Producer.Flush.Messages = 10
  337. config.Producer.Return.Successes = true
  338. config.Producer.Retry.Max = 3
  339. config.Producer.Retry.Backoff = 0
  340. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  341. if err != nil {
  342. t.Fatal(err)
  343. }
  344. for i := 0; i < 10; i++ {
  345. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  346. }
  347. leader1.Close() // producer should get EOF
  348. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  349. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  350. // ok fine, tell it to go to leader2 finally
  351. metadataLeader2 := new(MetadataResponse)
  352. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  353. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  354. seedBroker.Returns(metadataLeader2)
  355. prodSuccess := new(ProduceResponse)
  356. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  357. leader2.Returns(prodSuccess)
  358. for i := 0; i < 10; i++ {
  359. select {
  360. case msg := <-producer.Errors():
  361. t.Error(msg.Err)
  362. if msg.Msg.flags != 0 {
  363. t.Error("Message had flags set")
  364. }
  365. case msg := <-producer.Successes():
  366. if msg.flags != 0 {
  367. t.Error("Message had flags set")
  368. }
  369. }
  370. }
  371. seedBroker.Close()
  372. leader2.Close()
  373. closeProducer(t, producer)
  374. }
  375. func TestProducerMultipleRetries(t *testing.T) {
  376. seedBroker := newMockBroker(t, 1)
  377. leader1 := newMockBroker(t, 2)
  378. leader2 := newMockBroker(t, 3)
  379. metadataLeader1 := new(MetadataResponse)
  380. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  381. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  382. seedBroker.Returns(metadataLeader1)
  383. config := NewConfig()
  384. config.Producer.Flush.Messages = 10
  385. config.Producer.Return.Successes = true
  386. config.Producer.Retry.Max = 4
  387. config.Producer.Retry.Backoff = 0
  388. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  389. if err != nil {
  390. t.Fatal(err)
  391. }
  392. for i := 0; i < 10; i++ {
  393. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  394. }
  395. prodNotLeader := new(ProduceResponse)
  396. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  397. leader1.Returns(prodNotLeader)
  398. metadataLeader2 := new(MetadataResponse)
  399. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  400. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  401. seedBroker.Returns(metadataLeader2)
  402. leader2.Returns(prodNotLeader)
  403. seedBroker.Returns(metadataLeader1)
  404. leader1.Returns(prodNotLeader)
  405. seedBroker.Returns(metadataLeader1)
  406. leader1.Returns(prodNotLeader)
  407. seedBroker.Returns(metadataLeader2)
  408. prodSuccess := new(ProduceResponse)
  409. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  410. leader2.Returns(prodSuccess)
  411. for i := 0; i < 10; i++ {
  412. select {
  413. case msg := <-producer.Errors():
  414. t.Error(msg.Err)
  415. if msg.Msg.flags != 0 {
  416. t.Error("Message had flags set")
  417. }
  418. case msg := <-producer.Successes():
  419. if msg.flags != 0 {
  420. t.Error("Message had flags set")
  421. }
  422. }
  423. }
  424. for i := 0; i < 10; i++ {
  425. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  426. }
  427. leader2.Returns(prodSuccess)
  428. for i := 0; i < 10; i++ {
  429. select {
  430. case msg := <-producer.Errors():
  431. t.Error(msg.Err)
  432. if msg.Msg.flags != 0 {
  433. t.Error("Message had flags set")
  434. }
  435. case msg := <-producer.Successes():
  436. if msg.flags != 0 {
  437. t.Error("Message had flags set")
  438. }
  439. }
  440. }
  441. seedBroker.Close()
  442. leader1.Close()
  443. leader2.Close()
  444. closeProducer(t, producer)
  445. }
  446. func TestProducerOutOfRetries(t *testing.T) {
  447. t.Skip("Enable once bug #294 is fixed.")
  448. seedBroker := newMockBroker(t, 1)
  449. leader := newMockBroker(t, 2)
  450. metadataResponse := new(MetadataResponse)
  451. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  452. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  453. seedBroker.Returns(metadataResponse)
  454. config := NewConfig()
  455. config.Producer.Flush.Messages = 10
  456. config.Producer.Return.Successes = true
  457. config.Producer.Retry.Backoff = 0
  458. config.Producer.Retry.Max = 0
  459. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  460. if err != nil {
  461. t.Fatal(err)
  462. }
  463. for i := 0; i < 10; i++ {
  464. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  465. }
  466. prodNotLeader := new(ProduceResponse)
  467. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  468. leader.Returns(prodNotLeader)
  469. for i := 0; i < 10; i++ {
  470. select {
  471. case msg := <-producer.Errors():
  472. if msg.Err != ErrNotLeaderForPartition {
  473. t.Error(msg.Err)
  474. }
  475. case <-producer.Successes():
  476. t.Error("Unexpected success")
  477. }
  478. }
  479. seedBroker.Returns(metadataResponse)
  480. for i := 0; i < 10; i++ {
  481. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  482. }
  483. prodSuccess := new(ProduceResponse)
  484. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  485. leader.Returns(prodSuccess)
  486. for i := 0; i < 10; i++ {
  487. select {
  488. case msg := <-producer.Errors():
  489. t.Error(msg.Err)
  490. case <-producer.Successes():
  491. }
  492. }
  493. leader.Close()
  494. seedBroker.Close()
  495. safeClose(t, producer)
  496. }
  497. // This example shows how to use the producer while simultaneously
  498. // reading the Errors channel to know about any failures.
  499. func ExampleAsyncProducer_select() {
  500. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  501. if err != nil {
  502. panic(err)
  503. }
  504. defer func() {
  505. if err := producer.Close(); err != nil {
  506. log.Fatalln(err)
  507. }
  508. }()
  509. // Trap SIGINT to trigger a shutdown.
  510. signals := make(chan os.Signal, 1)
  511. signal.Notify(signals, os.Interrupt)
  512. var enqueued, errors int
  513. ProducerLoop:
  514. for {
  515. select {
  516. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  517. enqueued++
  518. case err := <-producer.Errors():
  519. log.Println("Failed to produce message", err)
  520. errors++
  521. case <-signals:
  522. break ProducerLoop
  523. }
  524. }
  525. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  526. }
  527. // This example shows how to use the producer with separate goroutines
  528. // reading from the Successes and Errors channels. Note that in order
  529. // for the Successes channel to be populated, you have to set
  530. // config.Producer.Return.Successes to true.
  531. func ExampleAsyncProducer_goroutines() {
  532. config := NewConfig()
  533. config.Producer.Return.Successes = true
  534. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  535. if err != nil {
  536. panic(err)
  537. }
  538. // Trap SIGINT to trigger a graceful shutdown.
  539. signals := make(chan os.Signal, 1)
  540. signal.Notify(signals, os.Interrupt)
  541. var (
  542. wg sync.WaitGroup
  543. enqueued, successes, errors int
  544. )
  545. wg.Add(1)
  546. go func() {
  547. defer wg.Done()
  548. for _ = range producer.Successes() {
  549. successes++
  550. }
  551. }()
  552. wg.Add(1)
  553. go func() {
  554. defer wg.Done()
  555. for err := range producer.Errors() {
  556. log.Println(err)
  557. errors++
  558. }
  559. }()
  560. ProducerLoop:
  561. for {
  562. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  563. select {
  564. case producer.Input() <- message:
  565. enqueued++
  566. case <-signals:
  567. producer.AsyncClose() // Trigger a shutdown of the producer.
  568. break ProducerLoop
  569. }
  570. }
  571. wg.Wait()
  572. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  573. }
  574. // This example shows the basic usage pattern of the SyncProducer.
  575. func ExampleSyncProducer() {
  576. producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
  577. if err != nil {
  578. log.Fatalln(err)
  579. }
  580. defer func() {
  581. if err := producer.Close(); err != nil {
  582. log.Fatalln(err)
  583. }
  584. }()
  585. partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
  586. if err != nil {
  587. log.Printf("FAILED to send message: %s\n", err)
  588. } else {
  589. log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
  590. }
  591. }