async_producer_test.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  // TestMessage is the payload used for the messages produced by the tests in
  // this file.
  const (
  	TestMessage = "ABC THE MESSAGE"
  )
  12. func closeProducer(t *testing.T, p AsyncProducer) {
  13. var wg sync.WaitGroup
  14. p.AsyncClose()
  15. wg.Add(2)
  16. go func() {
  17. for range p.Successes() {
  18. t.Error("Unexpected message on Successes()")
  19. }
  20. wg.Done()
  21. }()
  22. go func() {
  23. for msg := range p.Errors() {
  24. t.Error(msg.Err)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }
  30. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  31. expect := successes + errors
  32. for expect > 0 {
  33. select {
  34. case msg := <-p.Errors():
  35. if msg.Msg.flags != 0 {
  36. t.Error("Message had flags set")
  37. }
  38. errors--
  39. expect--
  40. if errors < 0 {
  41. t.Error(msg.Err)
  42. }
  43. case msg := <-p.Successes():
  44. if msg.flags != 0 {
  45. t.Error("Message had flags set")
  46. }
  47. successes--
  48. expect--
  49. if successes < 0 {
  50. t.Error("Too many successes")
  51. }
  52. }
  53. }
  54. if successes != 0 || errors != 0 {
  55. t.Error("Unexpected successes", successes, "or errors", errors)
  56. }
  57. }
  58. type testPartitioner chan *int32
  59. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  60. part := <-p
  61. if part == nil {
  62. return 0, errors.New("BOOM")
  63. }
  64. return *part, nil
  65. }
  66. func (p testPartitioner) RequiresConsistency() bool {
  67. return true
  68. }
  69. func (p testPartitioner) feed(partition int32) {
  70. p <- &partition
  71. }
  72. type flakyEncoder bool
  73. func (f flakyEncoder) Length() int {
  74. return len(TestMessage)
  75. }
  76. func (f flakyEncoder) Encode() ([]byte, error) {
  77. if !bool(f) {
  78. return nil, errors.New("flaky encoding error")
  79. }
  80. return []byte(TestMessage), nil
  81. }
  82. func TestAsyncProducer(t *testing.T) {
  83. seedBroker := NewMockBroker(t, 1)
  84. leader := NewMockBroker(t, 2)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  88. seedBroker.Returns(metadataResponse)
  89. prodSuccess := new(ProduceResponse)
  90. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  91. leader.Returns(prodSuccess)
  92. config := NewConfig()
  93. config.Producer.Flush.Messages = 10
  94. config.Producer.Return.Successes = true
  95. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. for i := 0; i < 10; i++ {
  100. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  101. }
  102. for i := 0; i < 10; i++ {
  103. select {
  104. case msg := <-producer.Errors():
  105. t.Error(msg.Err)
  106. if msg.Msg.flags != 0 {
  107. t.Error("Message had flags set")
  108. }
  109. case msg := <-producer.Successes():
  110. if msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. if msg.Metadata.(int) != i {
  114. t.Error("Message metadata did not match")
  115. }
  116. case <-time.After(time.Second):
  117. t.Errorf("Timeout waiting for msg #%d", i)
  118. goto done
  119. }
  120. }
  121. done:
  122. closeProducer(t, producer)
  123. leader.Close()
  124. seedBroker.Close()
  125. }
  126. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  127. seedBroker := NewMockBroker(t, 1)
  128. leader := NewMockBroker(t, 2)
  129. metadataResponse := new(MetadataResponse)
  130. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  131. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  132. seedBroker.Returns(metadataResponse)
  133. prodSuccess := new(ProduceResponse)
  134. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  135. leader.Returns(prodSuccess)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. config := NewConfig()
  139. config.Producer.Flush.Messages = 5
  140. config.Producer.Return.Successes = true
  141. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. for flush := 0; flush < 3; flush++ {
  146. for i := 0; i < 5; i++ {
  147. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  148. }
  149. expectResults(t, producer, 5, 0)
  150. }
  151. closeProducer(t, producer)
  152. leader.Close()
  153. seedBroker.Close()
  154. }
  155. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  156. seedBroker := NewMockBroker(t, 1)
  157. leader0 := NewMockBroker(t, 2)
  158. leader1 := NewMockBroker(t, 3)
  159. metadataResponse := new(MetadataResponse)
  160. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  161. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  162. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  163. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  164. seedBroker.Returns(metadataResponse)
  165. prodResponse0 := new(ProduceResponse)
  166. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  167. leader0.Returns(prodResponse0)
  168. prodResponse1 := new(ProduceResponse)
  169. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  170. leader1.Returns(prodResponse1)
  171. config := NewConfig()
  172. config.Producer.Flush.Messages = 5
  173. config.Producer.Return.Successes = true
  174. config.Producer.Partitioner = NewRoundRobinPartitioner
  175. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. for i := 0; i < 10; i++ {
  180. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  181. }
  182. expectResults(t, producer, 10, 0)
  183. closeProducer(t, producer)
  184. leader1.Close()
  185. leader0.Close()
  186. seedBroker.Close()
  187. }
  188. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  189. seedBroker := NewMockBroker(t, 1)
  190. leader := NewMockBroker(t, 2)
  191. metadataResponse := new(MetadataResponse)
  192. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  193. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  194. seedBroker.Returns(metadataResponse)
  195. prodResponse := new(ProduceResponse)
  196. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  197. leader.Returns(prodResponse)
  198. config := NewConfig()
  199. config.Producer.Flush.Messages = 2
  200. config.Producer.Return.Successes = true
  201. config.Producer.Partitioner = func(topic string) Partitioner {
  202. p := make(testPartitioner)
  203. go func() {
  204. p.feed(0)
  205. p <- nil
  206. p <- nil
  207. p <- nil
  208. p.feed(0)
  209. }()
  210. return p
  211. }
  212. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. for i := 0; i < 5; i++ {
  217. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  218. }
  219. expectResults(t, producer, 2, 3)
  220. closeProducer(t, producer)
  221. leader.Close()
  222. seedBroker.Close()
  223. }
  224. func TestAsyncProducerFailureRetry(t *testing.T) {
  225. seedBroker := NewMockBroker(t, 1)
  226. leader1 := NewMockBroker(t, 2)
  227. leader2 := NewMockBroker(t, 3)
  228. metadataLeader1 := new(MetadataResponse)
  229. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  230. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  231. seedBroker.Returns(metadataLeader1)
  232. config := NewConfig()
  233. config.Producer.Flush.Messages = 10
  234. config.Producer.Return.Successes = true
  235. config.Producer.Retry.Backoff = 0
  236. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. seedBroker.Close()
  241. for i := 0; i < 10; i++ {
  242. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  243. }
  244. prodNotLeader := new(ProduceResponse)
  245. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  246. leader1.Returns(prodNotLeader)
  247. metadataLeader2 := new(MetadataResponse)
  248. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  249. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  250. leader1.Returns(metadataLeader2)
  251. prodSuccess := new(ProduceResponse)
  252. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  253. leader2.Returns(prodSuccess)
  254. expectResults(t, producer, 10, 0)
  255. leader1.Close()
  256. for i := 0; i < 10; i++ {
  257. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  258. }
  259. leader2.Returns(prodSuccess)
  260. expectResults(t, producer, 10, 0)
  261. leader2.Close()
  262. closeProducer(t, producer)
  263. }
  264. func TestAsyncProducerEncoderFailures(t *testing.T) {
  265. seedBroker := NewMockBroker(t, 1)
  266. leader := NewMockBroker(t, 2)
  267. metadataResponse := new(MetadataResponse)
  268. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  269. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  270. seedBroker.Returns(metadataResponse)
  271. prodSuccess := new(ProduceResponse)
  272. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  273. leader.Returns(prodSuccess)
  274. leader.Returns(prodSuccess)
  275. leader.Returns(prodSuccess)
  276. config := NewConfig()
  277. config.Producer.Flush.Messages = 1
  278. config.Producer.Return.Successes = true
  279. config.Producer.Partitioner = NewManualPartitioner
  280. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. for flush := 0; flush < 3; flush++ {
  285. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  286. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  287. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  288. expectResults(t, producer, 1, 2)
  289. }
  290. closeProducer(t, producer)
  291. leader.Close()
  292. seedBroker.Close()
  293. }
  294. // If a Kafka broker becomes unavailable and then returns back in service, then
  295. // producer reconnects to it and continues sending messages.
  296. func TestAsyncProducerBrokerBounce(t *testing.T) {
  297. // Given
  298. seedBroker := NewMockBroker(t, 1)
  299. leader := NewMockBroker(t, 2)
  300. leaderAddr := leader.Addr()
  301. metadataResponse := new(MetadataResponse)
  302. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  303. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  304. seedBroker.Returns(metadataResponse)
  305. prodSuccess := new(ProduceResponse)
  306. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  307. config := NewConfig()
  308. config.Producer.Flush.Messages = 1
  309. config.Producer.Return.Successes = true
  310. config.Producer.Retry.Backoff = 0
  311. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  312. if err != nil {
  313. t.Fatal(err)
  314. }
  315. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  316. leader.Returns(prodSuccess)
  317. expectResults(t, producer, 1, 0)
  318. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  319. leader.Close() // producer should get EOF
  320. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  321. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  322. // Then: a produced message goes through the new broker connection.
  323. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  324. leader.Returns(prodSuccess)
  325. expectResults(t, producer, 1, 0)
  326. closeProducer(t, producer)
  327. seedBroker.Close()
  328. leader.Close()
  329. }
  330. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  331. seedBroker := NewMockBroker(t, 1)
  332. leader1 := NewMockBroker(t, 2)
  333. leader2 := NewMockBroker(t, 3)
  334. metadataLeader1 := new(MetadataResponse)
  335. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  336. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  337. seedBroker.Returns(metadataLeader1)
  338. config := NewConfig()
  339. config.Producer.Flush.Messages = 10
  340. config.Producer.Return.Successes = true
  341. config.Producer.Retry.Max = 3
  342. config.Producer.Retry.Backoff = 0
  343. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  344. if err != nil {
  345. t.Fatal(err)
  346. }
  347. for i := 0; i < 10; i++ {
  348. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  349. }
  350. leader1.Close() // producer should get EOF
  351. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  352. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  353. // ok fine, tell it to go to leader2 finally
  354. metadataLeader2 := new(MetadataResponse)
  355. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  356. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  357. seedBroker.Returns(metadataLeader2)
  358. prodSuccess := new(ProduceResponse)
  359. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  360. leader2.Returns(prodSuccess)
  361. expectResults(t, producer, 10, 0)
  362. seedBroker.Close()
  363. leader2.Close()
  364. closeProducer(t, producer)
  365. }
  366. func TestAsyncProducerMultipleRetries(t *testing.T) {
  367. seedBroker := NewMockBroker(t, 1)
  368. leader1 := NewMockBroker(t, 2)
  369. leader2 := NewMockBroker(t, 3)
  370. metadataLeader1 := new(MetadataResponse)
  371. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  372. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  373. seedBroker.Returns(metadataLeader1)
  374. config := NewConfig()
  375. config.Producer.Flush.Messages = 10
  376. config.Producer.Return.Successes = true
  377. config.Producer.Retry.Max = 4
  378. config.Producer.Retry.Backoff = 0
  379. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  380. if err != nil {
  381. t.Fatal(err)
  382. }
  383. for i := 0; i < 10; i++ {
  384. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  385. }
  386. prodNotLeader := new(ProduceResponse)
  387. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  388. leader1.Returns(prodNotLeader)
  389. metadataLeader2 := new(MetadataResponse)
  390. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  391. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  392. seedBroker.Returns(metadataLeader2)
  393. leader2.Returns(prodNotLeader)
  394. seedBroker.Returns(metadataLeader1)
  395. leader1.Returns(prodNotLeader)
  396. seedBroker.Returns(metadataLeader1)
  397. leader1.Returns(prodNotLeader)
  398. seedBroker.Returns(metadataLeader2)
  399. prodSuccess := new(ProduceResponse)
  400. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  401. leader2.Returns(prodSuccess)
  402. expectResults(t, producer, 10, 0)
  403. for i := 0; i < 10; i++ {
  404. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  405. }
  406. leader2.Returns(prodSuccess)
  407. expectResults(t, producer, 10, 0)
  408. seedBroker.Close()
  409. leader1.Close()
  410. leader2.Close()
  411. closeProducer(t, producer)
  412. }
  413. func TestAsyncProducerOutOfRetries(t *testing.T) {
  414. t.Skip("Enable once bug #294 is fixed.")
  415. seedBroker := NewMockBroker(t, 1)
  416. leader := NewMockBroker(t, 2)
  417. metadataResponse := new(MetadataResponse)
  418. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  419. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  420. seedBroker.Returns(metadataResponse)
  421. config := NewConfig()
  422. config.Producer.Flush.Messages = 10
  423. config.Producer.Return.Successes = true
  424. config.Producer.Retry.Backoff = 0
  425. config.Producer.Retry.Max = 0
  426. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. for i := 0; i < 10; i++ {
  431. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  432. }
  433. prodNotLeader := new(ProduceResponse)
  434. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  435. leader.Returns(prodNotLeader)
  436. for i := 0; i < 10; i++ {
  437. select {
  438. case msg := <-producer.Errors():
  439. if msg.Err != ErrNotLeaderForPartition {
  440. t.Error(msg.Err)
  441. }
  442. case <-producer.Successes():
  443. t.Error("Unexpected success")
  444. }
  445. }
  446. seedBroker.Returns(metadataResponse)
  447. for i := 0; i < 10; i++ {
  448. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  449. }
  450. prodSuccess := new(ProduceResponse)
  451. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  452. leader.Returns(prodSuccess)
  453. expectResults(t, producer, 10, 0)
  454. leader.Close()
  455. seedBroker.Close()
  456. safeClose(t, producer)
  457. }
  458. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  459. seedBroker := NewMockBroker(t, 1)
  460. leader := NewMockBroker(t, 2)
  461. leaderAddr := leader.Addr()
  462. metadataResponse := new(MetadataResponse)
  463. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  464. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  465. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  466. seedBroker.Returns(metadataResponse)
  467. config := NewConfig()
  468. config.Producer.Return.Successes = true
  469. config.Producer.Retry.Backoff = 0
  470. config.Producer.Retry.Max = 1
  471. config.Producer.Partitioner = NewRoundRobinPartitioner
  472. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  473. if err != nil {
  474. t.Fatal(err)
  475. }
  476. // prime partition 0
  477. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  478. prodSuccess := new(ProduceResponse)
  479. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  480. leader.Returns(prodSuccess)
  481. expectResults(t, producer, 1, 0)
  482. // prime partition 1
  483. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  484. prodSuccess = new(ProduceResponse)
  485. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  486. leader.Returns(prodSuccess)
  487. expectResults(t, producer, 1, 0)
  488. // reboot the broker (the producer will get EOF on its existing connection)
  489. leader.Close()
  490. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  491. // send another message on partition 0 to trigger the EOF and retry
  492. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  493. // tell partition 0 to go to that broker again
  494. seedBroker.Returns(metadataResponse)
  495. // succeed this time
  496. prodSuccess = new(ProduceResponse)
  497. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  498. leader.Returns(prodSuccess)
  499. expectResults(t, producer, 1, 0)
  500. // shutdown
  501. closeProducer(t, producer)
  502. seedBroker.Close()
  503. leader.Close()
  504. }
  505. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  506. seedBroker := NewMockBroker(t, 1)
  507. leader := NewMockBroker(t, 2)
  508. metadataResponse := new(MetadataResponse)
  509. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  510. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  511. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  512. seedBroker.Returns(metadataResponse)
  513. config := NewConfig()
  514. config.Producer.Flush.Messages = 5
  515. config.Producer.Return.Successes = true
  516. config.Producer.Retry.Backoff = 0
  517. config.Producer.Retry.Max = 1
  518. config.Producer.Partitioner = NewManualPartitioner
  519. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  520. if err != nil {
  521. t.Fatal(err)
  522. }
  523. // prime partitions
  524. for p := int32(0); p < 2; p++ {
  525. for i := 0; i < 5; i++ {
  526. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  527. }
  528. prodSuccess := new(ProduceResponse)
  529. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  530. leader.Returns(prodSuccess)
  531. expectResults(t, producer, 5, 0)
  532. }
  533. // send more messages on partition 0
  534. for i := 0; i < 5; i++ {
  535. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  536. }
  537. prodNotLeader := new(ProduceResponse)
  538. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  539. leader.Returns(prodNotLeader)
  540. time.Sleep(50 * time.Millisecond)
  541. leader.SetHandlerByMap(map[string]MockResponse{
  542. "ProduceRequest": NewMockProduceResponse(t).
  543. SetVersion(0).
  544. SetError("my_topic", 0, ErrNoError),
  545. })
  546. // tell partition 0 to go to that broker again
  547. seedBroker.Returns(metadataResponse)
  548. // succeed this time
  549. expectResults(t, producer, 5, 0)
  550. // put five more through
  551. for i := 0; i < 5; i++ {
  552. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  553. }
  554. expectResults(t, producer, 5, 0)
  555. // shutdown
  556. closeProducer(t, producer)
  557. seedBroker.Close()
  558. leader.Close()
  559. }
  560. func TestAsyncProducerRetryShutdown(t *testing.T) {
  561. seedBroker := NewMockBroker(t, 1)
  562. leader := NewMockBroker(t, 2)
  563. metadataLeader := new(MetadataResponse)
  564. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  565. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  566. seedBroker.Returns(metadataLeader)
  567. config := NewConfig()
  568. config.Producer.Flush.Messages = 10
  569. config.Producer.Return.Successes = true
  570. config.Producer.Retry.Backoff = 0
  571. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  572. if err != nil {
  573. t.Fatal(err)
  574. }
  575. for i := 0; i < 10; i++ {
  576. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  577. }
  578. producer.AsyncClose()
  579. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  580. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  581. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  582. t.Error(err)
  583. }
  584. prodNotLeader := new(ProduceResponse)
  585. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  586. leader.Returns(prodNotLeader)
  587. seedBroker.Returns(metadataLeader)
  588. prodSuccess := new(ProduceResponse)
  589. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  590. leader.Returns(prodSuccess)
  591. expectResults(t, producer, 10, 0)
  592. seedBroker.Close()
  593. leader.Close()
  594. // wait for the async-closed producer to shut down fully
  595. for err := range producer.Errors() {
  596. t.Error(err)
  597. }
  598. }
  599. func TestAsyncProducerNoReturns(t *testing.T) {
  600. seedBroker := NewMockBroker(t, 1)
  601. leader := NewMockBroker(t, 2)
  602. metadataLeader := new(MetadataResponse)
  603. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  604. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  605. seedBroker.Returns(metadataLeader)
  606. config := NewConfig()
  607. config.Producer.Flush.Messages = 10
  608. config.Producer.Return.Successes = false
  609. config.Producer.Return.Errors = false
  610. config.Producer.Retry.Backoff = 0
  611. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  612. if err != nil {
  613. t.Fatal(err)
  614. }
  615. for i := 0; i < 10; i++ {
  616. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  617. }
  618. wait := make(chan bool)
  619. go func() {
  620. if err := producer.Close(); err != nil {
  621. t.Error(err)
  622. }
  623. close(wait)
  624. }()
  625. prodSuccess := new(ProduceResponse)
  626. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  627. leader.Returns(prodSuccess)
  628. <-wait
  629. seedBroker.Close()
  630. leader.Close()
  631. }
  632. // This example shows how to use the producer while simultaneously
  633. // reading the Errors channel to know about any failures.
  634. func ExampleAsyncProducer_select() {
  635. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  636. if err != nil {
  637. panic(err)
  638. }
  639. defer func() {
  640. if err := producer.Close(); err != nil {
  641. log.Fatalln(err)
  642. }
  643. }()
  644. // Trap SIGINT to trigger a shutdown.
  645. signals := make(chan os.Signal, 1)
  646. signal.Notify(signals, os.Interrupt)
  647. var enqueued, errors int
  648. ProducerLoop:
  649. for {
  650. select {
  651. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  652. enqueued++
  653. case err := <-producer.Errors():
  654. log.Println("Failed to produce message", err)
  655. errors++
  656. case <-signals:
  657. break ProducerLoop
  658. }
  659. }
  660. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  661. }
  662. // This example shows how to use the producer with separate goroutines
  663. // reading from the Successes and Errors channels. Note that in order
  664. // for the Successes channel to be populated, you have to set
  665. // config.Producer.Return.Successes to true.
  666. func ExampleAsyncProducer_goroutines() {
  667. config := NewConfig()
  668. config.Producer.Return.Successes = true
  669. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  670. if err != nil {
  671. panic(err)
  672. }
  673. // Trap SIGINT to trigger a graceful shutdown.
  674. signals := make(chan os.Signal, 1)
  675. signal.Notify(signals, os.Interrupt)
  676. var (
  677. wg sync.WaitGroup
  678. enqueued, successes, errors int
  679. )
  680. wg.Add(1)
  681. go func() {
  682. defer wg.Done()
  683. for range producer.Successes() {
  684. successes++
  685. }
  686. }()
  687. wg.Add(1)
  688. go func() {
  689. defer wg.Done()
  690. for err := range producer.Errors() {
  691. log.Println(err)
  692. errors++
  693. }
  694. }()
  695. ProducerLoop:
  696. for {
  697. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  698. select {
  699. case producer.Input() <- message:
  700. enqueued++
  701. case <-signals:
  702. producer.AsyncClose() // Trigger a shutdown of the producer.
  703. break ProducerLoop
  704. }
  705. }
  706. wg.Wait()
  707. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  708. }