async_producer_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. )
  9. const TestMessage = "ABC THE MESSAGE"
  10. func closeProducer(t *testing.T, p AsyncProducer) {
  11. var wg sync.WaitGroup
  12. p.AsyncClose()
  13. wg.Add(2)
  14. go func() {
  15. for _ = range p.Successes() {
  16. t.Error("Unexpected message on Successes()")
  17. }
  18. wg.Done()
  19. }()
  20. go func() {
  21. for msg := range p.Errors() {
  22. t.Error(msg.Err)
  23. }
  24. wg.Done()
  25. }()
  26. wg.Wait()
  27. }
  28. func expectSuccesses(t *testing.T, p AsyncProducer, successes int) {
  29. for i := 0; i < successes; i++ {
  30. select {
  31. case msg := <-p.Errors():
  32. t.Error(msg.Err)
  33. if msg.Msg.flags != 0 {
  34. t.Error("Message had flags set")
  35. }
  36. case msg := <-p.Successes():
  37. if msg.flags != 0 {
  38. t.Error("Message had flags set")
  39. }
  40. }
  41. }
  42. }
  43. func TestAsyncProducer(t *testing.T) {
  44. seedBroker := newMockBroker(t, 1)
  45. leader := newMockBroker(t, 2)
  46. metadataResponse := new(MetadataResponse)
  47. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  48. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  49. seedBroker.Returns(metadataResponse)
  50. prodSuccess := new(ProduceResponse)
  51. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  52. leader.Returns(prodSuccess)
  53. config := NewConfig()
  54. config.Producer.Flush.Messages = 10
  55. config.Producer.Return.Successes = true
  56. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. for i := 0; i < 10; i++ {
  61. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  62. }
  63. for i := 0; i < 10; i++ {
  64. select {
  65. case msg := <-producer.Errors():
  66. t.Error(msg.Err)
  67. if msg.Msg.flags != 0 {
  68. t.Error("Message had flags set")
  69. }
  70. case msg := <-producer.Successes():
  71. if msg.flags != 0 {
  72. t.Error("Message had flags set")
  73. }
  74. if msg.Metadata.(int) != i {
  75. t.Error("Message metadata did not match")
  76. }
  77. }
  78. }
  79. closeProducer(t, producer)
  80. leader.Close()
  81. seedBroker.Close()
  82. }
  83. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  84. seedBroker := newMockBroker(t, 1)
  85. leader := newMockBroker(t, 2)
  86. metadataResponse := new(MetadataResponse)
  87. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  88. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  89. seedBroker.Returns(metadataResponse)
  90. prodSuccess := new(ProduceResponse)
  91. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  92. leader.Returns(prodSuccess)
  93. leader.Returns(prodSuccess)
  94. leader.Returns(prodSuccess)
  95. config := NewConfig()
  96. config.Producer.Flush.Messages = 5
  97. config.Producer.Return.Successes = true
  98. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  99. if err != nil {
  100. t.Fatal(err)
  101. }
  102. for flush := 0; flush < 3; flush++ {
  103. for i := 0; i < 5; i++ {
  104. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  105. }
  106. expectSuccesses(t, producer, 5)
  107. }
  108. closeProducer(t, producer)
  109. leader.Close()
  110. seedBroker.Close()
  111. }
  112. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  113. seedBroker := newMockBroker(t, 1)
  114. leader0 := newMockBroker(t, 2)
  115. leader1 := newMockBroker(t, 3)
  116. metadataResponse := new(MetadataResponse)
  117. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  118. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  119. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  120. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  121. seedBroker.Returns(metadataResponse)
  122. prodResponse0 := new(ProduceResponse)
  123. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  124. leader0.Returns(prodResponse0)
  125. prodResponse1 := new(ProduceResponse)
  126. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  127. leader1.Returns(prodResponse1)
  128. config := NewConfig()
  129. config.Producer.Flush.Messages = 5
  130. config.Producer.Return.Successes = true
  131. config.Producer.Partitioner = NewRoundRobinPartitioner
  132. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. for i := 0; i < 10; i++ {
  137. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  138. }
  139. expectSuccesses(t, producer, 10)
  140. closeProducer(t, producer)
  141. leader1.Close()
  142. leader0.Close()
  143. seedBroker.Close()
  144. }
  145. func TestAsyncProducerFailureRetry(t *testing.T) {
  146. seedBroker := newMockBroker(t, 1)
  147. leader1 := newMockBroker(t, 2)
  148. leader2 := newMockBroker(t, 3)
  149. metadataLeader1 := new(MetadataResponse)
  150. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  151. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  152. seedBroker.Returns(metadataLeader1)
  153. config := NewConfig()
  154. config.Producer.Flush.Messages = 10
  155. config.Producer.Return.Successes = true
  156. config.Producer.Retry.Backoff = 0
  157. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. seedBroker.Close()
  162. for i := 0; i < 10; i++ {
  163. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  164. }
  165. prodNotLeader := new(ProduceResponse)
  166. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  167. leader1.Returns(prodNotLeader)
  168. metadataLeader2 := new(MetadataResponse)
  169. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  170. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  171. leader1.Returns(metadataLeader2)
  172. prodSuccess := new(ProduceResponse)
  173. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  174. leader2.Returns(prodSuccess)
  175. expectSuccesses(t, producer, 10)
  176. leader1.Close()
  177. for i := 0; i < 10; i++ {
  178. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  179. }
  180. leader2.Returns(prodSuccess)
  181. expectSuccesses(t, producer, 10)
  182. leader2.Close()
  183. closeProducer(t, producer)
  184. }
  185. func TestAsyncProducerBrokerBounce(t *testing.T) {
  186. seedBroker := newMockBroker(t, 1)
  187. leader := newMockBroker(t, 2)
  188. leaderAddr := leader.Addr()
  189. metadataResponse := new(MetadataResponse)
  190. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  191. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  192. seedBroker.Returns(metadataResponse)
  193. config := NewConfig()
  194. config.Producer.Flush.Messages = 10
  195. config.Producer.Return.Successes = true
  196. config.Producer.Retry.Backoff = 0
  197. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  198. if err != nil {
  199. t.Fatal(err)
  200. }
  201. for i := 0; i < 10; i++ {
  202. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  203. }
  204. leader.Close() // producer should get EOF
  205. leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  206. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  207. prodSuccess := new(ProduceResponse)
  208. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  209. leader.Returns(prodSuccess)
  210. expectSuccesses(t, producer, 10)
  211. seedBroker.Close()
  212. leader.Close()
  213. closeProducer(t, producer)
  214. }
  215. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  216. seedBroker := newMockBroker(t, 1)
  217. leader1 := newMockBroker(t, 2)
  218. leader2 := newMockBroker(t, 3)
  219. metadataLeader1 := new(MetadataResponse)
  220. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  221. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  222. seedBroker.Returns(metadataLeader1)
  223. config := NewConfig()
  224. config.Producer.Flush.Messages = 10
  225. config.Producer.Return.Successes = true
  226. config.Producer.Retry.Max = 3
  227. config.Producer.Retry.Backoff = 0
  228. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  229. if err != nil {
  230. t.Fatal(err)
  231. }
  232. for i := 0; i < 10; i++ {
  233. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  234. }
  235. leader1.Close() // producer should get EOF
  236. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  237. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  238. // ok fine, tell it to go to leader2 finally
  239. metadataLeader2 := new(MetadataResponse)
  240. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  241. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  242. seedBroker.Returns(metadataLeader2)
  243. prodSuccess := new(ProduceResponse)
  244. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  245. leader2.Returns(prodSuccess)
  246. expectSuccesses(t, producer, 10)
  247. seedBroker.Close()
  248. leader2.Close()
  249. closeProducer(t, producer)
  250. }
  251. func TestAsyncProducerMultipleRetries(t *testing.T) {
  252. seedBroker := newMockBroker(t, 1)
  253. leader1 := newMockBroker(t, 2)
  254. leader2 := newMockBroker(t, 3)
  255. metadataLeader1 := new(MetadataResponse)
  256. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  257. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  258. seedBroker.Returns(metadataLeader1)
  259. config := NewConfig()
  260. config.Producer.Flush.Messages = 10
  261. config.Producer.Return.Successes = true
  262. config.Producer.Retry.Max = 4
  263. config.Producer.Retry.Backoff = 0
  264. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  265. if err != nil {
  266. t.Fatal(err)
  267. }
  268. for i := 0; i < 10; i++ {
  269. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  270. }
  271. prodNotLeader := new(ProduceResponse)
  272. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  273. leader1.Returns(prodNotLeader)
  274. metadataLeader2 := new(MetadataResponse)
  275. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  276. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  277. seedBroker.Returns(metadataLeader2)
  278. leader2.Returns(prodNotLeader)
  279. seedBroker.Returns(metadataLeader1)
  280. leader1.Returns(prodNotLeader)
  281. seedBroker.Returns(metadataLeader1)
  282. leader1.Returns(prodNotLeader)
  283. seedBroker.Returns(metadataLeader2)
  284. prodSuccess := new(ProduceResponse)
  285. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  286. leader2.Returns(prodSuccess)
  287. expectSuccesses(t, producer, 10)
  288. for i := 0; i < 10; i++ {
  289. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  290. }
  291. leader2.Returns(prodSuccess)
  292. expectSuccesses(t, producer, 10)
  293. seedBroker.Close()
  294. leader1.Close()
  295. leader2.Close()
  296. closeProducer(t, producer)
  297. }
  298. func TestAsyncProducerOutOfRetries(t *testing.T) {
  299. t.Skip("Enable once bug #294 is fixed.")
  300. seedBroker := newMockBroker(t, 1)
  301. leader := newMockBroker(t, 2)
  302. metadataResponse := new(MetadataResponse)
  303. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  304. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  305. seedBroker.Returns(metadataResponse)
  306. config := NewConfig()
  307. config.Producer.Flush.Messages = 10
  308. config.Producer.Return.Successes = true
  309. config.Producer.Retry.Backoff = 0
  310. config.Producer.Retry.Max = 0
  311. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  312. if err != nil {
  313. t.Fatal(err)
  314. }
  315. for i := 0; i < 10; i++ {
  316. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  317. }
  318. prodNotLeader := new(ProduceResponse)
  319. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  320. leader.Returns(prodNotLeader)
  321. for i := 0; i < 10; i++ {
  322. select {
  323. case msg := <-producer.Errors():
  324. if msg.Err != ErrNotLeaderForPartition {
  325. t.Error(msg.Err)
  326. }
  327. case <-producer.Successes():
  328. t.Error("Unexpected success")
  329. }
  330. }
  331. seedBroker.Returns(metadataResponse)
  332. for i := 0; i < 10; i++ {
  333. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  334. }
  335. prodSuccess := new(ProduceResponse)
  336. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  337. leader.Returns(prodSuccess)
  338. expectSuccesses(t, producer, 10)
  339. leader.Close()
  340. seedBroker.Close()
  341. safeClose(t, producer)
  342. }
  343. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  344. seedBroker := newMockBroker(t, 1)
  345. leader := newMockBroker(t, 2)
  346. leaderAddr := leader.Addr()
  347. metadataResponse := new(MetadataResponse)
  348. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  349. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  350. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  351. seedBroker.Returns(metadataResponse)
  352. config := NewConfig()
  353. config.Producer.Return.Successes = true
  354. config.Producer.Retry.Backoff = 0
  355. config.Producer.Retry.Max = 1
  356. config.Producer.Partitioner = NewRoundRobinPartitioner
  357. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  358. if err != nil {
  359. t.Fatal(err)
  360. }
  361. // prime partition 0
  362. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  363. prodSuccess := new(ProduceResponse)
  364. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  365. leader.Returns(prodSuccess)
  366. expectSuccesses(t, producer, 1)
  367. // prime partition 1
  368. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  369. prodSuccess = new(ProduceResponse)
  370. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  371. leader.Returns(prodSuccess)
  372. expectSuccesses(t, producer, 1)
  373. // reboot the broker (the producer will get EOF on its existing connection)
  374. leader.Close()
  375. leader = newMockBrokerAddr(t, 2, leaderAddr)
  376. // send another message on partition 0 to trigger the EOF and retry
  377. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  378. // tell partition 0 to go to that broker again
  379. seedBroker.Returns(metadataResponse)
  380. // succeed this time
  381. prodSuccess = new(ProduceResponse)
  382. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  383. leader.Returns(prodSuccess)
  384. expectSuccesses(t, producer, 1)
  385. // shutdown
  386. closeProducer(t, producer)
  387. seedBroker.Close()
  388. leader.Close()
  389. }
  390. // This example shows how to use the producer while simultaneously
  391. // reading the Errors channel to know about any failures.
  392. func ExampleAsyncProducer_select() {
  393. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  394. if err != nil {
  395. panic(err)
  396. }
  397. defer func() {
  398. if err := producer.Close(); err != nil {
  399. log.Fatalln(err)
  400. }
  401. }()
  402. // Trap SIGINT to trigger a shutdown.
  403. signals := make(chan os.Signal, 1)
  404. signal.Notify(signals, os.Interrupt)
  405. var enqueued, errors int
  406. ProducerLoop:
  407. for {
  408. select {
  409. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  410. enqueued++
  411. case err := <-producer.Errors():
  412. log.Println("Failed to produce message", err)
  413. errors++
  414. case <-signals:
  415. break ProducerLoop
  416. }
  417. }
  418. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  419. }
  420. // This example shows how to use the producer with separate goroutines
  421. // reading from the Successes and Errors channels. Note that in order
  422. // for the Successes channel to be populated, you have to set
  423. // config.Producer.Return.Successes to true.
  424. func ExampleAsyncProducer_goroutines() {
  425. config := NewConfig()
  426. config.Producer.Return.Successes = true
  427. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  428. if err != nil {
  429. panic(err)
  430. }
  431. // Trap SIGINT to trigger a graceful shutdown.
  432. signals := make(chan os.Signal, 1)
  433. signal.Notify(signals, os.Interrupt)
  434. var (
  435. wg sync.WaitGroup
  436. enqueued, successes, errors int
  437. )
  438. wg.Add(1)
  439. go func() {
  440. defer wg.Done()
  441. for _ = range producer.Successes() {
  442. successes++
  443. }
  444. }()
  445. wg.Add(1)
  446. go func() {
  447. defer wg.Done()
  448. for err := range producer.Errors() {
  449. log.Println(err)
  450. errors++
  451. }
  452. }()
  453. ProducerLoop:
  454. for {
  455. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  456. select {
  457. case producer.Input() <- message:
  458. enqueued++
  459. case <-signals:
  460. producer.AsyncClose() // Trigger a shutdown of the producer.
  461. break ProducerLoop
  462. }
  463. }
  464. wg.Wait()
  465. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  466. }