async_producer_test.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269
  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/fortytw2/leaktest"
  12. metrics "github.com/rcrowley/go-metrics"
  13. )
// TestMessage is the payload shared by the tests in this file: it is wrapped
// in StringEncoder for produced messages and is also what flakyEncoder reports
// as its length and encodes on success.
const TestMessage = "ABC THE MESSAGE"
  15. func closeProducer(t *testing.T, p AsyncProducer) {
  16. var wg sync.WaitGroup
  17. p.AsyncClose()
  18. wg.Add(2)
  19. go func() {
  20. for range p.Successes() {
  21. t.Error("Unexpected message on Successes()")
  22. }
  23. wg.Done()
  24. }()
  25. go func() {
  26. for msg := range p.Errors() {
  27. t.Error(msg.Err)
  28. }
  29. wg.Done()
  30. }()
  31. wg.Wait()
  32. }
  33. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  34. expect := successes + errors
  35. for expect > 0 {
  36. select {
  37. case msg := <-p.Errors():
  38. if msg.Msg.flags != 0 {
  39. t.Error("Message had flags set")
  40. }
  41. errors--
  42. expect--
  43. if errors < 0 {
  44. t.Error(msg.Err)
  45. }
  46. case msg := <-p.Successes():
  47. if msg.flags != 0 {
  48. t.Error("Message had flags set")
  49. }
  50. successes--
  51. expect--
  52. if successes < 0 {
  53. t.Error("Too many successes")
  54. }
  55. }
  56. }
  57. if successes != 0 || errors != 0 {
  58. t.Error("Unexpected successes", successes, "or errors", errors)
  59. }
  60. }
  61. type testPartitioner chan *int32
  62. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  63. part := <-p
  64. if part == nil {
  65. return 0, errors.New("BOOM")
  66. }
  67. return *part, nil
  68. }
  69. func (p testPartitioner) RequiresConsistency() bool {
  70. return true
  71. }
  72. func (p testPartitioner) feed(partition int32) {
  73. p <- &partition
  74. }
  75. type flakyEncoder bool
  76. func (f flakyEncoder) Length() int {
  77. return len(TestMessage)
  78. }
  79. func (f flakyEncoder) Encode() ([]byte, error) {
  80. if !bool(f) {
  81. return nil, errors.New("flaky encoding error")
  82. }
  83. return []byte(TestMessage), nil
  84. }
  85. func TestAsyncProducer(t *testing.T) {
  86. seedBroker := NewMockBroker(t, 1)
  87. leader := NewMockBroker(t, 2)
  88. metadataResponse := new(MetadataResponse)
  89. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  90. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  91. seedBroker.Returns(metadataResponse)
  92. prodSuccess := new(ProduceResponse)
  93. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  94. leader.Returns(prodSuccess)
  95. config := NewConfig()
  96. config.Producer.Flush.Messages = 10
  97. config.Producer.Return.Successes = true
  98. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  99. if err != nil {
  100. t.Fatal(err)
  101. }
  102. for i := 0; i < 10; i++ {
  103. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  104. }
  105. for i := 0; i < 10; i++ {
  106. select {
  107. case msg := <-producer.Errors():
  108. t.Error(msg.Err)
  109. if msg.Msg.flags != 0 {
  110. t.Error("Message had flags set")
  111. }
  112. case msg := <-producer.Successes():
  113. if msg.flags != 0 {
  114. t.Error("Message had flags set")
  115. }
  116. if msg.Metadata.(int) != i {
  117. t.Error("Message metadata did not match")
  118. }
  119. case <-time.After(time.Second):
  120. t.Errorf("Timeout waiting for msg #%d", i)
  121. goto done
  122. }
  123. }
  124. done:
  125. closeProducer(t, producer)
  126. leader.Close()
  127. seedBroker.Close()
  128. }
  129. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  130. seedBroker := NewMockBroker(t, 1)
  131. leader := NewMockBroker(t, 2)
  132. metadataResponse := new(MetadataResponse)
  133. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  134. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  135. seedBroker.Returns(metadataResponse)
  136. prodSuccess := new(ProduceResponse)
  137. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  138. leader.Returns(prodSuccess)
  139. leader.Returns(prodSuccess)
  140. leader.Returns(prodSuccess)
  141. config := NewConfig()
  142. config.Producer.Flush.Messages = 5
  143. config.Producer.Return.Successes = true
  144. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  145. if err != nil {
  146. t.Fatal(err)
  147. }
  148. for flush := 0; flush < 3; flush++ {
  149. for i := 0; i < 5; i++ {
  150. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  151. }
  152. expectResults(t, producer, 5, 0)
  153. }
  154. closeProducer(t, producer)
  155. leader.Close()
  156. seedBroker.Close()
  157. }
  158. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  159. seedBroker := NewMockBroker(t, 1)
  160. leader0 := NewMockBroker(t, 2)
  161. leader1 := NewMockBroker(t, 3)
  162. metadataResponse := new(MetadataResponse)
  163. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  164. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  165. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError)
  166. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  167. seedBroker.Returns(metadataResponse)
  168. prodResponse0 := new(ProduceResponse)
  169. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  170. leader0.Returns(prodResponse0)
  171. prodResponse1 := new(ProduceResponse)
  172. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  173. leader1.Returns(prodResponse1)
  174. config := NewConfig()
  175. config.Producer.Flush.Messages = 5
  176. config.Producer.Return.Successes = true
  177. config.Producer.Partitioner = NewRoundRobinPartitioner
  178. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. for i := 0; i < 10; i++ {
  183. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  184. }
  185. expectResults(t, producer, 10, 0)
  186. closeProducer(t, producer)
  187. leader1.Close()
  188. leader0.Close()
  189. seedBroker.Close()
  190. }
  191. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  192. seedBroker := NewMockBroker(t, 1)
  193. leader := NewMockBroker(t, 2)
  194. metadataResponse := new(MetadataResponse)
  195. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  196. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  197. seedBroker.Returns(metadataResponse)
  198. prodResponse := new(ProduceResponse)
  199. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  200. leader.Returns(prodResponse)
  201. config := NewConfig()
  202. config.Producer.Flush.Messages = 2
  203. config.Producer.Return.Successes = true
  204. config.Producer.Partitioner = func(topic string) Partitioner {
  205. p := make(testPartitioner)
  206. go func() {
  207. p.feed(0)
  208. p <- nil
  209. p <- nil
  210. p <- nil
  211. p.feed(0)
  212. }()
  213. return p
  214. }
  215. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  216. if err != nil {
  217. t.Fatal(err)
  218. }
  219. for i := 0; i < 5; i++ {
  220. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  221. }
  222. expectResults(t, producer, 2, 3)
  223. closeProducer(t, producer)
  224. leader.Close()
  225. seedBroker.Close()
  226. }
  227. func TestAsyncProducerFailureRetry(t *testing.T) {
  228. seedBroker := NewMockBroker(t, 1)
  229. leader1 := NewMockBroker(t, 2)
  230. leader2 := NewMockBroker(t, 3)
  231. metadataLeader1 := new(MetadataResponse)
  232. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  233. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  234. seedBroker.Returns(metadataLeader1)
  235. config := NewConfig()
  236. config.Producer.Flush.Messages = 10
  237. config.Producer.Return.Successes = true
  238. config.Producer.Retry.Backoff = 0
  239. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  240. if err != nil {
  241. t.Fatal(err)
  242. }
  243. seedBroker.Close()
  244. for i := 0; i < 10; i++ {
  245. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  246. }
  247. prodNotLeader := new(ProduceResponse)
  248. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  249. leader1.Returns(prodNotLeader)
  250. metadataLeader2 := new(MetadataResponse)
  251. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  252. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  253. leader1.Returns(metadataLeader2)
  254. prodSuccess := new(ProduceResponse)
  255. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  256. leader2.Returns(prodSuccess)
  257. expectResults(t, producer, 10, 0)
  258. leader1.Close()
  259. for i := 0; i < 10; i++ {
  260. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  261. }
  262. leader2.Returns(prodSuccess)
  263. expectResults(t, producer, 10, 0)
  264. leader2.Close()
  265. closeProducer(t, producer)
  266. }
  267. type testLogger struct {
  268. t *testing.T
  269. }
  270. func (l *testLogger) Print(v ...interface{}) {
  271. l.t.Log(v...)
  272. }
  273. func (l *testLogger) Printf(format string, v ...interface{}) {
  274. l.t.Logf(format, v...)
  275. }
  276. func (l *testLogger) Println(v ...interface{}) {
  277. l.t.Log(v...)
  278. }
  279. func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
  280. tt := func(t *testing.T, kErr KError) {
  281. seedBroker := NewMockBroker(t, 1)
  282. leader1 := NewMockBroker(t, 2)
  283. leader2 := NewMockBroker(t, 3)
  284. metadataLeader1 := new(MetadataResponse)
  285. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  286. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  287. metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  288. seedBroker.Returns(metadataLeader1)
  289. config := NewConfig()
  290. config.Producer.Flush.Messages = 2
  291. config.Producer.Return.Successes = true
  292. config.Producer.Retry.Max = 0 // disable!
  293. config.Producer.Retry.Backoff = 0
  294. config.Producer.Partitioner = NewManualPartitioner
  295. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  296. if err != nil {
  297. t.Fatal(err)
  298. }
  299. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  300. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  301. prodNotLeader := new(ProduceResponse)
  302. prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
  303. prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
  304. leader1.Returns(prodNotLeader)
  305. expectResults(t, producer, 0, 2)
  306. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  307. metadataLeader2 := new(MetadataResponse)
  308. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  309. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  310. metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  311. leader1.Returns(metadataLeader2)
  312. leader1.Returns(metadataLeader2)
  313. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  314. prodSuccess := new(ProduceResponse)
  315. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  316. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  317. leader2.Returns(prodSuccess)
  318. expectResults(t, producer, 2, 0)
  319. seedBroker.Close()
  320. leader1.Close()
  321. leader2.Close()
  322. closeProducer(t, producer)
  323. }
  324. t.Run("retriable error", func(t *testing.T) {
  325. tt(t, ErrNotLeaderForPartition)
  326. })
  327. t.Run("non-retriable error", func(t *testing.T) {
  328. tt(t, ErrNotController)
  329. })
  330. }
  331. func TestAsyncProducerEncoderFailures(t *testing.T) {
  332. seedBroker := NewMockBroker(t, 1)
  333. leader := NewMockBroker(t, 2)
  334. metadataResponse := new(MetadataResponse)
  335. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  336. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  337. seedBroker.Returns(metadataResponse)
  338. prodSuccess := new(ProduceResponse)
  339. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  340. leader.Returns(prodSuccess)
  341. leader.Returns(prodSuccess)
  342. leader.Returns(prodSuccess)
  343. config := NewConfig()
  344. config.Producer.Flush.Messages = 1
  345. config.Producer.Return.Successes = true
  346. config.Producer.Partitioner = NewManualPartitioner
  347. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  348. if err != nil {
  349. t.Fatal(err)
  350. }
  351. for flush := 0; flush < 3; flush++ {
  352. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  353. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  354. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  355. expectResults(t, producer, 1, 2)
  356. }
  357. closeProducer(t, producer)
  358. leader.Close()
  359. seedBroker.Close()
  360. }
  361. // If a Kafka broker becomes unavailable and then returns back in service, then
  362. // producer reconnects to it and continues sending messages.
  363. func TestAsyncProducerBrokerBounce(t *testing.T) {
  364. // Given
  365. seedBroker := NewMockBroker(t, 1)
  366. leader := NewMockBroker(t, 2)
  367. leaderAddr := leader.Addr()
  368. metadataResponse := new(MetadataResponse)
  369. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  370. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  371. seedBroker.Returns(metadataResponse)
  372. prodSuccess := new(ProduceResponse)
  373. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  374. config := NewConfig()
  375. config.Producer.Flush.Messages = 1
  376. config.Producer.Return.Successes = true
  377. config.Producer.Retry.Backoff = 0
  378. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  379. if err != nil {
  380. t.Fatal(err)
  381. }
  382. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  383. leader.Returns(prodSuccess)
  384. expectResults(t, producer, 1, 0)
  385. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  386. leader.Close() // producer should get EOF
  387. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  388. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  389. // Then: a produced message goes through the new broker connection.
  390. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  391. leader.Returns(prodSuccess)
  392. expectResults(t, producer, 1, 0)
  393. closeProducer(t, producer)
  394. seedBroker.Close()
  395. leader.Close()
  396. }
  397. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  398. seedBroker := NewMockBroker(t, 1)
  399. leader1 := NewMockBroker(t, 2)
  400. leader2 := NewMockBroker(t, 3)
  401. metadataLeader1 := new(MetadataResponse)
  402. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  403. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  404. seedBroker.Returns(metadataLeader1)
  405. config := NewConfig()
  406. config.Producer.Flush.Messages = 10
  407. config.Producer.Return.Successes = true
  408. config.Producer.Retry.Max = 3
  409. config.Producer.Retry.Backoff = 0
  410. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  411. if err != nil {
  412. t.Fatal(err)
  413. }
  414. for i := 0; i < 10; i++ {
  415. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  416. }
  417. leader1.Close() // producer should get EOF
  418. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  419. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  420. // ok fine, tell it to go to leader2 finally
  421. metadataLeader2 := new(MetadataResponse)
  422. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  423. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  424. seedBroker.Returns(metadataLeader2)
  425. prodSuccess := new(ProduceResponse)
  426. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  427. leader2.Returns(prodSuccess)
  428. expectResults(t, producer, 10, 0)
  429. seedBroker.Close()
  430. leader2.Close()
  431. closeProducer(t, producer)
  432. }
// TestAsyncProducerMultipleRetries scripts four consecutive
// ErrNotLeaderForPartition produce responses (from leader1, leader2, leader1
// and leader1 again), each followed by a metadata response from the seed
// broker naming the broker to try next, and finally a successful produce
// response from leader2. With Retry.Max set to 4 the test expects all ten
// messages of the first batch to succeed, and then a second batch of ten to
// succeed against leader2.
func TestAsyncProducerMultipleRetries(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader1 := NewMockBroker(t, 2)
	leader2 := NewMockBroker(t, 3)
	// Initial metadata: partition 0 is led by leader1.
	metadataLeader1 := new(MetadataResponse)
	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader1)
	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 4
	config.Producer.Retry.Backoff = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	// Rejection 1: leader1 answers "not leader".
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader1.Returns(prodNotLeader)
	// The seed broker then names leader2, which also rejects (rejection 2).
	metadataLeader2 := new(MetadataResponse)
	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader2)
	leader2.Returns(prodNotLeader)
	// Back to leader1, which rejects twice more (rejections 3 and 4).
	seedBroker.Returns(metadataLeader1)
	leader1.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader1)
	leader1.Returns(prodNotLeader)
	// Finally the seed broker names leader2 again, and this time it accepts.
	seedBroker.Returns(metadataLeader2)
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader2.Returns(prodSuccess)
	expectResults(t, producer, 10, 0)
	// A second batch: only a success on leader2 is scripted for it.
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	leader2.Returns(prodSuccess)
	expectResults(t, producer, 10, 0)
	seedBroker.Close()
	leader1.Close()
	leader2.Close()
	closeProducer(t, producer)
}
// TestAsyncProducerMultipleRetriesWithBackoffFunc runs a single message
// through four scripted ErrNotLeaderForPartition responses followed by a
// success, with Retry.Max set to 4 and a custom Retry.BackoffFunc that counts
// how often it is called for each retry number. After shutdown it checks that
// the first four counters are exactly 1 and that the extra, fifth counter was
// never touched.
func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader1 := NewMockBroker(t, 2)
	leader2 := NewMockBroker(t, 3)
	metadataLeader1 := new(MetadataResponse)
	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
	seedBroker.Returns(metadataLeader1)
	config := NewConfig()
	config.Producer.Flush.Messages = 1
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 4
	// One counter per retry number, plus one spare slot that must stay at zero.
	backoffCalled := make([]int32, config.Producer.Retry.Max+1)
	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		// NOTE(review): the retries-1 index presumes retries starts at 1 —
		// confirm against the producer implementation.
		// Updated atomically; the counters are read back with atomic loads below.
		atomic.AddInt32(&backoffCalled[retries-1], 1)
		return 0
	}
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	metadataLeader2 := new(MetadataResponse)
	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
	// Scripted sequence: four rejections (leader1, leader2, leader1, leader1),
	// each followed by metadata naming the next broker, then a success on
	// leader2.
	leader1.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader2)
	leader2.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader1)
	leader1.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader1)
	leader1.Returns(prodNotLeader)
	seedBroker.Returns(metadataLeader2)
	leader2.Returns(prodSuccess)
	expectResults(t, producer, 1, 0)
	// A second message, for which only a success on leader2 is scripted.
	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	leader2.Returns(prodSuccess)
	expectResults(t, producer, 1, 0)
	seedBroker.Close()
	leader1.Close()
	leader2.Close()
	closeProducer(t, producer)
	// Each of the Retry.Max counters must have been bumped exactly once...
	for i := 0; i < config.Producer.Retry.Max; i++ {
		if atomic.LoadInt32(&backoffCalled[i]) != 1 {
			t.Errorf("expected one retry attempt #%d", i)
		}
	}
	// ...and the spare slot beyond Retry.Max must still be zero.
	if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
		t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
	}
}
// TestAsyncProducerOutOfRetries is currently skipped (see the t.Skip call);
// nothing below the skip runs.
//
// As written, it sets Retry.Max to 0, has the leader reject a batch of ten
// messages with ErrNotLeaderForPartition, and expects ten errors carrying
// exactly that error and no successes. It then scripts fresh metadata and a
// successful produce response and expects a second batch of ten to succeed.
func TestAsyncProducerOutOfRetries(t *testing.T) {
	t.Skip("Enable once bug #294 is fixed.")
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	config := NewConfig()
	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	config.Producer.Retry.Max = 0
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader.Returns(prodNotLeader)
	// All ten messages are expected back as ErrNotLeaderForPartition errors.
	for i := 0; i < 10; i++ {
		select {
		case msg := <-producer.Errors():
			if msg.Err != ErrNotLeaderForPartition {
				t.Error(msg.Err)
			}
		case <-producer.Successes():
			t.Error("Unexpected success")
		}
	}
	// Script another metadata response before sending the second batch.
	seedBroker.Returns(metadataResponse)
	for i := 0; i < 10; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
	}
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
	leader.Returns(prodSuccess)
	expectResults(t, producer, 10, 0)
	leader.Close()
	seedBroker.Close()
	// NOTE(review): this test shuts down via safeClose, which is not defined
	// in this part of the file, whereas the neighbouring tests use
	// closeProducer — confirm that is intentional.
	safeClose(t, producer)
}
  580. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  581. seedBroker := NewMockBroker(t, 1)
  582. leader := NewMockBroker(t, 2)
  583. leaderAddr := leader.Addr()
  584. metadataResponse := new(MetadataResponse)
  585. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  586. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  587. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
  588. seedBroker.Returns(metadataResponse)
  589. config := NewConfig()
  590. config.Producer.Return.Successes = true
  591. config.Producer.Retry.Backoff = 0
  592. config.Producer.Retry.Max = 1
  593. config.Producer.Partitioner = NewRoundRobinPartitioner
  594. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  595. if err != nil {
  596. t.Fatal(err)
  597. }
  598. // prime partition 0
  599. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  600. prodSuccess := new(ProduceResponse)
  601. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  602. leader.Returns(prodSuccess)
  603. expectResults(t, producer, 1, 0)
  604. // prime partition 1
  605. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  606. prodSuccess = new(ProduceResponse)
  607. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  608. leader.Returns(prodSuccess)
  609. expectResults(t, producer, 1, 0)
  610. // reboot the broker (the producer will get EOF on its existing connection)
  611. leader.Close()
  612. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  613. // send another message on partition 0 to trigger the EOF and retry
  614. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  615. // tell partition 0 to go to that broker again
  616. seedBroker.Returns(metadataResponse)
  617. // succeed this time
  618. prodSuccess = new(ProduceResponse)
  619. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  620. leader.Returns(prodSuccess)
  621. expectResults(t, producer, 1, 0)
  622. // shutdown
  623. closeProducer(t, producer)
  624. seedBroker.Close()
  625. leader.Close()
  626. }
// TestAsyncProducerFlusherRetryCondition primes two partitions on one leader
// with five successful messages each, then has the leader reject a further
// batch for partition 0 with ErrNotLeaderForPartition. After a short pause the
// leader is switched to a handler that answers every ProduceRequest with
// success for partition 0, the seed broker hands out the metadata again, and
// the test expects the rejected batch plus one more batch of five to succeed.
// Retry.Max is set to 1 and partitions are chosen manually.
func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	config := NewConfig()
	config.Producer.Flush.Messages = 5
	config.Producer.Return.Successes = true
	config.Producer.Retry.Backoff = 0
	config.Producer.Retry.Max = 1
	config.Producer.Partitioner = NewManualPartitioner
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	// prime partitions: five messages and one scripted success for each
	for p := int32(0); p < 2; p++ {
		for i := 0; i < 5; i++ {
			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
		}
		prodSuccess := new(ProduceResponse)
		prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
		leader.Returns(prodSuccess)
		expectResults(t, producer, 5, 0)
	}
	// send more messages on partition 0
	for i := 0; i < 5; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
	}
	// This batch is rejected with "not leader".
	prodNotLeader := new(ProduceResponse)
	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
	leader.Returns(prodNotLeader)
	// NOTE(review): presumably the pause lets the scripted rejection be served
	// before the handler map below takes over — confirm against MockBroker.
	time.Sleep(50 * time.Millisecond)
	// From here on, answer ProduceRequest with success for partition 0.
	leader.SetHandlerByMap(map[string]MockResponse{
		"ProduceRequest": NewMockProduceResponse(t).
			SetVersion(0).
			SetError("my_topic", 0, ErrNoError),
	})
	// tell partition 0 to go to that broker again
	seedBroker.Returns(metadataResponse)
	// succeed this time
	expectResults(t, producer, 5, 0)
	// put five more through
	for i := 0; i < 5; i++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
	}
	expectResults(t, producer, 5, 0)
	// shutdown
	closeProducer(t, producer)
	seedBroker.Close()
	leader.Close()
}
  682. func TestAsyncProducerRetryShutdown(t *testing.T) {
  683. seedBroker := NewMockBroker(t, 1)
  684. leader := NewMockBroker(t, 2)
  685. metadataLeader := new(MetadataResponse)
  686. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  687. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  688. seedBroker.Returns(metadataLeader)
  689. config := NewConfig()
  690. config.Producer.Flush.Messages = 10
  691. config.Producer.Return.Successes = true
  692. config.Producer.Retry.Backoff = 0
  693. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  694. if err != nil {
  695. t.Fatal(err)
  696. }
  697. for i := 0; i < 10; i++ {
  698. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  699. }
  700. producer.AsyncClose()
  701. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  702. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  703. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  704. t.Error(err)
  705. }
  706. prodNotLeader := new(ProduceResponse)
  707. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  708. leader.Returns(prodNotLeader)
  709. seedBroker.Returns(metadataLeader)
  710. prodSuccess := new(ProduceResponse)
  711. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  712. leader.Returns(prodSuccess)
  713. expectResults(t, producer, 10, 0)
  714. seedBroker.Close()
  715. leader.Close()
  716. // wait for the async-closed producer to shut down fully
  717. for err := range producer.Errors() {
  718. t.Error(err)
  719. }
  720. }
  721. func TestAsyncProducerNoReturns(t *testing.T) {
  722. seedBroker := NewMockBroker(t, 1)
  723. leader := NewMockBroker(t, 2)
  724. metadataLeader := new(MetadataResponse)
  725. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  726. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  727. seedBroker.Returns(metadataLeader)
  728. config := NewConfig()
  729. config.Producer.Flush.Messages = 10
  730. config.Producer.Return.Successes = false
  731. config.Producer.Return.Errors = false
  732. config.Producer.Retry.Backoff = 0
  733. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  734. if err != nil {
  735. t.Fatal(err)
  736. }
  737. for i := 0; i < 10; i++ {
  738. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  739. }
  740. wait := make(chan bool)
  741. go func() {
  742. if err := producer.Close(); err != nil {
  743. t.Error(err)
  744. }
  745. close(wait)
  746. }()
  747. prodSuccess := new(ProduceResponse)
  748. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  749. leader.Returns(prodSuccess)
  750. <-wait
  751. seedBroker.Close()
  752. leader.Close()
  753. }
  754. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  755. broker := NewMockBroker(t, 1)
  756. metadataResponse := &MetadataResponse{
  757. Version: 1,
  758. ControllerID: 1,
  759. }
  760. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  761. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  762. broker.Returns(metadataResponse)
  763. initProducerID := &InitProducerIDResponse{
  764. ThrottleTime: 0,
  765. ProducerID: 1000,
  766. ProducerEpoch: 1,
  767. }
  768. broker.Returns(initProducerID)
  769. config := NewConfig()
  770. config.Producer.Flush.Messages = 10
  771. config.Producer.Return.Successes = true
  772. config.Producer.Retry.Max = 4
  773. config.Producer.RequiredAcks = WaitForAll
  774. config.Producer.Retry.Backoff = 0
  775. config.Producer.Idempotent = true
  776. config.Net.MaxOpenRequests = 1
  777. config.Version = V0_11_0_0
  778. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  779. if err != nil {
  780. t.Fatal(err)
  781. }
  782. for i := 0; i < 10; i++ {
  783. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  784. }
  785. prodSuccess := &ProduceResponse{
  786. Version: 3,
  787. ThrottleTime: 0,
  788. }
  789. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  790. broker.Returns(prodSuccess)
  791. expectResults(t, producer, 10, 0)
  792. broker.Close()
  793. closeProducer(t, producer)
  794. }
  795. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  796. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  797. tests := []struct {
  798. name string
  799. failAfterWrite bool
  800. }{
  801. {"FailAfterWrite", true},
  802. {"FailBeforeWrite", false},
  803. }
  804. for _, test := range tests {
  805. broker := NewMockBroker(t, 1)
  806. metadataResponse := &MetadataResponse{
  807. Version: 1,
  808. ControllerID: 1,
  809. }
  810. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  811. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  812. initProducerIDResponse := &InitProducerIDResponse{
  813. ThrottleTime: 0,
  814. ProducerID: 1000,
  815. ProducerEpoch: 1,
  816. }
  817. prodNotLeaderResponse := &ProduceResponse{
  818. Version: 3,
  819. ThrottleTime: 0,
  820. }
  821. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  822. prodDuplicate := &ProduceResponse{
  823. Version: 3,
  824. ThrottleTime: 0,
  825. }
  826. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  827. prodOutOfSeq := &ProduceResponse{
  828. Version: 3,
  829. ThrottleTime: 0,
  830. }
  831. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  832. prodSuccessResponse := &ProduceResponse{
  833. Version: 3,
  834. ThrottleTime: 0,
  835. }
  836. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  837. prodCounter := 0
  838. lastBatchFirstSeq := -1
  839. lastBatchSize := -1
  840. lastSequenceWrittenToDisk := -1
  841. handlerFailBeforeWrite := func(req *request) (res encoder) {
  842. switch req.body.key() {
  843. case 3:
  844. return metadataResponse
  845. case 22:
  846. return initProducerIDResponse
  847. case 0:
  848. prodCounter++
  849. preq := req.body.(*ProduceRequest)
  850. batch := preq.records["my_topic"][0].RecordBatch
  851. batchFirstSeq := int(batch.FirstSequence)
  852. batchSize := len(batch.Records)
  853. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  854. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  855. if lastBatchSize == batchSize { //good retry
  856. // mock write to disk
  857. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  858. return prodSuccessResponse
  859. }
  860. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  861. return prodOutOfSeq
  862. } // not a retry
  863. // save batch just received for future check
  864. lastBatchFirstSeq = batchFirstSeq
  865. lastBatchSize = batchSize
  866. if prodCounter%2 == 1 {
  867. if test.failAfterWrite {
  868. // mock write to disk
  869. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  870. }
  871. return prodNotLeaderResponse
  872. }
  873. // mock write to disk
  874. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  875. return prodSuccessResponse
  876. }
  877. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  878. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  879. return prodDuplicate
  880. }
  881. // mock write to disk
  882. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  883. return prodSuccessResponse
  884. } //out of sequence / bad retried batch
  885. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  886. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  887. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  888. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  889. } else {
  890. t.Errorf("[%s] Unexpected error", test.name)
  891. }
  892. return prodOutOfSeq
  893. }
  894. return nil
  895. }
  896. config := NewConfig()
  897. config.Version = V0_11_0_0
  898. config.Producer.Idempotent = true
  899. config.Net.MaxOpenRequests = 1
  900. config.Producer.RequiredAcks = WaitForAll
  901. config.Producer.Return.Successes = true
  902. config.Producer.Flush.Frequency = 50 * time.Millisecond
  903. config.Producer.Retry.Backoff = 100 * time.Millisecond
  904. broker.setHandler(handlerFailBeforeWrite)
  905. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  906. if err != nil {
  907. t.Fatal(err)
  908. }
  909. for i := 0; i < 3; i++ {
  910. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  911. }
  912. go func() {
  913. for i := 0; i < 7; i++ {
  914. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  915. time.Sleep(100 * time.Millisecond)
  916. }
  917. }()
  918. expectResults(t, producer, 10, 0)
  919. broker.Close()
  920. closeProducer(t, producer)
  921. }
  922. }
  923. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  924. broker := NewMockBroker(t, 1)
  925. metadataResponse := &MetadataResponse{
  926. Version: 1,
  927. ControllerID: 1,
  928. }
  929. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  930. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  931. broker.Returns(metadataResponse)
  932. initProducerID := &InitProducerIDResponse{
  933. ThrottleTime: 0,
  934. ProducerID: 1000,
  935. ProducerEpoch: 1,
  936. }
  937. broker.Returns(initProducerID)
  938. config := NewConfig()
  939. config.Producer.Flush.Messages = 10
  940. config.Producer.Return.Successes = true
  941. config.Producer.Retry.Max = 400000
  942. config.Producer.RequiredAcks = WaitForAll
  943. config.Producer.Retry.Backoff = 0
  944. config.Producer.Idempotent = true
  945. config.Net.MaxOpenRequests = 1
  946. config.Version = V0_11_0_0
  947. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  948. if err != nil {
  949. t.Fatal(err)
  950. }
  951. for i := 0; i < 10; i++ {
  952. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  953. }
  954. prodOutOfSeq := &ProduceResponse{
  955. Version: 3,
  956. ThrottleTime: 0,
  957. }
  958. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  959. broker.Returns(prodOutOfSeq)
  960. expectResults(t, producer, 0, 10)
  961. broker.Close()
  962. closeProducer(t, producer)
  963. }
  964. // TestBrokerProducerShutdown ensures that a call to shutdown stops the
  965. // brokerProducer run() loop and doesn't leak any goroutines
  966. func TestBrokerProducerShutdown(t *testing.T) {
  967. defer leaktest.Check(t)()
  968. metrics.UseNilMetrics = true // disable Sarama's go-metrics library
  969. defer func() {
  970. metrics.UseNilMetrics = false
  971. }()
  972. mockBroker := NewMockBroker(t, 1)
  973. metadataResponse := &MetadataResponse{}
  974. metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID())
  975. metadataResponse.AddTopicPartition(
  976. "my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError)
  977. mockBroker.Returns(metadataResponse)
  978. producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, nil)
  979. if err != nil {
  980. t.Fatal(err)
  981. }
  982. broker := &Broker{
  983. addr: mockBroker.Addr(),
  984. id: mockBroker.BrokerID(),
  985. }
  986. bp := producer.(*asyncProducer).newBrokerProducer(broker)
  987. bp.shutdown()
  988. _ = producer.Close()
  989. mockBroker.Close()
  990. }
  991. // This example shows how to use the producer while simultaneously
  992. // reading the Errors channel to know about any failures.
  993. func ExampleAsyncProducer_select() {
  994. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  995. if err != nil {
  996. panic(err)
  997. }
  998. defer func() {
  999. if err := producer.Close(); err != nil {
  1000. log.Fatalln(err)
  1001. }
  1002. }()
  1003. // Trap SIGINT to trigger a shutdown.
  1004. signals := make(chan os.Signal, 1)
  1005. signal.Notify(signals, os.Interrupt)
  1006. var enqueued, errors int
  1007. ProducerLoop:
  1008. for {
  1009. select {
  1010. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  1011. enqueued++
  1012. case err := <-producer.Errors():
  1013. log.Println("Failed to produce message", err)
  1014. errors++
  1015. case <-signals:
  1016. break ProducerLoop
  1017. }
  1018. }
  1019. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  1020. }
  1021. // This example shows how to use the producer with separate goroutines
  1022. // reading from the Successes and Errors channels. Note that in order
  1023. // for the Successes channel to be populated, you have to set
  1024. // config.Producer.Return.Successes to true.
  1025. func ExampleAsyncProducer_goroutines() {
  1026. config := NewConfig()
  1027. config.Producer.Return.Successes = true
  1028. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  1029. if err != nil {
  1030. panic(err)
  1031. }
  1032. // Trap SIGINT to trigger a graceful shutdown.
  1033. signals := make(chan os.Signal, 1)
  1034. signal.Notify(signals, os.Interrupt)
  1035. var (
  1036. wg sync.WaitGroup
  1037. enqueued, successes, errors int
  1038. )
  1039. wg.Add(1)
  1040. go func() {
  1041. defer wg.Done()
  1042. for range producer.Successes() {
  1043. successes++
  1044. }
  1045. }()
  1046. wg.Add(1)
  1047. go func() {
  1048. defer wg.Done()
  1049. for err := range producer.Errors() {
  1050. log.Println(err)
  1051. errors++
  1052. }
  1053. }()
  1054. ProducerLoop:
  1055. for {
  1056. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  1057. select {
  1058. case producer.Input() <- message:
  1059. enqueued++
  1060. case <-signals:
  1061. producer.AsyncClose() // Trigger a shutdown of the producer.
  1062. break ProducerLoop
  1063. }
  1064. }
  1065. wg.Wait()
  1066. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  1067. }