async_producer_test.go 34 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  // TestMessage is the payload text used by the producer tests in this file,
  // both as a StringEncoder value and as the output of flakyEncoder.
  const (
  	TestMessage = "ABC THE MESSAGE"
  )
  12. func closeProducer(t *testing.T, p AsyncProducer) {
  13. var wg sync.WaitGroup
  14. p.AsyncClose()
  15. wg.Add(2)
  16. go func() {
  17. for range p.Successes() {
  18. t.Error("Unexpected message on Successes()")
  19. }
  20. wg.Done()
  21. }()
  22. go func() {
  23. for msg := range p.Errors() {
  24. t.Error(msg.Err)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }
  30. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  31. expect := successes + errors
  32. for expect > 0 {
  33. select {
  34. case msg := <-p.Errors():
  35. if msg.Msg.flags != 0 {
  36. t.Error("Message had flags set")
  37. }
  38. errors--
  39. expect--
  40. if errors < 0 {
  41. t.Error(msg.Err)
  42. }
  43. case msg := <-p.Successes():
  44. if msg.flags != 0 {
  45. t.Error("Message had flags set")
  46. }
  47. successes--
  48. expect--
  49. if successes < 0 {
  50. t.Error("Too many successes")
  51. }
  52. }
  53. }
  54. if successes != 0 || errors != 0 {
  55. t.Error("Unexpected successes", successes, "or errors", errors)
  56. }
  57. }
  58. type testPartitioner chan *int32
  59. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  60. part := <-p
  61. if part == nil {
  62. return 0, errors.New("BOOM")
  63. }
  64. return *part, nil
  65. }
  66. func (p testPartitioner) RequiresConsistency() bool {
  67. return true
  68. }
  69. func (p testPartitioner) feed(partition int32) {
  70. p <- &partition
  71. }
  72. type flakyEncoder bool
  73. func (f flakyEncoder) Length() int {
  74. return len(TestMessage)
  75. }
  76. func (f flakyEncoder) Encode() ([]byte, error) {
  77. if !bool(f) {
  78. return nil, errors.New("flaky encoding error")
  79. }
  80. return []byte(TestMessage), nil
  81. }
  82. func TestAsyncProducer(t *testing.T) {
  83. seedBroker := NewMockBroker(t, 1)
  84. leader := NewMockBroker(t, 2)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  88. seedBroker.Returns(metadataResponse)
  89. prodSuccess := new(ProduceResponse)
  90. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  91. leader.Returns(prodSuccess)
  92. config := NewConfig()
  93. config.Producer.Flush.Messages = 10
  94. config.Producer.Return.Successes = true
  95. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. for i := 0; i < 10; i++ {
  100. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  101. }
  102. for i := 0; i < 10; i++ {
  103. select {
  104. case msg := <-producer.Errors():
  105. t.Error(msg.Err)
  106. if msg.Msg.flags != 0 {
  107. t.Error("Message had flags set")
  108. }
  109. case msg := <-producer.Successes():
  110. if msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. if msg.Metadata.(int) != i {
  114. t.Error("Message metadata did not match")
  115. }
  116. case <-time.After(time.Second):
  117. t.Errorf("Timeout waiting for msg #%d", i)
  118. goto done
  119. }
  120. }
  121. done:
  122. closeProducer(t, producer)
  123. leader.Close()
  124. seedBroker.Close()
  125. }
  126. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  127. seedBroker := NewMockBroker(t, 1)
  128. leader := NewMockBroker(t, 2)
  129. metadataResponse := new(MetadataResponse)
  130. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  131. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  132. seedBroker.Returns(metadataResponse)
  133. prodSuccess := new(ProduceResponse)
  134. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  135. leader.Returns(prodSuccess)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. config := NewConfig()
  139. config.Producer.Flush.Messages = 5
  140. config.Producer.Return.Successes = true
  141. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. for flush := 0; flush < 3; flush++ {
  146. for i := 0; i < 5; i++ {
  147. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  148. }
  149. expectResults(t, producer, 5, 0)
  150. }
  151. closeProducer(t, producer)
  152. leader.Close()
  153. seedBroker.Close()
  154. }
  155. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  156. seedBroker := NewMockBroker(t, 1)
  157. leader0 := NewMockBroker(t, 2)
  158. leader1 := NewMockBroker(t, 3)
  159. metadataResponse := new(MetadataResponse)
  160. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  161. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  162. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  163. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  164. seedBroker.Returns(metadataResponse)
  165. prodResponse0 := new(ProduceResponse)
  166. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  167. leader0.Returns(prodResponse0)
  168. prodResponse1 := new(ProduceResponse)
  169. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  170. leader1.Returns(prodResponse1)
  171. config := NewConfig()
  172. config.Producer.Flush.Messages = 5
  173. config.Producer.Return.Successes = true
  174. config.Producer.Partitioner = NewRoundRobinPartitioner
  175. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. for i := 0; i < 10; i++ {
  180. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  181. }
  182. expectResults(t, producer, 10, 0)
  183. closeProducer(t, producer)
  184. leader1.Close()
  185. leader0.Close()
  186. seedBroker.Close()
  187. }
  188. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  189. seedBroker := NewMockBroker(t, 1)
  190. leader := NewMockBroker(t, 2)
  191. metadataResponse := new(MetadataResponse)
  192. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  193. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  194. seedBroker.Returns(metadataResponse)
  195. prodResponse := new(ProduceResponse)
  196. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  197. leader.Returns(prodResponse)
  198. config := NewConfig()
  199. config.Producer.Flush.Messages = 2
  200. config.Producer.Return.Successes = true
  201. config.Producer.Partitioner = func(topic string) Partitioner {
  202. p := make(testPartitioner)
  203. go func() {
  204. p.feed(0)
  205. p <- nil
  206. p <- nil
  207. p <- nil
  208. p.feed(0)
  209. }()
  210. return p
  211. }
  212. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. for i := 0; i < 5; i++ {
  217. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  218. }
  219. expectResults(t, producer, 2, 3)
  220. closeProducer(t, producer)
  221. leader.Close()
  222. seedBroker.Close()
  223. }
  224. func TestAsyncProducerFailureRetry(t *testing.T) {
  225. seedBroker := NewMockBroker(t, 1)
  226. leader1 := NewMockBroker(t, 2)
  227. leader2 := NewMockBroker(t, 3)
  228. metadataLeader1 := new(MetadataResponse)
  229. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  230. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  231. seedBroker.Returns(metadataLeader1)
  232. config := NewConfig()
  233. config.Producer.Flush.Messages = 10
  234. config.Producer.Return.Successes = true
  235. config.Producer.Retry.Backoff = 0
  236. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. seedBroker.Close()
  241. for i := 0; i < 10; i++ {
  242. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  243. }
  244. prodNotLeader := new(ProduceResponse)
  245. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  246. leader1.Returns(prodNotLeader)
  247. metadataLeader2 := new(MetadataResponse)
  248. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  249. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  250. leader1.Returns(metadataLeader2)
  251. prodSuccess := new(ProduceResponse)
  252. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  253. leader2.Returns(prodSuccess)
  254. expectResults(t, producer, 10, 0)
  255. leader1.Close()
  256. for i := 0; i < 10; i++ {
  257. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  258. }
  259. leader2.Returns(metadataLeader2)
  260. leader2.Returns(prodSuccess)
  261. expectResults(t, producer, 10, 0)
  262. leader2.Close()
  263. closeProducer(t, producer)
  264. }
  265. func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
  266. tt := func(t *testing.T, kErr KError) {
  267. seedBroker := NewMockBroker(t, 1)
  268. leader1 := NewMockBroker(t, 2)
  269. leader2 := NewMockBroker(t, 3)
  270. metadataLeader1 := new(MetadataResponse)
  271. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  272. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  273. metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  274. seedBroker.Returns(metadataLeader1)
  275. config := NewConfig()
  276. config.Producer.Flush.Messages = 2
  277. config.Producer.Return.Successes = true
  278. config.Producer.Retry.Max = 0 // disable!
  279. config.Producer.Retry.Backoff = 0
  280. config.Producer.Partitioner = NewManualPartitioner
  281. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  282. if err != nil {
  283. t.Fatal(err)
  284. }
  285. seedBroker.Close()
  286. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  287. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  288. prodNotLeader := new(ProduceResponse)
  289. prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
  290. prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
  291. leader1.Returns(prodNotLeader)
  292. expectResults(t, producer, 0, 2)
  293. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  294. metadataLeader2 := new(MetadataResponse)
  295. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  296. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  297. metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
  298. leader1.Returns(metadataLeader2)
  299. leader1.Returns(metadataLeader2)
  300. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  301. prodSuccess := new(ProduceResponse)
  302. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  303. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  304. leader2.Returns(prodSuccess)
  305. expectResults(t, producer, 2, 0)
  306. leader1.Close()
  307. leader2.Close()
  308. closeProducer(t, producer)
  309. }
  310. t.Run("retriable error", func(t *testing.T) {
  311. tt(t, ErrNotLeaderForPartition)
  312. })
  313. t.Run("non-retriable error", func(t *testing.T) {
  314. tt(t, ErrNotController)
  315. })
  316. }
  317. func TestAsyncProducerEncoderFailures(t *testing.T) {
  318. seedBroker := NewMockBroker(t, 1)
  319. leader := NewMockBroker(t, 2)
  320. metadataResponse := new(MetadataResponse)
  321. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  322. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  323. seedBroker.Returns(metadataResponse)
  324. prodSuccess := new(ProduceResponse)
  325. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  326. leader.Returns(prodSuccess)
  327. leader.Returns(prodSuccess)
  328. leader.Returns(prodSuccess)
  329. config := NewConfig()
  330. config.Producer.Flush.Messages = 1
  331. config.Producer.Return.Successes = true
  332. config.Producer.Partitioner = NewManualPartitioner
  333. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  334. if err != nil {
  335. t.Fatal(err)
  336. }
  337. for flush := 0; flush < 3; flush++ {
  338. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  339. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  340. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  341. expectResults(t, producer, 1, 2)
  342. }
  343. closeProducer(t, producer)
  344. leader.Close()
  345. seedBroker.Close()
  346. }
  347. // If a Kafka broker becomes unavailable and then returns back in service, then
  348. // producer reconnects to it and continues sending messages.
  349. func TestAsyncProducerBrokerBounce(t *testing.T) {
  350. // Given
  351. seedBroker := NewMockBroker(t, 1)
  352. leader := NewMockBroker(t, 2)
  353. leaderAddr := leader.Addr()
  354. metadataResponse := new(MetadataResponse)
  355. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  356. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  357. seedBroker.Returns(metadataResponse)
  358. prodSuccess := new(ProduceResponse)
  359. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  360. config := NewConfig()
  361. config.Producer.Flush.Messages = 1
  362. config.Producer.Return.Successes = true
  363. config.Producer.Retry.Backoff = 0
  364. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  365. if err != nil {
  366. t.Fatal(err)
  367. }
  368. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  369. leader.Returns(prodSuccess)
  370. expectResults(t, producer, 1, 0)
  371. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  372. leader.Close() // producer should get EOF
  373. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  374. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  375. // Then: a produced message goes through the new broker connection.
  376. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  377. leader.Returns(prodSuccess)
  378. expectResults(t, producer, 1, 0)
  379. closeProducer(t, producer)
  380. seedBroker.Close()
  381. leader.Close()
  382. }
  383. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  384. seedBroker := NewMockBroker(t, 1)
  385. leader1 := NewMockBroker(t, 2)
  386. leader2 := NewMockBroker(t, 3)
  387. metadataLeader1 := new(MetadataResponse)
  388. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  389. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  390. seedBroker.Returns(metadataLeader1)
  391. config := NewConfig()
  392. config.Producer.Flush.Messages = 10
  393. config.Producer.Return.Successes = true
  394. config.Producer.Retry.Max = 3
  395. config.Producer.Retry.Backoff = 0
  396. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  397. if err != nil {
  398. t.Fatal(err)
  399. }
  400. for i := 0; i < 10; i++ {
  401. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  402. }
  403. leader1.Close() // producer should get EOF
  404. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  405. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  406. // ok fine, tell it to go to leader2 finally
  407. metadataLeader2 := new(MetadataResponse)
  408. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  409. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  410. seedBroker.Returns(metadataLeader2)
  411. prodSuccess := new(ProduceResponse)
  412. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  413. leader2.Returns(prodSuccess)
  414. expectResults(t, producer, 10, 0)
  415. seedBroker.Close()
  416. leader2.Close()
  417. closeProducer(t, producer)
  418. }
  419. func TestAsyncProducerMultipleRetries(t *testing.T) {
  420. seedBroker := NewMockBroker(t, 1)
  421. leader1 := NewMockBroker(t, 2)
  422. leader2 := NewMockBroker(t, 3)
  423. metadataLeader1 := new(MetadataResponse)
  424. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  425. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  426. seedBroker.Returns(metadataLeader1)
  427. config := NewConfig()
  428. config.Producer.Flush.Messages = 10
  429. config.Producer.Return.Successes = true
  430. config.Producer.Retry.Max = 4
  431. config.Producer.Retry.Backoff = 0
  432. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  433. if err != nil {
  434. t.Fatal(err)
  435. }
  436. for i := 0; i < 10; i++ {
  437. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  438. }
  439. prodNotLeader := new(ProduceResponse)
  440. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  441. leader1.Returns(prodNotLeader)
  442. metadataLeader2 := new(MetadataResponse)
  443. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  444. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  445. seedBroker.Returns(metadataLeader2)
  446. leader2.Returns(prodNotLeader)
  447. seedBroker.Returns(metadataLeader1)
  448. leader1.Returns(prodNotLeader)
  449. seedBroker.Returns(metadataLeader1)
  450. leader1.Returns(prodNotLeader)
  451. seedBroker.Returns(metadataLeader2)
  452. seedBroker.Returns(metadataLeader2)
  453. prodSuccess := new(ProduceResponse)
  454. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  455. leader2.Returns(prodSuccess)
  456. expectResults(t, producer, 10, 0)
  457. for i := 0; i < 10; i++ {
  458. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  459. }
  460. leader2.Returns(prodSuccess)
  461. expectResults(t, producer, 10, 0)
  462. seedBroker.Close()
  463. leader1.Close()
  464. leader2.Close()
  465. closeProducer(t, producer)
  466. }
  467. func TestAsyncProducerOutOfRetries(t *testing.T) {
  468. t.Skip("Enable once bug #294 is fixed.")
  469. seedBroker := NewMockBroker(t, 1)
  470. leader := NewMockBroker(t, 2)
  471. metadataResponse := new(MetadataResponse)
  472. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  473. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  474. seedBroker.Returns(metadataResponse)
  475. config := NewConfig()
  476. config.Producer.Flush.Messages = 10
  477. config.Producer.Return.Successes = true
  478. config.Producer.Retry.Backoff = 0
  479. config.Producer.Retry.Max = 0
  480. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  481. if err != nil {
  482. t.Fatal(err)
  483. }
  484. for i := 0; i < 10; i++ {
  485. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  486. }
  487. prodNotLeader := new(ProduceResponse)
  488. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  489. leader.Returns(prodNotLeader)
  490. for i := 0; i < 10; i++ {
  491. select {
  492. case msg := <-producer.Errors():
  493. if msg.Err != ErrNotLeaderForPartition {
  494. t.Error(msg.Err)
  495. }
  496. case <-producer.Successes():
  497. t.Error("Unexpected success")
  498. }
  499. }
  500. seedBroker.Returns(metadataResponse)
  501. for i := 0; i < 10; i++ {
  502. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  503. }
  504. prodSuccess := new(ProduceResponse)
  505. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  506. leader.Returns(prodSuccess)
  507. expectResults(t, producer, 10, 0)
  508. leader.Close()
  509. seedBroker.Close()
  510. safeClose(t, producer)
  511. }
  512. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  513. seedBroker := NewMockBroker(t, 1)
  514. leader := NewMockBroker(t, 2)
  515. leaderAddr := leader.Addr()
  516. metadataResponse := new(MetadataResponse)
  517. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  518. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  519. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  520. seedBroker.Returns(metadataResponse)
  521. config := NewConfig()
  522. config.Producer.Return.Successes = true
  523. config.Producer.Retry.Backoff = 0
  524. config.Producer.Retry.Max = 1
  525. config.Producer.Partitioner = NewRoundRobinPartitioner
  526. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  527. if err != nil {
  528. t.Fatal(err)
  529. }
  530. // prime partition 0
  531. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  532. prodSuccess := new(ProduceResponse)
  533. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  534. leader.Returns(prodSuccess)
  535. expectResults(t, producer, 1, 0)
  536. // prime partition 1
  537. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  538. prodSuccess = new(ProduceResponse)
  539. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  540. leader.Returns(prodSuccess)
  541. expectResults(t, producer, 1, 0)
  542. // reboot the broker (the producer will get EOF on its existing connection)
  543. leader.Close()
  544. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  545. // send another message on partition 0 to trigger the EOF and retry
  546. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  547. // tell partition 0 to go to that broker again
  548. seedBroker.Returns(metadataResponse)
  549. // succeed this time
  550. prodSuccess = new(ProduceResponse)
  551. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  552. leader.Returns(prodSuccess)
  553. expectResults(t, producer, 1, 0)
  554. // shutdown
  555. closeProducer(t, producer)
  556. seedBroker.Close()
  557. leader.Close()
  558. }
  559. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  560. seedBroker := NewMockBroker(t, 1)
  561. leader := NewMockBroker(t, 2)
  562. metadataResponse := new(MetadataResponse)
  563. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  564. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  565. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  566. seedBroker.Returns(metadataResponse)
  567. config := NewConfig()
  568. config.Producer.Flush.Messages = 5
  569. config.Producer.Return.Successes = true
  570. config.Producer.Retry.Backoff = 0
  571. config.Producer.Retry.Max = 1
  572. config.Producer.Partitioner = NewManualPartitioner
  573. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  574. if err != nil {
  575. t.Fatal(err)
  576. }
  577. // prime partitions
  578. for p := int32(0); p < 2; p++ {
  579. for i := 0; i < 5; i++ {
  580. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  581. }
  582. prodSuccess := new(ProduceResponse)
  583. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  584. leader.Returns(prodSuccess)
  585. expectResults(t, producer, 5, 0)
  586. }
  587. // send more messages on partition 0
  588. for i := 0; i < 5; i++ {
  589. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  590. }
  591. prodNotLeader := new(ProduceResponse)
  592. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  593. leader.Returns(prodNotLeader)
  594. time.Sleep(50 * time.Millisecond)
  595. leader.SetHandlerByMap(map[string]MockResponse{
  596. "ProduceRequest": NewMockProduceResponse(t).
  597. SetVersion(0).
  598. SetError("my_topic", 0, ErrNoError),
  599. })
  600. // tell partition 0 to go to that broker again
  601. seedBroker.Returns(metadataResponse)
  602. // succeed this time
  603. expectResults(t, producer, 5, 0)
  604. seedBroker.Returns(metadataResponse)
  605. // put five more through
  606. for i := 0; i < 5; i++ {
  607. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  608. }
  609. expectResults(t, producer, 5, 0)
  610. // shutdown
  611. closeProducer(t, producer)
  612. seedBroker.Close()
  613. leader.Close()
  614. }
  615. func TestAsyncProducerRetryShutdown(t *testing.T) {
  616. seedBroker := NewMockBroker(t, 1)
  617. leader := NewMockBroker(t, 2)
  618. metadataLeader := new(MetadataResponse)
  619. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  620. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  621. seedBroker.Returns(metadataLeader)
  622. config := NewConfig()
  623. config.Producer.Flush.Messages = 10
  624. config.Producer.Return.Successes = true
  625. config.Producer.Retry.Backoff = 0
  626. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  627. if err != nil {
  628. t.Fatal(err)
  629. }
  630. for i := 0; i < 10; i++ {
  631. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  632. }
  633. producer.AsyncClose()
  634. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  635. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  636. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  637. t.Error(err)
  638. }
  639. prodNotLeader := new(ProduceResponse)
  640. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  641. leader.Returns(prodNotLeader)
  642. seedBroker.Returns(metadataLeader)
  643. prodSuccess := new(ProduceResponse)
  644. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  645. leader.Returns(prodSuccess)
  646. expectResults(t, producer, 10, 0)
  647. seedBroker.Close()
  648. leader.Close()
  649. // wait for the async-closed producer to shut down fully
  650. for err := range producer.Errors() {
  651. t.Error(err)
  652. }
  653. }
  654. func TestAsyncProducerNoReturns(t *testing.T) {
  655. seedBroker := NewMockBroker(t, 1)
  656. leader := NewMockBroker(t, 2)
  657. metadataLeader := new(MetadataResponse)
  658. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  659. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  660. seedBroker.Returns(metadataLeader)
  661. config := NewConfig()
  662. config.Producer.Flush.Messages = 10
  663. config.Producer.Return.Successes = false
  664. config.Producer.Return.Errors = false
  665. config.Producer.Retry.Backoff = 0
  666. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. for i := 0; i < 10; i++ {
  671. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  672. }
  673. wait := make(chan bool)
  674. go func() {
  675. if err := producer.Close(); err != nil {
  676. t.Error(err)
  677. }
  678. close(wait)
  679. }()
  680. prodSuccess := new(ProduceResponse)
  681. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  682. leader.Returns(prodSuccess)
  683. <-wait
  684. seedBroker.Close()
  685. leader.Close()
  686. }
  687. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  688. broker := NewMockBroker(t, 1)
  689. metadataResponse := &MetadataResponse{
  690. Version: 1,
  691. ControllerID: 1,
  692. }
  693. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  694. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  695. broker.Returns(metadataResponse)
  696. initProducerID := &InitProducerIDResponse{
  697. ThrottleTime: 0,
  698. ProducerID: 1000,
  699. ProducerEpoch: 1,
  700. }
  701. broker.Returns(initProducerID)
  702. config := NewConfig()
  703. config.Producer.Flush.Messages = 10
  704. config.Producer.Return.Successes = true
  705. config.Producer.Retry.Max = 4
  706. config.Producer.RequiredAcks = WaitForAll
  707. config.Producer.Retry.Backoff = 0
  708. config.Producer.Idempotent = true
  709. config.Net.MaxOpenRequests = 1
  710. config.Version = V0_11_0_0
  711. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  712. if err != nil {
  713. t.Fatal(err)
  714. }
  715. for i := 0; i < 10; i++ {
  716. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  717. }
  718. prodSuccess := &ProduceResponse{
  719. Version: 3,
  720. ThrottleTime: 0,
  721. }
  722. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  723. broker.Returns(prodSuccess)
  724. expectResults(t, producer, 10, 0)
  725. broker.Close()
  726. closeProducer(t, producer)
  727. }
  728. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  729. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  730. tests := []struct {
  731. name string
  732. failAfterWrite bool
  733. }{
  734. {"FailAfterWrite", true},
  735. {"FailBeforeWrite", false},
  736. }
  737. for _, test := range tests {
  738. broker := NewMockBroker(t, 1)
  739. metadataResponse := &MetadataResponse{
  740. Version: 1,
  741. ControllerID: 1,
  742. }
  743. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  744. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  745. initProducerIDResponse := &InitProducerIDResponse{
  746. ThrottleTime: 0,
  747. ProducerID: 1000,
  748. ProducerEpoch: 1,
  749. }
  750. prodNotLeaderResponse := &ProduceResponse{
  751. Version: 3,
  752. ThrottleTime: 0,
  753. }
  754. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  755. prodDuplicate := &ProduceResponse{
  756. Version: 3,
  757. ThrottleTime: 0,
  758. }
  759. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  760. prodOutOfSeq := &ProduceResponse{
  761. Version: 3,
  762. ThrottleTime: 0,
  763. }
  764. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  765. prodSuccessResponse := &ProduceResponse{
  766. Version: 3,
  767. ThrottleTime: 0,
  768. }
  769. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  770. prodCounter := 0
  771. lastBatchFirstSeq := -1
  772. lastBatchSize := -1
  773. lastSequenceWrittenToDisk := -1
  774. handlerFailBeforeWrite := func(req *request) (res encoder) {
  775. switch req.body.key() {
  776. case 3:
  777. return metadataResponse
  778. case 22:
  779. return initProducerIDResponse
  780. case 0:
  781. prodCounter++
  782. preq := req.body.(*ProduceRequest)
  783. batch := preq.records["my_topic"][0].RecordBatch
  784. batchFirstSeq := int(batch.FirstSequence)
  785. batchSize := len(batch.Records)
  786. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  787. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  788. if lastBatchSize == batchSize { //good retry
  789. // mock write to disk
  790. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  791. return prodSuccessResponse
  792. }
  793. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  794. return prodOutOfSeq
  795. } else { // not a retry
  796. // save batch just received for future check
  797. lastBatchFirstSeq = batchFirstSeq
  798. lastBatchSize = batchSize
  799. if prodCounter%2 == 1 {
  800. if test.failAfterWrite {
  801. // mock write to disk
  802. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  803. }
  804. return prodNotLeaderResponse
  805. }
  806. // mock write to disk
  807. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  808. return prodSuccessResponse
  809. }
  810. } else {
  811. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  812. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  813. return prodDuplicate
  814. }
  815. // mock write to disk
  816. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  817. return prodSuccessResponse
  818. } else { //out of sequence / bad retried batch
  819. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  820. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  821. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  822. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  823. } else {
  824. t.Errorf("[%s] Unexpected error", test.name)
  825. }
  826. return prodOutOfSeq
  827. }
  828. }
  829. }
  830. return nil
  831. }
  832. config := NewConfig()
  833. config.Version = V0_11_0_0
  834. config.Producer.Idempotent = true
  835. config.Net.MaxOpenRequests = 1
  836. config.Producer.RequiredAcks = WaitForAll
  837. config.Producer.Return.Successes = true
  838. config.Producer.Flush.Frequency = 50 * time.Millisecond
  839. config.Producer.Retry.Backoff = 100 * time.Millisecond
  840. broker.setHandler(handlerFailBeforeWrite)
  841. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  842. if err != nil {
  843. t.Fatal(err)
  844. }
  845. for i := 0; i < 3; i++ {
  846. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  847. }
  848. go func() {
  849. for i := 0; i < 7; i++ {
  850. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  851. time.Sleep(100 * time.Millisecond)
  852. }
  853. }()
  854. expectResults(t, producer, 10, 0)
  855. broker.Close()
  856. closeProducer(t, producer)
  857. }
  858. }
  859. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  860. broker := NewMockBroker(t, 1)
  861. metadataResponse := &MetadataResponse{
  862. Version: 1,
  863. ControllerID: 1,
  864. }
  865. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  866. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  867. broker.Returns(metadataResponse)
  868. initProducerID := &InitProducerIDResponse{
  869. ThrottleTime: 0,
  870. ProducerID: 1000,
  871. ProducerEpoch: 1,
  872. }
  873. broker.Returns(initProducerID)
  874. config := NewConfig()
  875. config.Producer.Flush.Messages = 10
  876. config.Producer.Return.Successes = true
  877. config.Producer.Retry.Max = 400000
  878. config.Producer.RequiredAcks = WaitForAll
  879. config.Producer.Retry.Backoff = 0
  880. config.Producer.Idempotent = true
  881. config.Net.MaxOpenRequests = 1
  882. config.Version = V0_11_0_0
  883. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  884. if err != nil {
  885. t.Fatal(err)
  886. }
  887. for i := 0; i < 10; i++ {
  888. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  889. }
  890. prodOutOfSeq := &ProduceResponse{
  891. Version: 3,
  892. ThrottleTime: 0,
  893. }
  894. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  895. broker.Returns(prodOutOfSeq)
  896. expectResults(t, producer, 0, 10)
  897. broker.Close()
  898. closeProducer(t, producer)
  899. }
  900. // This example shows how to use the producer while simultaneously
  901. // reading the Errors channel to know about any failures.
  902. func ExampleAsyncProducer_select() {
  903. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  904. if err != nil {
  905. panic(err)
  906. }
  907. defer func() {
  908. if err := producer.Close(); err != nil {
  909. log.Fatalln(err)
  910. }
  911. }()
  912. // Trap SIGINT to trigger a shutdown.
  913. signals := make(chan os.Signal, 1)
  914. signal.Notify(signals, os.Interrupt)
  915. var enqueued, errors int
  916. ProducerLoop:
  917. for {
  918. select {
  919. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  920. enqueued++
  921. case err := <-producer.Errors():
  922. log.Println("Failed to produce message", err)
  923. errors++
  924. case <-signals:
  925. break ProducerLoop
  926. }
  927. }
  928. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  929. }
  930. // This example shows how to use the producer with separate goroutines
  931. // reading from the Successes and Errors channels. Note that in order
  932. // for the Successes channel to be populated, you have to set
  933. // config.Producer.Return.Successes to true.
  934. func ExampleAsyncProducer_goroutines() {
  935. config := NewConfig()
  936. config.Producer.Return.Successes = true
  937. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  938. if err != nil {
  939. panic(err)
  940. }
  941. // Trap SIGINT to trigger a graceful shutdown.
  942. signals := make(chan os.Signal, 1)
  943. signal.Notify(signals, os.Interrupt)
  944. var (
  945. wg sync.WaitGroup
  946. enqueued, successes, errors int
  947. )
  948. wg.Add(1)
  949. go func() {
  950. defer wg.Done()
  951. for range producer.Successes() {
  952. successes++
  953. }
  954. }()
  955. wg.Add(1)
  956. go func() {
  957. defer wg.Done()
  958. for err := range producer.Errors() {
  959. log.Println(err)
  960. errors++
  961. }
  962. }()
  963. ProducerLoop:
  964. for {
  965. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  966. select {
  967. case producer.Input() <- message:
  968. enqueued++
  969. case <-signals:
  970. producer.AsyncClose() // Trigger a shutdown of the producer.
  971. break ProducerLoop
  972. }
  973. }
  974. wg.Wait()
  975. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  976. }