async_producer_test.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322
  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/fortytw2/leaktest"
  12. metrics "github.com/rcrowley/go-metrics"
  13. )
  // TestMessage is the payload used by the producer tests in this file. The
  // flakyEncoder helper reports its length and, when it succeeds, encodes to it.
  const (
  	TestMessage = "ABC THE MESSAGE"
  )
  15. func closeProducer(t *testing.T, p AsyncProducer) {
  16. var wg sync.WaitGroup
  17. p.AsyncClose()
  18. wg.Add(2)
  19. go func() {
  20. for range p.Successes() {
  21. t.Error("Unexpected message on Successes()")
  22. }
  23. wg.Done()
  24. }()
  25. go func() {
  26. for msg := range p.Errors() {
  27. t.Error(msg.Err)
  28. }
  29. wg.Done()
  30. }()
  31. wg.Wait()
  32. }
  33. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  34. expect := successes + errors
  35. for expect > 0 {
  36. select {
  37. case msg := <-p.Errors():
  38. if msg.Msg.flags != 0 {
  39. t.Error("Message had flags set")
  40. }
  41. errors--
  42. expect--
  43. if errors < 0 {
  44. t.Error(msg.Err)
  45. }
  46. case msg := <-p.Successes():
  47. if msg.flags != 0 {
  48. t.Error("Message had flags set")
  49. }
  50. successes--
  51. expect--
  52. if successes < 0 {
  53. t.Error("Too many successes")
  54. }
  55. }
  56. }
  57. if successes != 0 || errors != 0 {
  58. t.Error("Unexpected successes", successes, "or errors", errors)
  59. }
  60. }
  61. type testPartitioner chan *int32
  62. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  63. part := <-p
  64. if part == nil {
  65. return 0, errors.New("BOOM")
  66. }
  67. return *part, nil
  68. }
  69. func (p testPartitioner) RequiresConsistency() bool {
  70. return true
  71. }
  72. func (p testPartitioner) feed(partition int32) {
  73. p <- &partition
  74. }
  75. type flakyEncoder bool
  76. func (f flakyEncoder) Length() int {
  77. return len(TestMessage)
  78. }
  79. func (f flakyEncoder) Encode() ([]byte, error) {
  80. if !bool(f) {
  81. return nil, errors.New("flaky encoding error")
  82. }
  83. return []byte(TestMessage), nil
  84. }
  85. func TestAsyncProducer(t *testing.T) {
  86. seedBroker := NewMockBroker(t, 1)
  87. leader := NewMockBroker(t, 2)
  88. metadataResponse := new(MetadataResponse)
  89. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  90. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  91. seedBroker.Returns(metadataResponse)
  92. prodSuccess := new(ProduceResponse)
  93. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  94. leader.Returns(prodSuccess)
  95. config := NewConfig()
  96. config.Producer.Flush.Messages = 10
  97. config.Producer.Return.Successes = true
  98. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  99. if err != nil {
  100. t.Fatal(err)
  101. }
  102. for i := 0; i < 10; i++ {
  103. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  104. }
  105. for i := 0; i < 10; i++ {
  106. select {
  107. case msg := <-producer.Errors():
  108. t.Error(msg.Err)
  109. if msg.Msg.flags != 0 {
  110. t.Error("Message had flags set")
  111. }
  112. case msg := <-producer.Successes():
  113. if msg.flags != 0 {
  114. t.Error("Message had flags set")
  115. }
  116. if msg.Metadata.(int) != i {
  117. t.Error("Message metadata did not match")
  118. }
  119. case <-time.After(time.Second):
  120. t.Errorf("Timeout waiting for msg #%d", i)
  121. goto done
  122. }
  123. }
  124. done:
  125. closeProducer(t, producer)
  126. leader.Close()
  127. seedBroker.Close()
  128. }
  129. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  130. seedBroker := NewMockBroker(t, 1)
  131. leader := NewMockBroker(t, 2)
  132. metadataResponse := new(MetadataResponse)
  133. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  134. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  135. seedBroker.Returns(metadataResponse)
  136. prodSuccess := new(ProduceResponse)
  137. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  138. leader.Returns(prodSuccess)
  139. leader.Returns(prodSuccess)
  140. leader.Returns(prodSuccess)
  141. config := NewConfig()
  142. config.Producer.Flush.Messages = 5
  143. config.Producer.Return.Successes = true
  144. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  145. if err != nil {
  146. t.Fatal(err)
  147. }
  148. for flush := 0; flush < 3; flush++ {
  149. for i := 0; i < 5; i++ {
  150. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  151. }
  152. expectResults(t, producer, 5, 0)
  153. }
  154. closeProducer(t, producer)
  155. leader.Close()
  156. seedBroker.Close()
  157. }
  158. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  159. seedBroker := NewMockBroker(t, 1)
  160. leader0 := NewMockBroker(t, 2)
  161. leader1 := NewMockBroker(t, 3)
  162. metadataResponse := new(MetadataResponse)
  163. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  164. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  165. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError)
  166. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  167. seedBroker.Returns(metadataResponse)
  168. prodResponse0 := new(ProduceResponse)
  169. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  170. leader0.Returns(prodResponse0)
  171. prodResponse1 := new(ProduceResponse)
  172. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  173. leader1.Returns(prodResponse1)
  174. config := NewConfig()
  175. config.Producer.Flush.Messages = 5
  176. config.Producer.Return.Successes = true
  177. config.Producer.Partitioner = NewRoundRobinPartitioner
  178. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. for i := 0; i < 10; i++ {
  183. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  184. }
  185. expectResults(t, producer, 10, 0)
  186. closeProducer(t, producer)
  187. leader1.Close()
  188. leader0.Close()
  189. seedBroker.Close()
  190. }
  191. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  192. seedBroker := NewMockBroker(t, 1)
  193. leader := NewMockBroker(t, 2)
  194. metadataResponse := new(MetadataResponse)
  195. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  196. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  197. seedBroker.Returns(metadataResponse)
  198. prodResponse := new(ProduceResponse)
  199. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  200. leader.Returns(prodResponse)
  201. config := NewConfig()
  202. config.Producer.Flush.Messages = 2
  203. config.Producer.Return.Successes = true
  204. config.Producer.Partitioner = func(topic string) Partitioner {
  205. p := make(testPartitioner)
  206. go func() {
  207. p.feed(0)
  208. p <- nil
  209. p <- nil
  210. p <- nil
  211. p.feed(0)
  212. }()
  213. return p
  214. }
  215. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  216. if err != nil {
  217. t.Fatal(err)
  218. }
  219. for i := 0; i < 5; i++ {
  220. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  221. }
  222. expectResults(t, producer, 2, 3)
  223. closeProducer(t, producer)
  224. leader.Close()
  225. seedBroker.Close()
  226. }
  227. func TestAsyncProducerFailureRetry(t *testing.T) {
  228. seedBroker := NewMockBroker(t, 1)
  229. leader1 := NewMockBroker(t, 2)
  230. leader2 := NewMockBroker(t, 3)
  231. metadataLeader1 := new(MetadataResponse)
  232. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  233. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  234. seedBroker.Returns(metadataLeader1)
  235. config := NewConfig()
  236. config.Producer.Flush.Messages = 10
  237. config.Producer.Return.Successes = true
  238. config.Producer.Retry.Backoff = 0
  239. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  240. if err != nil {
  241. t.Fatal(err)
  242. }
  243. seedBroker.Close()
  244. for i := 0; i < 10; i++ {
  245. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  246. }
  247. prodNotLeader := new(ProduceResponse)
  248. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  249. leader1.Returns(prodNotLeader)
  250. metadataLeader2 := new(MetadataResponse)
  251. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  252. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  253. leader1.Returns(metadataLeader2)
  254. prodSuccess := new(ProduceResponse)
  255. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  256. leader2.Returns(prodSuccess)
  257. expectResults(t, producer, 10, 0)
  258. leader1.Close()
  259. for i := 0; i < 10; i++ {
  260. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  261. }
  262. leader2.Returns(prodSuccess)
  263. expectResults(t, producer, 10, 0)
  264. leader2.Close()
  265. closeProducer(t, producer)
  266. }
  267. func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
  268. tt := func(t *testing.T, kErr KError) {
  269. seedBroker := NewMockBroker(t, 1)
  270. leader1 := NewMockBroker(t, 2)
  271. leader2 := NewMockBroker(t, 3)
  272. metadataLeader1 := new(MetadataResponse)
  273. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  274. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  275. metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  276. seedBroker.Returns(metadataLeader1)
  277. config := NewConfig()
  278. config.Producer.Flush.Messages = 2
  279. config.Producer.Return.Successes = true
  280. config.Producer.Retry.Max = 0 // disable!
  281. config.Producer.Retry.Backoff = 0
  282. config.Producer.Partitioner = NewManualPartitioner
  283. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  284. if err != nil {
  285. t.Fatal(err)
  286. }
  287. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  288. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  289. prodNotLeader := new(ProduceResponse)
  290. prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
  291. prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
  292. leader1.Returns(prodNotLeader)
  293. expectResults(t, producer, 0, 2)
  294. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  295. metadataLeader2 := new(MetadataResponse)
  296. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  297. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  298. metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  299. leader1.Returns(metadataLeader2)
  300. leader1.Returns(metadataLeader2)
  301. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  302. prodSuccess := new(ProduceResponse)
  303. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  304. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  305. leader2.Returns(prodSuccess)
  306. expectResults(t, producer, 2, 0)
  307. seedBroker.Close()
  308. leader1.Close()
  309. leader2.Close()
  310. closeProducer(t, producer)
  311. }
  312. t.Run("retriable error", func(t *testing.T) {
  313. tt(t, ErrNotLeaderForPartition)
  314. })
  315. t.Run("non-retriable error", func(t *testing.T) {
  316. tt(t, ErrNotController)
  317. })
  318. }
  319. func TestAsyncProducerEncoderFailures(t *testing.T) {
  320. seedBroker := NewMockBroker(t, 1)
  321. leader := NewMockBroker(t, 2)
  322. metadataResponse := new(MetadataResponse)
  323. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  324. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  325. seedBroker.Returns(metadataResponse)
  326. prodSuccess := new(ProduceResponse)
  327. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  328. leader.Returns(prodSuccess)
  329. leader.Returns(prodSuccess)
  330. leader.Returns(prodSuccess)
  331. config := NewConfig()
  332. config.Producer.Flush.Messages = 1
  333. config.Producer.Return.Successes = true
  334. config.Producer.Partitioner = NewManualPartitioner
  335. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  336. if err != nil {
  337. t.Fatal(err)
  338. }
  339. for flush := 0; flush < 3; flush++ {
  340. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  341. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  342. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  343. expectResults(t, producer, 1, 2)
  344. }
  345. closeProducer(t, producer)
  346. leader.Close()
  347. seedBroker.Close()
  348. }
  349. // If a Kafka broker becomes unavailable and then returns back in service, then
  350. // producer reconnects to it and continues sending messages.
  351. func TestAsyncProducerBrokerBounce(t *testing.T) {
  352. // Given
  353. seedBroker := NewMockBroker(t, 1)
  354. leader := NewMockBroker(t, 2)
  355. leaderAddr := leader.Addr()
  356. metadataResponse := new(MetadataResponse)
  357. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  358. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  359. seedBroker.Returns(metadataResponse)
  360. prodSuccess := new(ProduceResponse)
  361. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  362. config := NewConfig()
  363. config.Producer.Flush.Messages = 1
  364. config.Producer.Return.Successes = true
  365. config.Producer.Retry.Backoff = 0
  366. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  367. if err != nil {
  368. t.Fatal(err)
  369. }
  370. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  371. leader.Returns(prodSuccess)
  372. expectResults(t, producer, 1, 0)
  373. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  374. leader.Close() // producer should get EOF
  375. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  376. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  377. // Then: a produced message goes through the new broker connection.
  378. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  379. leader.Returns(prodSuccess)
  380. expectResults(t, producer, 1, 0)
  381. closeProducer(t, producer)
  382. seedBroker.Close()
  383. leader.Close()
  384. }
  385. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  386. seedBroker := NewMockBroker(t, 1)
  387. leader1 := NewMockBroker(t, 2)
  388. leader2 := NewMockBroker(t, 3)
  389. metadataLeader1 := new(MetadataResponse)
  390. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  391. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  392. seedBroker.Returns(metadataLeader1)
  393. config := NewConfig()
  394. config.Producer.Flush.Messages = 10
  395. config.Producer.Return.Successes = true
  396. config.Producer.Retry.Max = 3
  397. config.Producer.Retry.Backoff = 0
  398. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  399. if err != nil {
  400. t.Fatal(err)
  401. }
  402. for i := 0; i < 10; i++ {
  403. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  404. }
  405. leader1.Close() // producer should get EOF
  406. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  407. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  408. // ok fine, tell it to go to leader2 finally
  409. metadataLeader2 := new(MetadataResponse)
  410. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  411. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  412. seedBroker.Returns(metadataLeader2)
  413. prodSuccess := new(ProduceResponse)
  414. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  415. leader2.Returns(prodSuccess)
  416. expectResults(t, producer, 10, 0)
  417. seedBroker.Close()
  418. leader2.Close()
  419. closeProducer(t, producer)
  420. }
  421. func TestAsyncProducerMultipleRetries(t *testing.T) {
  422. seedBroker := NewMockBroker(t, 1)
  423. leader1 := NewMockBroker(t, 2)
  424. leader2 := NewMockBroker(t, 3)
  425. metadataLeader1 := new(MetadataResponse)
  426. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  427. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  428. seedBroker.Returns(metadataLeader1)
  429. config := NewConfig()
  430. config.Producer.Flush.Messages = 10
  431. config.Producer.Return.Successes = true
  432. config.Producer.Retry.Max = 4
  433. config.Producer.Retry.Backoff = 0
  434. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  435. if err != nil {
  436. t.Fatal(err)
  437. }
  438. for i := 0; i < 10; i++ {
  439. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  440. }
  441. prodNotLeader := new(ProduceResponse)
  442. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  443. leader1.Returns(prodNotLeader)
  444. metadataLeader2 := new(MetadataResponse)
  445. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  446. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  447. seedBroker.Returns(metadataLeader2)
  448. leader2.Returns(prodNotLeader)
  449. seedBroker.Returns(metadataLeader1)
  450. leader1.Returns(prodNotLeader)
  451. seedBroker.Returns(metadataLeader1)
  452. leader1.Returns(prodNotLeader)
  453. seedBroker.Returns(metadataLeader2)
  454. prodSuccess := new(ProduceResponse)
  455. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  456. leader2.Returns(prodSuccess)
  457. expectResults(t, producer, 10, 0)
  458. for i := 0; i < 10; i++ {
  459. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  460. }
  461. leader2.Returns(prodSuccess)
  462. expectResults(t, producer, 10, 0)
  463. seedBroker.Close()
  464. leader1.Close()
  465. leader2.Close()
  466. closeProducer(t, producer)
  467. }
  468. func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
  469. seedBroker := NewMockBroker(t, 1)
  470. leader1 := NewMockBroker(t, 2)
  471. leader2 := NewMockBroker(t, 3)
  472. metadataLeader1 := new(MetadataResponse)
  473. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  474. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  475. seedBroker.Returns(metadataLeader1)
  476. config := NewConfig()
  477. config.Producer.Flush.Messages = 1
  478. config.Producer.Return.Successes = true
  479. config.Producer.Retry.Max = 4
  480. backoffCalled := make([]int32, config.Producer.Retry.Max+1)
  481. config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
  482. atomic.AddInt32(&backoffCalled[retries-1], 1)
  483. return 0
  484. }
  485. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  486. if err != nil {
  487. t.Fatal(err)
  488. }
  489. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  490. prodNotLeader := new(ProduceResponse)
  491. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  492. prodSuccess := new(ProduceResponse)
  493. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  494. metadataLeader2 := new(MetadataResponse)
  495. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  496. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  497. leader1.Returns(prodNotLeader)
  498. seedBroker.Returns(metadataLeader2)
  499. leader2.Returns(prodNotLeader)
  500. seedBroker.Returns(metadataLeader1)
  501. leader1.Returns(prodNotLeader)
  502. seedBroker.Returns(metadataLeader1)
  503. leader1.Returns(prodNotLeader)
  504. seedBroker.Returns(metadataLeader2)
  505. leader2.Returns(prodSuccess)
  506. expectResults(t, producer, 1, 0)
  507. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  508. leader2.Returns(prodSuccess)
  509. expectResults(t, producer, 1, 0)
  510. seedBroker.Close()
  511. leader1.Close()
  512. leader2.Close()
  513. closeProducer(t, producer)
  514. for i := 0; i < config.Producer.Retry.Max; i++ {
  515. if atomic.LoadInt32(&backoffCalled[i]) != 1 {
  516. t.Errorf("expected one retry attempt #%d", i)
  517. }
  518. }
  519. if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
  520. t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
  521. }
  522. }
  523. func TestAsyncProducerOutOfRetries(t *testing.T) {
  524. t.Skip("Enable once bug #294 is fixed.")
  525. seedBroker := NewMockBroker(t, 1)
  526. leader := NewMockBroker(t, 2)
  527. metadataResponse := new(MetadataResponse)
  528. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  529. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  530. seedBroker.Returns(metadataResponse)
  531. config := NewConfig()
  532. config.Producer.Flush.Messages = 10
  533. config.Producer.Return.Successes = true
  534. config.Producer.Retry.Backoff = 0
  535. config.Producer.Retry.Max = 0
  536. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  537. if err != nil {
  538. t.Fatal(err)
  539. }
  540. for i := 0; i < 10; i++ {
  541. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  542. }
  543. prodNotLeader := new(ProduceResponse)
  544. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  545. leader.Returns(prodNotLeader)
  546. for i := 0; i < 10; i++ {
  547. select {
  548. case msg := <-producer.Errors():
  549. if msg.Err != ErrNotLeaderForPartition {
  550. t.Error(msg.Err)
  551. }
  552. case <-producer.Successes():
  553. t.Error("Unexpected success")
  554. }
  555. }
  556. seedBroker.Returns(metadataResponse)
  557. for i := 0; i < 10; i++ {
  558. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  559. }
  560. prodSuccess := new(ProduceResponse)
  561. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  562. leader.Returns(prodSuccess)
  563. expectResults(t, producer, 10, 0)
  564. leader.Close()
  565. seedBroker.Close()
  566. safeClose(t, producer)
  567. }
  568. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  569. seedBroker := NewMockBroker(t, 1)
  570. leader := NewMockBroker(t, 2)
  571. leaderAddr := leader.Addr()
  572. metadataResponse := new(MetadataResponse)
  573. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  574. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  575. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
  576. seedBroker.Returns(metadataResponse)
  577. config := NewConfig()
  578. config.Producer.Return.Successes = true
  579. config.Producer.Retry.Backoff = 0
  580. config.Producer.Retry.Max = 1
  581. config.Producer.Partitioner = NewRoundRobinPartitioner
  582. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  583. if err != nil {
  584. t.Fatal(err)
  585. }
  586. // prime partition 0
  587. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  588. prodSuccess := new(ProduceResponse)
  589. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  590. leader.Returns(prodSuccess)
  591. expectResults(t, producer, 1, 0)
  592. // prime partition 1
  593. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  594. prodSuccess = new(ProduceResponse)
  595. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  596. leader.Returns(prodSuccess)
  597. expectResults(t, producer, 1, 0)
  598. // reboot the broker (the producer will get EOF on its existing connection)
  599. leader.Close()
  600. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  601. // send another message on partition 0 to trigger the EOF and retry
  602. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  603. // tell partition 0 to go to that broker again
  604. seedBroker.Returns(metadataResponse)
  605. // succeed this time
  606. prodSuccess = new(ProduceResponse)
  607. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  608. leader.Returns(prodSuccess)
  609. expectResults(t, producer, 1, 0)
  610. // shutdown
  611. closeProducer(t, producer)
  612. seedBroker.Close()
  613. leader.Close()
  614. }
  615. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  616. seedBroker := NewMockBroker(t, 1)
  617. leader := NewMockBroker(t, 2)
  618. metadataResponse := new(MetadataResponse)
  619. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  620. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  621. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
  622. seedBroker.Returns(metadataResponse)
  623. config := NewConfig()
  624. config.Producer.Flush.Messages = 5
  625. config.Producer.Return.Successes = true
  626. config.Producer.Retry.Backoff = 0
  627. config.Producer.Retry.Max = 1
  628. config.Producer.Partitioner = NewManualPartitioner
  629. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  630. if err != nil {
  631. t.Fatal(err)
  632. }
  633. // prime partitions
  634. for p := int32(0); p < 2; p++ {
  635. for i := 0; i < 5; i++ {
  636. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  637. }
  638. prodSuccess := new(ProduceResponse)
  639. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  640. leader.Returns(prodSuccess)
  641. expectResults(t, producer, 5, 0)
  642. }
  643. // send more messages on partition 0
  644. for i := 0; i < 5; i++ {
  645. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  646. }
  647. prodNotLeader := new(ProduceResponse)
  648. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  649. leader.Returns(prodNotLeader)
  650. time.Sleep(50 * time.Millisecond)
  651. leader.SetHandlerByMap(map[string]MockResponse{
  652. "ProduceRequest": NewMockProduceResponse(t).
  653. SetVersion(0).
  654. SetError("my_topic", 0, ErrNoError),
  655. })
  656. // tell partition 0 to go to that broker again
  657. seedBroker.Returns(metadataResponse)
  658. // succeed this time
  659. expectResults(t, producer, 5, 0)
  660. // put five more through
  661. for i := 0; i < 5; i++ {
  662. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  663. }
  664. expectResults(t, producer, 5, 0)
  665. // shutdown
  666. closeProducer(t, producer)
  667. seedBroker.Close()
  668. leader.Close()
  669. }
  670. func TestAsyncProducerRetryShutdown(t *testing.T) {
  671. seedBroker := NewMockBroker(t, 1)
  672. leader := NewMockBroker(t, 2)
  673. metadataLeader := new(MetadataResponse)
  674. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  675. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  676. seedBroker.Returns(metadataLeader)
  677. config := NewConfig()
  678. config.Producer.Flush.Messages = 10
  679. config.Producer.Return.Successes = true
  680. config.Producer.Retry.Backoff = 0
  681. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  682. if err != nil {
  683. t.Fatal(err)
  684. }
  685. for i := 0; i < 10; i++ {
  686. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  687. }
  688. producer.AsyncClose()
  689. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  690. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  691. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  692. t.Error(err)
  693. }
  694. prodNotLeader := new(ProduceResponse)
  695. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  696. leader.Returns(prodNotLeader)
  697. seedBroker.Returns(metadataLeader)
  698. prodSuccess := new(ProduceResponse)
  699. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  700. leader.Returns(prodSuccess)
  701. expectResults(t, producer, 10, 0)
  702. seedBroker.Close()
  703. leader.Close()
  704. // wait for the async-closed producer to shut down fully
  705. for err := range producer.Errors() {
  706. t.Error(err)
  707. }
  708. }
  709. func TestAsyncProducerNoReturns(t *testing.T) {
  710. seedBroker := NewMockBroker(t, 1)
  711. leader := NewMockBroker(t, 2)
  712. metadataLeader := new(MetadataResponse)
  713. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  714. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  715. seedBroker.Returns(metadataLeader)
  716. config := NewConfig()
  717. config.Producer.Flush.Messages = 10
  718. config.Producer.Return.Successes = false
  719. config.Producer.Return.Errors = false
  720. config.Producer.Retry.Backoff = 0
  721. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  722. if err != nil {
  723. t.Fatal(err)
  724. }
  725. for i := 0; i < 10; i++ {
  726. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  727. }
  728. wait := make(chan bool)
  729. go func() {
  730. if err := producer.Close(); err != nil {
  731. t.Error(err)
  732. }
  733. close(wait)
  734. }()
  735. prodSuccess := new(ProduceResponse)
  736. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  737. leader.Returns(prodSuccess)
  738. <-wait
  739. seedBroker.Close()
  740. leader.Close()
  741. }
  742. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  743. broker := NewMockBroker(t, 1)
  744. metadataResponse := &MetadataResponse{
  745. Version: 1,
  746. ControllerID: 1,
  747. }
  748. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  749. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  750. broker.Returns(metadataResponse)
  751. initProducerID := &InitProducerIDResponse{
  752. ThrottleTime: 0,
  753. ProducerID: 1000,
  754. ProducerEpoch: 1,
  755. }
  756. broker.Returns(initProducerID)
  757. config := NewConfig()
  758. config.Producer.Flush.Messages = 10
  759. config.Producer.Return.Successes = true
  760. config.Producer.Retry.Max = 4
  761. config.Producer.RequiredAcks = WaitForAll
  762. config.Producer.Retry.Backoff = 0
  763. config.Producer.Idempotent = true
  764. config.Net.MaxOpenRequests = 1
  765. config.Version = V0_11_0_0
  766. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  767. if err != nil {
  768. t.Fatal(err)
  769. }
  770. for i := 0; i < 10; i++ {
  771. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  772. }
  773. prodSuccess := &ProduceResponse{
  774. Version: 3,
  775. ThrottleTime: 0,
  776. }
  777. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  778. broker.Returns(prodSuccess)
  779. expectResults(t, producer, 10, 0)
  780. broker.Close()
  781. closeProducer(t, producer)
  782. }
  783. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  784. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  785. tests := []struct {
  786. name string
  787. failAfterWrite bool
  788. }{
  789. {"FailAfterWrite", true},
  790. {"FailBeforeWrite", false},
  791. }
  792. for _, test := range tests {
  793. broker := NewMockBroker(t, 1)
  794. metadataResponse := &MetadataResponse{
  795. Version: 1,
  796. ControllerID: 1,
  797. }
  798. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  799. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  800. initProducerIDResponse := &InitProducerIDResponse{
  801. ThrottleTime: 0,
  802. ProducerID: 1000,
  803. ProducerEpoch: 1,
  804. }
  805. prodNotLeaderResponse := &ProduceResponse{
  806. Version: 3,
  807. ThrottleTime: 0,
  808. }
  809. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  810. prodDuplicate := &ProduceResponse{
  811. Version: 3,
  812. ThrottleTime: 0,
  813. }
  814. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  815. prodOutOfSeq := &ProduceResponse{
  816. Version: 3,
  817. ThrottleTime: 0,
  818. }
  819. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  820. prodSuccessResponse := &ProduceResponse{
  821. Version: 3,
  822. ThrottleTime: 0,
  823. }
  824. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  825. prodCounter := 0
  826. lastBatchFirstSeq := -1
  827. lastBatchSize := -1
  828. lastSequenceWrittenToDisk := -1
  829. handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) {
  830. switch req.body.key() {
  831. case 3:
  832. return metadataResponse
  833. case 22:
  834. return initProducerIDResponse
  835. case 0:
  836. prodCounter++
  837. preq := req.body.(*ProduceRequest)
  838. batch := preq.records["my_topic"][0].RecordBatch
  839. batchFirstSeq := int(batch.FirstSequence)
  840. batchSize := len(batch.Records)
  841. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  842. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  843. if lastBatchSize == batchSize { //good retry
  844. // mock write to disk
  845. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  846. return prodSuccessResponse
  847. }
  848. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  849. return prodOutOfSeq
  850. } // not a retry
  851. // save batch just received for future check
  852. lastBatchFirstSeq = batchFirstSeq
  853. lastBatchSize = batchSize
  854. if prodCounter%2 == 1 {
  855. if test.failAfterWrite {
  856. // mock write to disk
  857. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  858. }
  859. return prodNotLeaderResponse
  860. }
  861. // mock write to disk
  862. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  863. return prodSuccessResponse
  864. }
  865. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  866. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  867. return prodDuplicate
  868. }
  869. // mock write to disk
  870. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  871. return prodSuccessResponse
  872. } //out of sequence / bad retried batch
  873. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  874. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  875. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  876. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  877. } else {
  878. t.Errorf("[%s] Unexpected error", test.name)
  879. }
  880. return prodOutOfSeq
  881. }
  882. return nil
  883. }
  884. config := NewConfig()
  885. config.Version = V0_11_0_0
  886. config.Producer.Idempotent = true
  887. config.Net.MaxOpenRequests = 1
  888. config.Producer.RequiredAcks = WaitForAll
  889. config.Producer.Return.Successes = true
  890. config.Producer.Flush.Frequency = 50 * time.Millisecond
  891. config.Producer.Retry.Backoff = 100 * time.Millisecond
  892. broker.setHandler(handlerFailBeforeWrite)
  893. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  894. if err != nil {
  895. t.Fatal(err)
  896. }
  897. for i := 0; i < 3; i++ {
  898. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  899. }
  900. go func() {
  901. for i := 0; i < 7; i++ {
  902. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  903. time.Sleep(100 * time.Millisecond)
  904. }
  905. }()
  906. expectResults(t, producer, 10, 0)
  907. broker.Close()
  908. closeProducer(t, producer)
  909. }
  910. }
  911. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  912. broker := NewMockBroker(t, 1)
  913. metadataResponse := &MetadataResponse{
  914. Version: 1,
  915. ControllerID: 1,
  916. }
  917. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  918. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  919. broker.Returns(metadataResponse)
  920. initProducerID := &InitProducerIDResponse{
  921. ThrottleTime: 0,
  922. ProducerID: 1000,
  923. ProducerEpoch: 1,
  924. }
  925. broker.Returns(initProducerID)
  926. config := NewConfig()
  927. config.Producer.Flush.Messages = 10
  928. config.Producer.Return.Successes = true
  929. config.Producer.Retry.Max = 400000
  930. config.Producer.RequiredAcks = WaitForAll
  931. config.Producer.Retry.Backoff = 0
  932. config.Producer.Idempotent = true
  933. config.Net.MaxOpenRequests = 1
  934. config.Version = V0_11_0_0
  935. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  936. if err != nil {
  937. t.Fatal(err)
  938. }
  939. for i := 0; i < 10; i++ {
  940. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  941. }
  942. prodOutOfSeq := &ProduceResponse{
  943. Version: 3,
  944. ThrottleTime: 0,
  945. }
  946. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  947. broker.Returns(prodOutOfSeq)
  948. expectResults(t, producer, 0, 10)
  949. broker.Close()
  950. closeProducer(t, producer)
  951. }
  952. func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
  953. broker := NewMockBroker(t, 1)
  954. defer broker.Close()
  955. metadataResponse := &MetadataResponse{
  956. Version: 1,
  957. ControllerID: 1,
  958. }
  959. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  960. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  961. broker.Returns(metadataResponse)
  962. initProducerID := &InitProducerIDResponse{
  963. ThrottleTime: 0,
  964. ProducerID: 1000,
  965. ProducerEpoch: 1,
  966. }
  967. broker.Returns(initProducerID)
  968. config := NewConfig()
  969. config.Producer.Flush.Messages = 10
  970. config.Producer.Flush.Frequency = 10 * time.Millisecond
  971. config.Producer.Return.Successes = true
  972. config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
  973. config.Producer.RequiredAcks = WaitForAll
  974. config.Producer.Retry.Backoff = 0
  975. config.Producer.Idempotent = true
  976. config.Net.MaxOpenRequests = 1
  977. config.Version = V0_11_0_0
  978. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  979. if err != nil {
  980. t.Fatal(err)
  981. }
  982. defer closeProducer(t, producer)
  983. producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
  984. prodError := &ProduceResponse{
  985. Version: 3,
  986. ThrottleTime: 0,
  987. }
  988. prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
  989. broker.Returns(prodError)
  990. <-producer.Errors()
  991. lastReqRes := broker.history[len(broker.history)-1]
  992. lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
  993. if lastProduceBatch.FirstSequence != 0 {
  994. t.Error("first sequence not zero")
  995. }
  996. if lastProduceBatch.ProducerEpoch != 1 {
  997. t.Error("first epoch was not one")
  998. }
  999. // Now if we produce again, the epoch should have rolled over.
  1000. producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
  1001. broker.Returns(prodError)
  1002. <-producer.Errors()
  1003. lastReqRes = broker.history[len(broker.history)-1]
  1004. lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
  1005. if lastProduceBatch.FirstSequence != 0 {
  1006. t.Error("second sequence not zero")
  1007. }
  1008. if lastProduceBatch.ProducerEpoch <= 1 {
  1009. t.Error("second epoch was not > 1")
  1010. }
  1011. }
  1012. // TestBrokerProducerShutdown ensures that a call to shutdown stops the
  1013. // brokerProducer run() loop and doesn't leak any goroutines
  1014. func TestBrokerProducerShutdown(t *testing.T) {
  1015. defer leaktest.Check(t)()
  1016. metrics.UseNilMetrics = true // disable Sarama's go-metrics library
  1017. defer func() {
  1018. metrics.UseNilMetrics = false
  1019. }()
  1020. mockBroker := NewMockBroker(t, 1)
  1021. metadataResponse := &MetadataResponse{}
  1022. metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID())
  1023. metadataResponse.AddTopicPartition(
  1024. "my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError)
  1025. mockBroker.Returns(metadataResponse)
  1026. producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, nil)
  1027. if err != nil {
  1028. t.Fatal(err)
  1029. }
  1030. broker := &Broker{
  1031. addr: mockBroker.Addr(),
  1032. id: mockBroker.BrokerID(),
  1033. }
  1034. bp := producer.(*asyncProducer).newBrokerProducer(broker)
  1035. bp.shutdown()
  1036. _ = producer.Close()
  1037. mockBroker.Close()
  1038. }
  1039. // This example shows how to use the producer while simultaneously
  1040. // reading the Errors channel to know about any failures.
  1041. func ExampleAsyncProducer_select() {
  1042. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  1043. if err != nil {
  1044. panic(err)
  1045. }
  1046. defer func() {
  1047. if err := producer.Close(); err != nil {
  1048. log.Fatalln(err)
  1049. }
  1050. }()
  1051. // Trap SIGINT to trigger a shutdown.
  1052. signals := make(chan os.Signal, 1)
  1053. signal.Notify(signals, os.Interrupt)
  1054. var enqueued, errors int
  1055. ProducerLoop:
  1056. for {
  1057. select {
  1058. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  1059. enqueued++
  1060. case err := <-producer.Errors():
  1061. log.Println("Failed to produce message", err)
  1062. errors++
  1063. case <-signals:
  1064. break ProducerLoop
  1065. }
  1066. }
  1067. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  1068. }
  1069. // This example shows how to use the producer with separate goroutines
  1070. // reading from the Successes and Errors channels. Note that in order
  1071. // for the Successes channel to be populated, you have to set
  1072. // config.Producer.Return.Successes to true.
  1073. func ExampleAsyncProducer_goroutines() {
  1074. config := NewConfig()
  1075. config.Producer.Return.Successes = true
  1076. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  1077. if err != nil {
  1078. panic(err)
  1079. }
  1080. // Trap SIGINT to trigger a graceful shutdown.
  1081. signals := make(chan os.Signal, 1)
  1082. signal.Notify(signals, os.Interrupt)
  1083. var (
  1084. wg sync.WaitGroup
  1085. enqueued, successes, errors int
  1086. )
  1087. wg.Add(1)
  1088. go func() {
  1089. defer wg.Done()
  1090. for range producer.Successes() {
  1091. successes++
  1092. }
  1093. }()
  1094. wg.Add(1)
  1095. go func() {
  1096. defer wg.Done()
  1097. for err := range producer.Errors() {
  1098. log.Println(err)
  1099. errors++
  1100. }
  1101. }()
  1102. ProducerLoop:
  1103. for {
  1104. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  1105. select {
  1106. case producer.Input() <- message:
  1107. enqueued++
  1108. case <-signals:
  1109. producer.AsyncClose() // Trigger a shutdown of the producer.
  1110. break ProducerLoop
  1111. }
  1112. }
  1113. wg.Wait()
  1114. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  1115. }